fix(scrapers): cookie fast paths, centralized storage, and reauth hardening (#363)
## Problem

Scraper cookie handling was fragmented across per-platform files with no shared access, httpx scrapers lacked `checking_login` fast paths on login, and several re-auth edge cases (CodeChef submit, CF cookie guard, AtCoder cookie persistence) caused unnecessary full re-logins or silent failures.

## Solution

Centralize all cookie storage into a single `cookies.json` via helpers in `base.py`. Add `checking_login` fast paths to `kattis.py` (using the `x-username` response header as a session probe), `usaco.py`, and `cses.py` login flows. Fix `kattis.py` submit to emit `checking_login` only after loading cookies. Remove AtCoder cookie persistence from login entirely — always do a fresh session. Harden CodeChef and CF reauth with consistent status logging and cookie guard checks.
This commit is contained in:
parent
564f9286da
commit
b7ddf4c253
10 changed files with 813 additions and 194 deletions
|
|
@ -16,7 +16,7 @@ from bs4 import BeautifulSoup, Tag
|
|||
from requests.adapters import HTTPAdapter
|
||||
from urllib3.util.retry import Retry
|
||||
|
||||
from .base import BaseScraper, extract_precision
|
||||
from .base import BaseScraper, clear_platform_cookies, extract_precision, load_platform_cookies, save_platform_cookies
|
||||
from .models import (
|
||||
ContestListResult,
|
||||
ContestSummary,
|
||||
|
|
@ -379,26 +379,15 @@ def _ensure_browser() -> None:
|
|||
break
|
||||
|
||||
|
||||
def _login_headless(credentials: dict[str, str]) -> LoginResult:
|
||||
try:
|
||||
from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import]
|
||||
except ImportError:
|
||||
return LoginResult(
|
||||
success=False,
|
||||
error="scrapling is required for AtCoder login. Install it: uv add 'scrapling[fetchers]>=0.4'",
|
||||
)
|
||||
def _at_check_logged_in(page) -> bool:
    """Report whether ``page`` shows a signed-in AtCoder session.

    Runs a script in the page that checks for an anchor whose trimmed text
    is exactly ``Sign Out`` and returns whatever the script evaluates to.
    """
    sign_out_probe = (
        "() => Array.from(document.querySelectorAll('a'))"
        ".some(a => a.textContent.trim() === 'Sign Out')"
    )
    return page.evaluate(sign_out_probe)
|
||||
|
||||
_ensure_browser()
|
||||
|
||||
logged_in = False
|
||||
def _at_login_action(credentials: dict[str, str]):
|
||||
login_error: str | None = None
|
||||
|
||||
def check_login(page):
|
||||
nonlocal logged_in
|
||||
logged_in = page.evaluate(
|
||||
"() => Array.from(document.querySelectorAll('a')).some(a => a.textContent.trim() === 'Sign Out')"
|
||||
)
|
||||
|
||||
def login_action(page):
|
||||
nonlocal login_error
|
||||
try:
|
||||
|
|
@ -412,6 +401,45 @@ def _login_headless(credentials: dict[str, str]) -> LoginResult:
|
|||
except Exception as e:
|
||||
login_error = str(e)
|
||||
|
||||
return login_action, lambda: login_error
|
||||
|
||||
|
||||
def _login_headless(credentials: dict[str, str]) -> LoginResult:
|
||||
try:
|
||||
from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import]
|
||||
except ImportError:
|
||||
return LoginResult(
|
||||
success=False,
|
||||
error="scrapling is required for AtCoder login. Install it: uv add 'scrapling[fetchers]>=0.4'",
|
||||
)
|
||||
|
||||
_ensure_browser()
|
||||
|
||||
saved_cookies = load_platform_cookies("atcoder") or []
|
||||
|
||||
if saved_cookies:
|
||||
print(json.dumps({"status": "checking_login"}), flush=True)
|
||||
logged_in = False
|
||||
|
||||
def check_action(page):
|
||||
nonlocal logged_in
|
||||
logged_in = _at_check_logged_in(page)
|
||||
|
||||
try:
|
||||
with StealthySession(
|
||||
headless=True,
|
||||
timeout=BROWSER_SESSION_TIMEOUT,
|
||||
google_search=False,
|
||||
cookies=saved_cookies,
|
||||
) as session:
|
||||
session.fetch(f"{BASE_URL}/home", page_action=check_action, network_idle=True)
|
||||
if logged_in:
|
||||
return LoginResult(success=True, error="")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
login_action, get_error = _at_login_action(credentials)
|
||||
|
||||
try:
|
||||
with StealthySession(
|
||||
headless=True,
|
||||
|
|
@ -424,16 +452,26 @@ def _login_headless(credentials: dict[str, str]) -> LoginResult:
|
|||
page_action=login_action,
|
||||
solve_cloudflare=True,
|
||||
)
|
||||
login_error = get_error()
|
||||
if login_error:
|
||||
return LoginResult(success=False, error=f"Login failed: {login_error}")
|
||||
|
||||
session.fetch(
|
||||
f"{BASE_URL}/home", page_action=check_login, network_idle=True
|
||||
)
|
||||
logged_in = False
|
||||
|
||||
def verify_action(page):
|
||||
nonlocal logged_in
|
||||
logged_in = _at_check_logged_in(page)
|
||||
|
||||
session.fetch(f"{BASE_URL}/home", page_action=verify_action, network_idle=True)
|
||||
if not logged_in:
|
||||
return LoginResult(
|
||||
success=False, error="Login failed (bad credentials?)"
|
||||
)
|
||||
return LoginResult(success=False, error="Login failed (bad credentials?)")
|
||||
|
||||
try:
|
||||
browser_cookies = session.context.cookies()
|
||||
if browser_cookies:
|
||||
save_platform_cookies("atcoder", browser_cookies)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return LoginResult(success=True, error="")
|
||||
except Exception as e:
|
||||
|
|
@ -446,6 +484,7 @@ def _submit_headless(
|
|||
file_path: str,
|
||||
language_id: str,
|
||||
credentials: dict[str, str],
|
||||
_retried: bool = False,
|
||||
) -> "SubmitResult":
|
||||
try:
|
||||
from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import]
|
||||
|
|
@ -457,26 +496,24 @@ def _submit_headless(
|
|||
|
||||
_ensure_browser()
|
||||
|
||||
login_error: str | None = None
|
||||
submit_error: str | None = None
|
||||
saved_cookies: list[dict[str, Any]] = []
|
||||
if not _retried:
|
||||
saved_cookies = load_platform_cookies("atcoder") or []
|
||||
|
||||
def login_action(page):
|
||||
nonlocal login_error
|
||||
try:
|
||||
_solve_turnstile(page)
|
||||
page.fill('input[name="username"]', credentials.get("username", ""))
|
||||
page.fill('input[name="password"]', credentials.get("password", ""))
|
||||
page.click("#submit")
|
||||
page.wait_for_url(
|
||||
lambda url: "/login" not in url, timeout=BROWSER_NAV_TIMEOUT
|
||||
)
|
||||
except Exception as e:
|
||||
login_error = str(e)
|
||||
logged_in = bool(saved_cookies)
|
||||
submit_error: str | None = None
|
||||
needs_relogin = False
|
||||
|
||||
def check_login(page):
|
||||
nonlocal logged_in
|
||||
logged_in = _at_check_logged_in(page)
|
||||
|
||||
login_action, get_login_error = _at_login_action(credentials)
|
||||
|
||||
def submit_action(page):
|
||||
nonlocal submit_error
|
||||
nonlocal submit_error, needs_relogin
|
||||
if "/login" in page.url:
|
||||
submit_error = "Not logged in after login step"
|
||||
needs_relogin = True
|
||||
return
|
||||
try:
|
||||
_solve_turnstile(page)
|
||||
|
|
@ -488,18 +525,12 @@ def _submit_headless(
|
|||
f'select[name="data.LanguageId"] option[value="{language_id}"]'
|
||||
).wait_for(state="attached", timeout=BROWSER_ELEMENT_WAIT)
|
||||
page.select_option('select[name="data.LanguageId"]', language_id)
|
||||
ext = _LANGUAGE_ID_EXTENSION.get(
|
||||
language_id, Path(file_path).suffix.lstrip(".") or "txt"
|
||||
page.set_input_files("#input-open-file", file_path)
|
||||
page.wait_for_function(
|
||||
"() => { const ta = document.getElementById('plain-textarea'); return ta && ta.value.length > 0; }",
|
||||
timeout=BROWSER_ELEMENT_WAIT,
|
||||
)
|
||||
page.set_input_files(
|
||||
"#input-open-file",
|
||||
{
|
||||
"name": f"solution.{ext}",
|
||||
"mimeType": "text/plain",
|
||||
"buffer": Path(file_path).read_bytes(),
|
||||
},
|
||||
)
|
||||
page.locator('button[type="submit"]').click(no_wait_after=True)
|
||||
page.evaluate("document.getElementById('submit').click()")
|
||||
page.wait_for_url(
|
||||
lambda url: "/submissions/me" in url,
|
||||
timeout=BROWSER_SUBMIT_NAV_TIMEOUT["atcoder"],
|
||||
|
|
@ -512,15 +543,29 @@ def _submit_headless(
|
|||
headless=True,
|
||||
timeout=BROWSER_SESSION_TIMEOUT,
|
||||
google_search=False,
|
||||
cookies=saved_cookies if saved_cookies else [],
|
||||
) as session:
|
||||
print(json.dumps({"status": "logging_in"}), flush=True)
|
||||
session.fetch(
|
||||
f"{BASE_URL}/login",
|
||||
page_action=login_action,
|
||||
solve_cloudflare=True,
|
||||
)
|
||||
if login_error:
|
||||
return SubmitResult(success=False, error=f"Login failed: {login_error}")
|
||||
if not _retried and saved_cookies:
|
||||
print(json.dumps({"status": "checking_login"}), flush=True)
|
||||
session.fetch(f"{BASE_URL}/home", page_action=check_login, network_idle=True)
|
||||
|
||||
if not logged_in:
|
||||
print(json.dumps({"status": "logging_in"}), flush=True)
|
||||
session.fetch(
|
||||
f"{BASE_URL}/login",
|
||||
page_action=login_action,
|
||||
solve_cloudflare=True,
|
||||
)
|
||||
login_error = get_login_error()
|
||||
if login_error:
|
||||
return SubmitResult(success=False, error=f"Login failed: {login_error}")
|
||||
logged_in = True
|
||||
try:
|
||||
browser_cookies = session.context.cookies()
|
||||
if browser_cookies:
|
||||
save_platform_cookies("atcoder", browser_cookies)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
print(json.dumps({"status": "submitting"}), flush=True)
|
||||
session.fetch(
|
||||
|
|
@ -529,12 +574,16 @@ def _submit_headless(
|
|||
solve_cloudflare=True,
|
||||
)
|
||||
|
||||
if needs_relogin and not _retried:
|
||||
clear_platform_cookies("atcoder")
|
||||
return _submit_headless(
|
||||
contest_id, problem_id, file_path, language_id, credentials, _retried=True
|
||||
)
|
||||
|
||||
if submit_error:
|
||||
return SubmitResult(success=False, error=submit_error)
|
||||
|
||||
return SubmitResult(
|
||||
success=True, error="", submission_id="", verdict="submitted"
|
||||
)
|
||||
return SubmitResult(success=True, error="", submission_id="", verdict="submitted")
|
||||
except Exception as e:
|
||||
return SubmitResult(success=False, error=str(e))
|
||||
|
||||
|
|
|
|||
|
|
@ -4,6 +4,38 @@ import os
|
|||
import re
|
||||
import sys
|
||||
from abc import ABC, abstractmethod
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
_COOKIE_FILE = Path.home() / ".cache" / "cp-nvim" / "cookies.json"
|
||||
|
||||
|
||||
def load_platform_cookies(platform: str) -> Any | None:
|
||||
try:
|
||||
data = json.loads(_COOKIE_FILE.read_text())
|
||||
return data.get(platform)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def save_platform_cookies(platform: str, data: Any) -> None:
|
||||
_COOKIE_FILE.parent.mkdir(parents=True, exist_ok=True)
|
||||
try:
|
||||
existing = json.loads(_COOKIE_FILE.read_text())
|
||||
except Exception:
|
||||
existing = {}
|
||||
existing[platform] = data
|
||||
_COOKIE_FILE.write_text(json.dumps(existing))
|
||||
|
||||
|
||||
def clear_platform_cookies(platform: str) -> None:
|
||||
try:
|
||||
existing = json.loads(_COOKIE_FILE.read_text())
|
||||
existing.pop(platform, None)
|
||||
_COOKIE_FILE.write_text(json.dumps(existing))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
from .language_ids import get_language_id
|
||||
from .models import (
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ from typing import Any
|
|||
|
||||
import httpx
|
||||
|
||||
from .base import BaseScraper
|
||||
from .base import BaseScraper, clear_platform_cookies, load_platform_cookies, save_platform_cookies
|
||||
from .timeouts import BROWSER_SESSION_TIMEOUT, HTTP_TIMEOUT
|
||||
from .models import (
|
||||
ContestListResult,
|
||||
|
|
@ -31,7 +31,6 @@ HEADERS = {
|
|||
}
|
||||
CONNECTIONS = 8
|
||||
|
||||
_COOKIE_PATH = Path.home() / ".cache" / "cp-nvim" / "codechef-cookies.json"
|
||||
|
||||
_CC_CHECK_LOGIN_JS = "() => !!document.querySelector('a[href*=\"/users/\"]')"
|
||||
|
||||
|
|
@ -67,8 +66,6 @@ def _login_headless_codechef(credentials: dict[str, str]) -> LoginResult:
|
|||
|
||||
_ensure_browser()
|
||||
|
||||
_COOKIE_PATH.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
logged_in = False
|
||||
login_error: str | None = None
|
||||
|
||||
|
|
@ -85,7 +82,7 @@ def _login_headless_codechef(credentials: dict[str, str]) -> LoginResult:
|
|||
try:
|
||||
page.wait_for_url(lambda url: "/login" not in url, timeout=3000)
|
||||
except Exception:
|
||||
login_error = "Login failed (bad credentials?)"
|
||||
login_error = "bad credentials?"
|
||||
return
|
||||
except Exception as e:
|
||||
login_error = str(e)
|
||||
|
|
@ -99,7 +96,7 @@ def _login_headless_codechef(credentials: dict[str, str]) -> LoginResult:
|
|||
print(json.dumps({"status": "logging_in"}), flush=True)
|
||||
session.fetch(f"{BASE_URL}/login", page_action=login_action)
|
||||
if login_error:
|
||||
return LoginResult(success=False, error=f"Login failed: {login_error}")
|
||||
return LoginResult(success=False, error=login_error)
|
||||
|
||||
session.fetch(f"{BASE_URL}/", page_action=check_login, network_idle=True)
|
||||
if not logged_in:
|
||||
|
|
@ -110,7 +107,7 @@ def _login_headless_codechef(credentials: dict[str, str]) -> LoginResult:
|
|||
try:
|
||||
browser_cookies = session.context.cookies()
|
||||
if browser_cookies:
|
||||
_COOKIE_PATH.write_text(json.dumps(browser_cookies))
|
||||
save_platform_cookies("codechef", browser_cookies)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
|
@ -126,6 +123,7 @@ def _submit_headless_codechef(
|
|||
language_id: str,
|
||||
credentials: dict[str, str],
|
||||
_retried: bool = False,
|
||||
_practice: bool = False,
|
||||
) -> SubmitResult:
|
||||
source_code = Path(file_path).read_text()
|
||||
|
||||
|
|
@ -141,15 +139,11 @@ def _submit_headless_codechef(
|
|||
|
||||
_ensure_browser()
|
||||
|
||||
_COOKIE_PATH.parent.mkdir(parents=True, exist_ok=True)
|
||||
saved_cookies: list[dict[str, Any]] = []
|
||||
if _COOKIE_PATH.exists() and not _retried:
|
||||
try:
|
||||
saved_cookies = json.loads(_COOKIE_PATH.read_text())
|
||||
except Exception:
|
||||
pass
|
||||
if not _retried:
|
||||
saved_cookies = load_platform_cookies("codechef") or []
|
||||
|
||||
logged_in = bool(saved_cookies) and not _retried
|
||||
logged_in = bool(saved_cookies)
|
||||
login_error: str | None = None
|
||||
submit_error: str | None = None
|
||||
needs_relogin = False
|
||||
|
|
@ -167,7 +161,7 @@ def _submit_headless_codechef(
|
|||
try:
|
||||
page.wait_for_url(lambda url: "/login" not in url, timeout=3000)
|
||||
except Exception:
|
||||
login_error = "Login failed (bad credentials?)"
|
||||
login_error = "bad credentials?"
|
||||
return
|
||||
except Exception as e:
|
||||
login_error = str(e)
|
||||
|
|
@ -213,7 +207,9 @@ def _submit_headless_codechef(
|
|||
const d = document.querySelector('[role="dialog"], .swal2-popup');
|
||||
return d ? d.textContent.trim() : null;
|
||||
}""")
|
||||
if dialog_text and (
|
||||
if dialog_text and "login" in dialog_text.lower():
|
||||
needs_relogin = True
|
||||
elif dialog_text and (
|
||||
"not available for accepting solutions" in dialog_text
|
||||
or "not available for submission" in dialog_text
|
||||
):
|
||||
|
|
@ -228,23 +224,23 @@ def _submit_headless_codechef(
|
|||
headless=True,
|
||||
timeout=BROWSER_SESSION_TIMEOUT,
|
||||
google_search=False,
|
||||
cookies=saved_cookies if (saved_cookies and not _retried) else [],
|
||||
cookies=saved_cookies if saved_cookies else [],
|
||||
) as session:
|
||||
if not logged_in:
|
||||
if not _retried and not _practice:
|
||||
print(json.dumps({"status": "checking_login"}), flush=True)
|
||||
session.fetch(
|
||||
f"{BASE_URL}/", page_action=check_login, network_idle=True
|
||||
)
|
||||
session.fetch(f"{BASE_URL}/", page_action=check_login)
|
||||
|
||||
if not logged_in:
|
||||
print(json.dumps({"status": "logging_in"}), flush=True)
|
||||
session.fetch(f"{BASE_URL}/login", page_action=login_action)
|
||||
if login_error:
|
||||
return SubmitResult(
|
||||
success=False, error=f"Login failed: {login_error}"
|
||||
success=False, error=login_error
|
||||
)
|
||||
logged_in = True
|
||||
|
||||
print(json.dumps({"status": "submitting"}), flush=True)
|
||||
if not _practice:
|
||||
print(json.dumps({"status": "submitting"}), flush=True)
|
||||
submit_url = (
|
||||
f"{BASE_URL}/submit/{problem_id}"
|
||||
if contest_id == "PRACTICE"
|
||||
|
|
@ -255,12 +251,12 @@ def _submit_headless_codechef(
|
|||
try:
|
||||
browser_cookies = session.context.cookies()
|
||||
if browser_cookies and logged_in:
|
||||
_COOKIE_PATH.write_text(json.dumps(browser_cookies))
|
||||
save_platform_cookies("codechef", browser_cookies)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if needs_relogin and not _retried:
|
||||
_COOKIE_PATH.unlink(missing_ok=True)
|
||||
clear_platform_cookies("codechef")
|
||||
return _submit_headless_codechef(
|
||||
contest_id,
|
||||
problem_id,
|
||||
|
|
@ -270,14 +266,14 @@ def _submit_headless_codechef(
|
|||
_retried=True,
|
||||
)
|
||||
|
||||
if submit_error == "PRACTICE_FALLBACK" and not _retried:
|
||||
if submit_error == "PRACTICE_FALLBACK" and not _practice:
|
||||
return _submit_headless_codechef(
|
||||
"PRACTICE",
|
||||
problem_id,
|
||||
file_path,
|
||||
language_id,
|
||||
credentials,
|
||||
_retried=True,
|
||||
_practice=True,
|
||||
)
|
||||
|
||||
if submit_error:
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ from typing import Any
|
|||
import requests
|
||||
from bs4 import BeautifulSoup, Tag
|
||||
|
||||
from .base import BaseScraper, extract_precision
|
||||
from .base import BaseScraper, clear_platform_cookies, extract_precision, load_platform_cookies, save_platform_cookies
|
||||
from .models import (
|
||||
ContestListResult,
|
||||
ContestSummary,
|
||||
|
|
@ -331,9 +331,33 @@ class CodeforcesScraper(BaseScraper):
|
|||
return await asyncio.to_thread(_login_headless_cf, credentials)
|
||||
|
||||
|
||||
def _login_headless_cf(credentials: dict[str, str]) -> LoginResult:
|
||||
from pathlib import Path
|
||||
def _cf_check_logged_in(page) -> bool:
    """Report whether ``page`` shows a signed-in Codeforces session.

    Runs a script in the page that checks for any anchor whose text contains
    ``Logout`` and returns whatever the script evaluates to.
    """
    logout_probe = "".join(
        (
            "() => Array.from(document.querySelectorAll('a'))",
            ".some(a => a.textContent.includes('Logout'))",
        )
    )
    return page.evaluate(logout_probe)
|
||||
|
||||
|
||||
def _cf_login_action(credentials: dict[str, str]):
    """Build the Codeforces login page action and an accessor for its error.

    Returns a ``(login_action, get_error)`` pair. ``login_action(page)``
    waits for the login form, fills in the ``username`` and ``password``
    entries of ``credentials`` (empty string when absent), submits the form
    and waits until the URL no longer contains ``/enter``. It never raises:
    any exception is recorded as text instead. ``get_error()`` returns that
    text, or ``None`` if no failure has been recorded.
    """
    failure: str | None = None

    def login_action(page):
        nonlocal failure
        handle_field = 'input[name="handleOrEmail"]'
        password_field = 'input[name="password"]'
        try:
            # The form may only appear after a challenge page, hence the
            # generous wait before touching the inputs.
            page.wait_for_selector(handle_field, timeout=60000)
            page.fill(handle_field, credentials.get("username", ""))
            page.fill(password_field, credentials.get("password", ""))
            submit_button = page.locator('#enterForm input[type="submit"]')
            submit_button.click()

            def left_login_page(url):
                return "/enter" not in url

            page.wait_for_url(left_login_page, timeout=BROWSER_NAV_TIMEOUT)
        except Exception as exc:
            failure = str(exc)

    def get_error():
        return failure

    return login_action, get_error
|
||||
|
||||
|
||||
def _login_headless_cf(credentials: dict[str, str]) -> LoginResult:
|
||||
try:
|
||||
from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import]
|
||||
except ImportError:
|
||||
|
|
@ -346,36 +370,30 @@ def _login_headless_cf(credentials: dict[str, str]) -> LoginResult:
|
|||
|
||||
_ensure_browser()
|
||||
|
||||
cookie_cache = Path.home() / ".cache" / "cp-nvim" / "codeforces-cookies.json"
|
||||
cookie_cache.parent.mkdir(parents=True, exist_ok=True)
|
||||
saved_cookies = load_platform_cookies("codeforces") or []
|
||||
|
||||
logged_in = False
|
||||
login_error: str | None = None
|
||||
if saved_cookies:
|
||||
print(json.dumps({"status": "checking_login"}), flush=True)
|
||||
logged_in = False
|
||||
|
||||
def check_login(page):
|
||||
nonlocal logged_in
|
||||
logged_in = page.evaluate(
|
||||
"() => Array.from(document.querySelectorAll('a'))"
|
||||
".some(a => a.textContent.includes('Logout'))"
|
||||
)
|
||||
def check_action(page):
|
||||
nonlocal logged_in
|
||||
logged_in = _cf_check_logged_in(page)
|
||||
|
||||
def login_action(page):
|
||||
nonlocal login_error
|
||||
try:
|
||||
page.fill(
|
||||
'input[name="handleOrEmail"]',
|
||||
credentials.get("username", ""),
|
||||
)
|
||||
page.fill(
|
||||
'input[name="password"]',
|
||||
credentials.get("password", ""),
|
||||
)
|
||||
page.locator('#enterForm input[type="submit"]').click()
|
||||
page.wait_for_url(
|
||||
lambda url: "/enter" not in url, timeout=BROWSER_NAV_TIMEOUT
|
||||
)
|
||||
except Exception as e:
|
||||
login_error = str(e)
|
||||
with StealthySession(
|
||||
headless=True,
|
||||
timeout=BROWSER_SESSION_TIMEOUT,
|
||||
google_search=False,
|
||||
cookies=saved_cookies,
|
||||
) as session:
|
||||
session.fetch(f"{BASE_URL}/", page_action=check_action, solve_cloudflare=True)
|
||||
if logged_in:
|
||||
return LoginResult(success=True, error="")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
login_action, get_error = _cf_login_action(credentials)
|
||||
|
||||
try:
|
||||
with StealthySession(
|
||||
|
|
@ -389,23 +407,24 @@ def _login_headless_cf(credentials: dict[str, str]) -> LoginResult:
|
|||
page_action=login_action,
|
||||
solve_cloudflare=True,
|
||||
)
|
||||
login_error = get_error()
|
||||
if login_error:
|
||||
return LoginResult(success=False, error=f"Login failed: {login_error}")
|
||||
|
||||
session.fetch(
|
||||
f"{BASE_URL}/",
|
||||
page_action=check_login,
|
||||
network_idle=True,
|
||||
)
|
||||
logged_in = False
|
||||
|
||||
def verify_action(page):
|
||||
nonlocal logged_in
|
||||
logged_in = _cf_check_logged_in(page)
|
||||
|
||||
session.fetch(f"{BASE_URL}/", page_action=verify_action, network_idle=True)
|
||||
if not logged_in:
|
||||
return LoginResult(
|
||||
success=False, error="Login failed (bad credentials?)"
|
||||
)
|
||||
return LoginResult(success=False, error="Login failed (bad credentials?)")
|
||||
|
||||
try:
|
||||
browser_cookies = session.context.cookies()
|
||||
if any(c.get("name") == "X-User-Handle" for c in browser_cookies):
|
||||
cookie_cache.write_text(json.dumps(browser_cookies))
|
||||
if any(c.get("name") == "X-User-Sha1" for c in browser_cookies):
|
||||
save_platform_cookies("codeforces", browser_cookies)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
|
@ -426,6 +445,7 @@ def _submit_headless(
|
|||
|
||||
source_code = Path(file_path).read_text()
|
||||
|
||||
|
||||
try:
|
||||
from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import]
|
||||
except ImportError:
|
||||
|
|
@ -438,44 +458,19 @@ def _submit_headless(
|
|||
|
||||
_ensure_browser()
|
||||
|
||||
cookie_cache = Path.home() / ".cache" / "cp-nvim" / "codeforces-cookies.json"
|
||||
cookie_cache.parent.mkdir(parents=True, exist_ok=True)
|
||||
saved_cookies: list[dict[str, Any]] = []
|
||||
if cookie_cache.exists():
|
||||
try:
|
||||
saved_cookies = json.loads(cookie_cache.read_text())
|
||||
except Exception:
|
||||
pass
|
||||
if not _retried:
|
||||
saved_cookies = load_platform_cookies("codeforces") or []
|
||||
|
||||
logged_in = cookie_cache.exists() and not _retried
|
||||
login_error: str | None = None
|
||||
logged_in = bool(saved_cookies)
|
||||
submit_error: str | None = None
|
||||
needs_relogin = False
|
||||
|
||||
def check_login(page):
|
||||
nonlocal logged_in
|
||||
logged_in = page.evaluate(
|
||||
"() => Array.from(document.querySelectorAll('a'))"
|
||||
".some(a => a.textContent.includes('Logout'))"
|
||||
)
|
||||
logged_in = _cf_check_logged_in(page)
|
||||
|
||||
def login_action(page):
|
||||
nonlocal login_error
|
||||
try:
|
||||
page.fill(
|
||||
'input[name="handleOrEmail"]',
|
||||
credentials.get("username", ""),
|
||||
)
|
||||
page.fill(
|
||||
'input[name="password"]',
|
||||
credentials.get("password", ""),
|
||||
)
|
||||
page.locator('#enterForm input[type="submit"]').click()
|
||||
page.wait_for_url(
|
||||
lambda url: "/enter" not in url, timeout=BROWSER_NAV_TIMEOUT
|
||||
)
|
||||
except Exception as e:
|
||||
login_error = str(e)
|
||||
_login_action, _get_login_error = _cf_login_action(credentials)
|
||||
|
||||
def submit_action(page):
|
||||
nonlocal submit_error, needs_relogin
|
||||
|
|
@ -520,27 +515,25 @@ def _submit_headless(
|
|||
headless=True,
|
||||
timeout=BROWSER_SESSION_TIMEOUT,
|
||||
google_search=False,
|
||||
cookies=saved_cookies if (cookie_cache.exists() and not _retried) else [],
|
||||
cookies=saved_cookies if saved_cookies else [],
|
||||
) as session:
|
||||
if not (cookie_cache.exists() and not _retried):
|
||||
if not _retried and saved_cookies:
|
||||
print(json.dumps({"status": "checking_login"}), flush=True)
|
||||
session.fetch(
|
||||
f"{BASE_URL}/",
|
||||
page_action=check_login,
|
||||
network_idle=True,
|
||||
)
|
||||
session.fetch(f"{BASE_URL}/", page_action=check_login, solve_cloudflare=True)
|
||||
|
||||
if not logged_in:
|
||||
print(json.dumps({"status": "logging_in"}), flush=True)
|
||||
session.fetch(
|
||||
f"{BASE_URL}/enter",
|
||||
page_action=login_action,
|
||||
page_action=_login_action,
|
||||
solve_cloudflare=True,
|
||||
)
|
||||
login_error = _get_login_error()
|
||||
if login_error:
|
||||
return SubmitResult(
|
||||
success=False, error=f"Login failed: {login_error}"
|
||||
)
|
||||
logged_in = True
|
||||
|
||||
print(json.dumps({"status": "submitting"}), flush=True)
|
||||
session.fetch(
|
||||
|
|
@ -551,13 +544,13 @@ def _submit_headless(
|
|||
|
||||
try:
|
||||
browser_cookies = session.context.cookies()
|
||||
if any(c.get("name") == "X-User-Handle" for c in browser_cookies):
|
||||
cookie_cache.write_text(json.dumps(browser_cookies))
|
||||
if any(c.get("name") == "X-User-Sha1" for c in browser_cookies):
|
||||
save_platform_cookies("codeforces", browser_cookies)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if needs_relogin and not _retried:
|
||||
cookie_cache.unlink(missing_ok=True)
|
||||
clear_platform_cookies("codeforces")
|
||||
return _submit_headless(
|
||||
contest_id,
|
||||
problem_id,
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ from pathlib import Path
|
|||
|
||||
import httpx
|
||||
|
||||
from .base import BaseScraper, extract_precision
|
||||
from .base import BaseScraper, clear_platform_cookies, extract_precision, load_platform_cookies, save_platform_cookies
|
||||
from .timeouts import HTTP_TIMEOUT
|
||||
from .models import (
|
||||
ContestListResult,
|
||||
|
|
@ -28,8 +28,6 @@ HEADERS = {
|
|||
}
|
||||
CONNECTIONS = 8
|
||||
|
||||
_COOKIE_PATH = Path.home() / ".cache" / "cp-nvim" / "kattis-cookies.json"
|
||||
|
||||
TIME_RE = re.compile(
|
||||
r"CPU Time limit</span>\s*<span[^>]*>\s*(\d+)\s*seconds?\s*</span>",
|
||||
re.DOTALL,
|
||||
|
|
@ -209,20 +207,24 @@ async def _stream_single_problem(client: httpx.AsyncClient, slug: str) -> None:
|
|||
|
||||
|
||||
async def _load_kattis_cookies(client: httpx.AsyncClient) -> None:
|
||||
if not _COOKIE_PATH.exists():
|
||||
return
|
||||
try:
|
||||
for k, v in json.loads(_COOKIE_PATH.read_text()).items():
|
||||
data = load_platform_cookies("kattis")
|
||||
if isinstance(data, dict):
|
||||
for k, v in data.items():
|
||||
client.cookies.set(k, v)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
async def _save_kattis_cookies(client: httpx.AsyncClient) -> None:
|
||||
cookies = {k: v for k, v in client.cookies.items()}
|
||||
cookies = dict(client.cookies.items())
|
||||
if cookies:
|
||||
_COOKIE_PATH.parent.mkdir(parents=True, exist_ok=True)
|
||||
_COOKIE_PATH.write_text(json.dumps(cookies))
|
||||
save_platform_cookies("kattis", cookies)
|
||||
|
||||
|
||||
async def _check_kattis_login(client: httpx.AsyncClient) -> bool:
    """Probe whether the cookies held by ``client`` are still logged in.

    Fetches ``BASE_URL`` and treats a non-empty ``x-username`` response
    header as proof of an authenticated session. Any failure while making
    the request or reading the header counts as "not logged in".
    """
    try:
        response = await client.get(
            BASE_URL,
            headers=HEADERS,
            timeout=HTTP_TIMEOUT,
        )
        username_header = response.headers.get("x-username")
        return True if username_header else False
    except Exception:
        return False
|
||||
|
||||
|
||||
async def _do_kattis_login(
|
||||
|
|
@ -329,9 +331,10 @@ class KattisScraper(BaseScraper):
|
|||
return self._submit_error("Missing credentials. Use :CP kattis login")
|
||||
|
||||
async with httpx.AsyncClient(follow_redirects=True) as client:
|
||||
print(json.dumps({"status": "checking_login"}), flush=True)
|
||||
await _load_kattis_cookies(client)
|
||||
if not client.cookies:
|
||||
if client.cookies:
|
||||
print(json.dumps({"status": "checking_login"}), flush=True)
|
||||
else:
|
||||
print(json.dumps({"status": "logging_in"}), flush=True)
|
||||
ok = await _do_kattis_login(client, username, password)
|
||||
if not ok:
|
||||
|
|
@ -368,7 +371,7 @@ class KattisScraper(BaseScraper):
|
|||
return self._submit_error(f"Submit request failed: {e}")
|
||||
|
||||
if r.status_code in (400, 403) or r.text == "Request validation failed":
|
||||
_COOKIE_PATH.unlink(missing_ok=True)
|
||||
clear_platform_cookies("kattis")
|
||||
print(json.dumps({"status": "logging_in"}), flush=True)
|
||||
ok = await _do_kattis_login(client, username, password)
|
||||
if not ok:
|
||||
|
|
@ -399,6 +402,16 @@ class KattisScraper(BaseScraper):
|
|||
return self._login_error("Missing username or password")
|
||||
|
||||
async with httpx.AsyncClient(follow_redirects=True) as client:
|
||||
await _load_kattis_cookies(client)
|
||||
if client.cookies:
|
||||
print(json.dumps({"status": "checking_login"}), flush=True)
|
||||
if await _check_kattis_login(client):
|
||||
return LoginResult(
|
||||
success=True,
|
||||
error="",
|
||||
credentials={"username": username, "password": password},
|
||||
)
|
||||
|
||||
print(json.dumps({"status": "logging_in"}), flush=True)
|
||||
ok = await _do_kattis_login(client, username, password)
|
||||
if not ok:
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ from typing import Any, cast
|
|||
|
||||
import httpx
|
||||
|
||||
from .base import BaseScraper, extract_precision
|
||||
from .base import BaseScraper, extract_precision, load_platform_cookies, save_platform_cookies
|
||||
from .timeouts import HTTP_TIMEOUT
|
||||
from .models import (
|
||||
ContestListResult,
|
||||
|
|
@ -27,7 +27,6 @@ HEADERS = {
|
|||
}
|
||||
CONNECTIONS = 4
|
||||
|
||||
_COOKIE_PATH = Path.home() / ".cache" / "cp-nvim" / "usaco-cookies.json"
|
||||
_LOGIN_PATH = "/current/tpcm/login-session.php"
|
||||
_SUBMIT_PATH = "/current/tpcm/submit-solution.php"
|
||||
|
||||
|
|
@ -202,20 +201,16 @@ def _parse_submit_form(
|
|||
|
||||
|
||||
async def _load_usaco_cookies(client: httpx.AsyncClient) -> None:
|
||||
if not _COOKIE_PATH.exists():
|
||||
return
|
||||
try:
|
||||
for k, v in json.loads(_COOKIE_PATH.read_text()).items():
|
||||
data = load_platform_cookies("usaco")
|
||||
if isinstance(data, dict):
|
||||
for k, v in data.items():
|
||||
client.cookies.set(k, v)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
async def _save_usaco_cookies(client: httpx.AsyncClient) -> None:
|
||||
cookies = {k: v for k, v in client.cookies.items()}
|
||||
cookies = dict(client.cookies.items())
|
||||
if cookies:
|
||||
_COOKIE_PATH.parent.mkdir(parents=True, exist_ok=True)
|
||||
_COOKIE_PATH.write_text(json.dumps(cookies))
|
||||
save_platform_cookies("usaco", cookies)
|
||||
|
||||
|
||||
async def _check_usaco_login(client: httpx.AsyncClient, username: str) -> bool:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue