diff --git a/lua/cp/constants.lua b/lua/cp/constants.lua index ec3c384..3a2e27d 100644 --- a/lua/cp/constants.lua +++ b/lua/cp/constants.lua @@ -219,4 +219,6 @@ M.LANGUAGE_VERSIONS = { M.DEFAULT_VERSIONS = { cpp = 'c++20', python = 'python3' } +M.COOKIE_FILE = vim.fn.expand('~/.cache/cp-nvim/cookies.json') + return M diff --git a/lua/cp/credentials.lua b/lua/cp/credentials.lua index 637ed23..cf3d29e 100644 --- a/lua/cp/credentials.lua +++ b/lua/cp/credentials.lua @@ -38,6 +38,7 @@ local function prompt_and_login(platform, display) end, function(result) vim.schedule(function() if result.success then + cache.set_credentials(platform, credentials) logger.log( display .. ' login successful', { level = vim.log.levels.INFO, override = true } @@ -105,6 +106,14 @@ function M.logout(platform) local display = constants.PLATFORM_DISPLAY_NAMES[platform] or platform cache.load() cache.clear_credentials(platform) + local cookie_file = constants.COOKIE_FILE + if vim.fn.filereadable(cookie_file) == 1 then + local ok, data = pcall(vim.fn.json_decode, vim.fn.readfile(cookie_file, 'b')) + if ok and type(data) == 'table' then + data[platform] = nil + vim.fn.writefile({ vim.fn.json_encode(data) }, cookie_file) + end + end logger.log(display .. ' credentials cleared', { level = vim.log.levels.INFO, override = true }) end diff --git a/scrapers/atcoder.py b/scrapers/atcoder.py index a9876cf..c52190b 100644 --- a/scrapers/atcoder.py +++ b/scrapers/atcoder.py @@ -6,7 +6,6 @@ import os import re import subprocess import time -from pathlib import Path from typing import Any import backoff @@ -16,7 +15,13 @@ from bs4 import BeautifulSoup, Tag from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry -from .base import BaseScraper, extract_precision +from .base import ( + BaseScraper, + clear_platform_cookies, + extract_precision, + load_platform_cookies, + save_platform_cookies, +) from .models import ( ContestListResult, ContestSummary, @@ -379,26 +384,15 @@ def _ensure_browser() -> None: break -def _login_headless(credentials: dict[str, str]) -> LoginResult: - try: - from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import] - except ImportError: - return LoginResult( - success=False, - error="scrapling is required for AtCoder login. Install it: uv add 'scrapling[fetchers]>=0.4'", - ) +def _at_check_logged_in(page) -> bool: + return page.evaluate( + "() => Array.from(document.querySelectorAll('a')).some(a => a.textContent.trim() === 'Sign Out')" + ) - _ensure_browser() - logged_in = False +def _at_login_action(credentials: dict[str, str]): login_error: str | None = None - def check_login(page): - nonlocal logged_in - logged_in = page.evaluate( - "() => Array.from(document.querySelectorAll('a')).some(a => a.textContent.trim() === 'Sign Out')" - ) - def login_action(page): nonlocal login_error try: @@ -412,6 +406,47 @@ def _login_headless(credentials: dict[str, str]) -> LoginResult: except Exception as e: login_error = str(e) + return login_action, lambda: login_error + + +def _login_headless(credentials: dict[str, str]) -> LoginResult: + try: + from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import] + except ImportError: + return LoginResult( + success=False, + error="scrapling is required for AtCoder login. Install it: uv add 'scrapling[fetchers]>=0.4'", + ) + + _ensure_browser() + + saved_cookies = load_platform_cookies("atcoder") or [] + + if saved_cookies: + print(json.dumps({"status": "checking_login"}), flush=True) + logged_in = False + + def check_action(page): + nonlocal logged_in + logged_in = _at_check_logged_in(page) + + try: + with StealthySession( + headless=True, + timeout=BROWSER_SESSION_TIMEOUT, + google_search=False, + cookies=saved_cookies, + ) as session: + session.fetch( + f"{BASE_URL}/home", page_action=check_action, network_idle=True + ) + if logged_in: + return LoginResult(success=True, error="") + except Exception: + pass + + login_action, get_error = _at_login_action(credentials) + try: with StealthySession( headless=True, @@ -424,17 +459,31 @@ def _login_headless(credentials: dict[str, str]) -> LoginResult: page_action=login_action, solve_cloudflare=True, ) + login_error = get_error() if login_error: return LoginResult(success=False, error=f"Login failed: {login_error}") + logged_in = False + + def verify_action(page): + nonlocal logged_in + logged_in = _at_check_logged_in(page) + session.fetch( - f"{BASE_URL}/home", page_action=check_login, network_idle=True + f"{BASE_URL}/home", page_action=verify_action, network_idle=True ) if not logged_in: return LoginResult( success=False, error="Login failed (bad credentials?)" ) + try: + browser_cookies = session.context.cookies() + if browser_cookies: + save_platform_cookies("atcoder", browser_cookies) + except Exception: + pass + return LoginResult(success=True, error="") except Exception as e: return LoginResult(success=False, error=str(e)) @@ -446,6 +495,7 @@ def _submit_headless( file_path: str, language_id: str, credentials: dict[str, str], + _retried: bool = False, ) -> "SubmitResult": try: from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import] @@ -457,26 +507,24 @@ def _submit_headless( _ensure_browser() - login_error: str | None = None - submit_error: str | None = None + saved_cookies: list[dict[str, Any]] = [] + if not _retried: + saved_cookies = load_platform_cookies("atcoder") or [] - def login_action(page): - nonlocal login_error - try: - _solve_turnstile(page) - page.fill('input[name="username"]', credentials.get("username", "")) - page.fill('input[name="password"]', credentials.get("password", "")) - page.click("#submit") - page.wait_for_url( - lambda url: "/login" not in url, timeout=BROWSER_NAV_TIMEOUT - ) - except Exception as e: - login_error = str(e) + logged_in = bool(saved_cookies) + submit_error: str | None = None + needs_relogin = False + + def check_login(page): + nonlocal logged_in + logged_in = _at_check_logged_in(page) + + login_action, get_login_error = _at_login_action(credentials) def submit_action(page): - nonlocal submit_error + nonlocal submit_error, needs_relogin if "/login" in page.url: - submit_error = "Not logged in after login step" + needs_relogin = True return try: _solve_turnstile(page) @@ -488,18 +536,12 @@ def _submit_headless( f'select[name="data.LanguageId"] option[value="{language_id}"]' ).wait_for(state="attached", timeout=BROWSER_ELEMENT_WAIT) page.select_option('select[name="data.LanguageId"]', language_id) - ext = _LANGUAGE_ID_EXTENSION.get( - language_id, Path(file_path).suffix.lstrip(".") or "txt" + page.set_input_files("#input-open-file", file_path) + page.wait_for_function( + "() => { const ta = document.getElementById('plain-textarea'); return ta && ta.value.length > 0; }", + timeout=BROWSER_ELEMENT_WAIT, ) - page.set_input_files( - "#input-open-file", - { - "name": f"solution.{ext}", - "mimeType": "text/plain", - "buffer": Path(file_path).read_bytes(), - }, - ) - page.locator('button[type="submit"]').click(no_wait_after=True) + page.evaluate("document.getElementById('submit').click()") page.wait_for_url( lambda url: "/submissions/me" in url, timeout=BROWSER_SUBMIT_NAV_TIMEOUT["atcoder"], @@ -512,15 +554,33 @@ def _submit_headless( headless=True, timeout=BROWSER_SESSION_TIMEOUT, google_search=False, + cookies=saved_cookies if saved_cookies else [], ) as session: - print(json.dumps({"status": "logging_in"}), flush=True) - session.fetch( - f"{BASE_URL}/login", - page_action=login_action, - solve_cloudflare=True, - ) - if login_error: - return SubmitResult(success=False, error=f"Login failed: {login_error}") + if not _retried and saved_cookies: + print(json.dumps({"status": "checking_login"}), flush=True) + session.fetch( + f"{BASE_URL}/home", page_action=check_login, network_idle=True + ) + + if not logged_in: + print(json.dumps({"status": "logging_in"}), flush=True) + session.fetch( + f"{BASE_URL}/login", + page_action=login_action, + solve_cloudflare=True, + ) + login_error = get_login_error() + if login_error: + return SubmitResult( + success=False, error=f"Login failed: {login_error}" + ) + logged_in = True + try: + browser_cookies = session.context.cookies() + if browser_cookies: + save_platform_cookies("atcoder", browser_cookies) + except Exception: + pass print(json.dumps({"status": "submitting"}), flush=True) session.fetch( @@ -529,6 +589,17 @@ def _submit_headless( solve_cloudflare=True, ) + if needs_relogin and not _retried: + clear_platform_cookies("atcoder") + return _submit_headless( + contest_id, + problem_id, + file_path, + language_id, + credentials, + _retried=True, + ) + if submit_error: return SubmitResult(success=False, error=submit_error) diff --git a/scrapers/base.py b/scrapers/base.py index 11ab8c6..035495a 100644 --- a/scrapers/base.py +++ b/scrapers/base.py @@ -4,6 +4,8 @@ import os import re import sys from abc import ABC, abstractmethod +from pathlib import Path +from typing import Any from .language_ids import get_language_id from .models import ( @@ -15,6 +17,36 @@ from .models import ( TestsResult, ) +_COOKIE_FILE = Path.home() / ".cache" / "cp-nvim" / "cookies.json" + + +def load_platform_cookies(platform: str) -> Any | None: + try: + data = json.loads(_COOKIE_FILE.read_text()) + return data.get(platform) + except Exception: + return None + + +def save_platform_cookies(platform: str, data: Any) -> None: + _COOKIE_FILE.parent.mkdir(parents=True, exist_ok=True) + try: + existing = json.loads(_COOKIE_FILE.read_text()) + except Exception: + existing = {} + existing[platform] = data + _COOKIE_FILE.write_text(json.dumps(existing)) + + +def clear_platform_cookies(platform: str) -> None: + try: + existing = json.loads(_COOKIE_FILE.read_text()) + existing.pop(platform, None) + _COOKIE_FILE.write_text(json.dumps(existing)) + except Exception: + pass + + _PRECISION_ABS_REL_RE = re.compile( r"(?:absolute|relative)\s+error[^.]*?10\s*[\^{]\s*\{?\s*[-\u2212]\s*(\d+)\s*\}?", re.IGNORECASE, diff --git a/scrapers/codechef.py b/scrapers/codechef.py index 0427d5d..b64fdf5 100644 --- a/scrapers/codechef.py +++ b/scrapers/codechef.py @@ -9,8 +9,18 @@ from typing import Any import httpx -from .base import BaseScraper -from .timeouts import BROWSER_SESSION_TIMEOUT, HTTP_TIMEOUT +from .base import ( + BaseScraper, + clear_platform_cookies, + load_platform_cookies, + save_platform_cookies, +) +from .timeouts import ( + BROWSER_ELEMENT_WAIT, + BROWSER_NAV_TIMEOUT, + BROWSER_SESSION_TIMEOUT, + HTTP_TIMEOUT, +) from .models import ( ContestListResult, ContestSummary, @@ -31,7 +41,6 @@ HEADERS = { } CONNECTIONS = 8 -_COOKIE_PATH = Path.home() / ".cache" / "cp-nvim" / "codechef-cookies.json" _CC_CHECK_LOGIN_JS = "() => !!document.querySelector('a[href*=\"/users/\"]')" @@ -54,6 +63,29 @@ async def fetch_json(client: httpx.AsyncClient, path: str) -> dict[str, Any]: return r.json() +def _cc_check_logged_in(page) -> bool: + return "dashboard" in page.url or page.evaluate(_CC_CHECK_LOGIN_JS) + + +def _cc_login_action(credentials: dict[str, str]): + login_error: str | None = None + + def login_action(page): + nonlocal login_error + try: + page.locator('input[name="name"]').fill(credentials.get("username", "")) + page.locator('input[name="pass"]').fill(credentials.get("password", "")) + page.locator("input.cc-login-btn").click() + page.wait_for_function( + "() => !window.location.pathname.includes('/login')", + timeout=BROWSER_NAV_TIMEOUT, + ) + except Exception as e: + login_error = str(e) + + return login_action, lambda: login_error + + def _login_headless_codechef(credentials: dict[str, str]) -> LoginResult: try: from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import] @@ -67,28 +99,32 @@ def _login_headless_codechef(credentials: dict[str, str]) -> LoginResult: _ensure_browser() - _COOKIE_PATH.parent.mkdir(parents=True, exist_ok=True) + saved_cookies = load_platform_cookies("codechef") or [] - logged_in = False - login_error: str | None = None + if saved_cookies: + print(json.dumps({"status": "checking_login"}), flush=True) + logged_in = False - def check_login(page): - nonlocal logged_in - logged_in = "dashboard" in page.url or page.evaluate(_CC_CHECK_LOGIN_JS) + def check_action(page): + nonlocal logged_in + logged_in = _cc_check_logged_in(page) - def login_action(page): - nonlocal login_error try: - page.locator('input[name="name"]').fill(credentials.get("username", "")) - page.locator('input[name="pass"]').fill(credentials.get("password", "")) - page.locator("input.cc-login-btn").click() - try: - page.wait_for_url(lambda url: "/login" not in url, timeout=3000) - except Exception: - login_error = "Login failed (bad credentials?)" - return - except Exception as e: - login_error = str(e) + with StealthySession( + headless=True, + timeout=BROWSER_SESSION_TIMEOUT, + google_search=False, + cookies=saved_cookies, + ) as session: + session.fetch( + f"{BASE_URL}/", page_action=check_action, network_idle=True + ) + if logged_in: + return LoginResult(success=True, error="") + except Exception: + pass + + login_action, get_error = _cc_login_action(credentials) try: with StealthySession( @@ -97,11 +133,20 @@ def _login_headless_codechef(credentials: dict[str, str]) -> LoginResult: google_search=False, ) as session: print(json.dumps({"status": "logging_in"}), flush=True) - session.fetch(f"{BASE_URL}/login", page_action=login_action) + session.fetch( + f"{BASE_URL}/login", page_action=login_action, network_idle=True + ) + login_error = get_error() if login_error: return LoginResult(success=False, error=f"Login failed: {login_error}") - session.fetch(f"{BASE_URL}/", page_action=check_login, network_idle=True) + logged_in = False + + def verify_action(page): + nonlocal logged_in + logged_in = _cc_check_logged_in(page) + + session.fetch(f"{BASE_URL}/", page_action=verify_action, network_idle=True) if not logged_in: return LoginResult( success=False, error="Login failed (bad credentials?)" @@ -109,8 +154,8 @@ def _login_headless_codechef(credentials: dict[str, str]) -> LoginResult: try: browser_cookies = session.context.cookies() - if browser_cookies: - _COOKIE_PATH.write_text(json.dumps(browser_cookies)) + if any(c.get("name") == "userkey" for c in browser_cookies): + save_platform_cookies("codechef", browser_cookies) except Exception: pass @@ -126,6 +171,7 @@ def _submit_headless_codechef( language_id: str, credentials: dict[str, str], _retried: bool = False, + _practice: bool = False, ) -> SubmitResult: source_code = Path(file_path).read_text() @@ -141,36 +187,19 @@ def _submit_headless_codechef( _ensure_browser() - _COOKIE_PATH.parent.mkdir(parents=True, exist_ok=True) saved_cookies: list[dict[str, Any]] = [] - if _COOKIE_PATH.exists() and not _retried: - try: - saved_cookies = json.loads(_COOKIE_PATH.read_text()) - except Exception: - pass + if not _retried: + saved_cookies = load_platform_cookies("codechef") or [] - logged_in = bool(saved_cookies) and not _retried - login_error: str | None = None + logged_in = bool(saved_cookies) submit_error: str | None = None needs_relogin = False def check_login(page): nonlocal logged_in - logged_in = "dashboard" in page.url or page.evaluate(_CC_CHECK_LOGIN_JS) + logged_in = _cc_check_logged_in(page) - def login_action(page): - nonlocal login_error - try: - page.locator('input[name="name"]').fill(credentials.get("username", "")) - page.locator('input[name="pass"]').fill(credentials.get("password", "")) - page.locator("input.cc-login-btn").click() - try: - page.wait_for_url(lambda url: "/login" not in url, timeout=3000) - except Exception: - login_error = "Login failed (bad credentials?)" - return - except Exception as e: - login_error = str(e) + _login_action, _get_login_error = _cc_login_action(credentials) def submit_action(page): nonlocal submit_error, needs_relogin @@ -178,12 +207,13 @@ def _submit_headless_codechef( needs_relogin = True return try: - page.wait_for_selector('[aria-haspopup="listbox"]', timeout=10000) + page.wait_for_selector( + '[aria-haspopup="listbox"]', timeout=BROWSER_ELEMENT_WAIT + ) page.locator('[aria-haspopup="listbox"]').click() - page.wait_for_selector('[role="option"]', timeout=5000) + page.wait_for_selector('[role="option"]', timeout=BROWSER_ELEMENT_WAIT) page.locator(f'[role="option"][data-value="{language_id}"]').click() - page.wait_for_timeout(250) page.locator(".ace_editor").click() page.keyboard.press("Control+a") @@ -198,7 +228,6 @@ def _submit_headless_codechef( }""", source_code, ) - page.wait_for_timeout(125) page.evaluate( "() => document.getElementById('submit_btn').scrollIntoView({block:'center'})" @@ -213,7 +242,9 @@ def _submit_headless_codechef( const d = document.querySelector('[role="dialog"], .swal2-popup'); return d ? d.textContent.trim() : null; }""") - if dialog_text and ( + if dialog_text and "login" in dialog_text.lower(): + needs_relogin = True + elif dialog_text and ( "not available for accepting solutions" in dialog_text or "not available for submission" in dialog_text ): @@ -228,9 +259,9 @@ def _submit_headless_codechef( headless=True, timeout=BROWSER_SESSION_TIMEOUT, google_search=False, - cookies=saved_cookies if (saved_cookies and not _retried) else [], + cookies=saved_cookies if saved_cookies else [], ) as session: - if not logged_in: + if not _retried and not _practice and saved_cookies: print(json.dumps({"status": "checking_login"}), flush=True) session.fetch( f"{BASE_URL}/", page_action=check_login, network_idle=True @@ -238,13 +269,18 @@ def _submit_headless_codechef( if not logged_in: print(json.dumps({"status": "logging_in"}), flush=True) - session.fetch(f"{BASE_URL}/login", page_action=login_action) + session.fetch( + f"{BASE_URL}/login", page_action=_login_action, network_idle=True + ) + login_error = _get_login_error() if login_error: return SubmitResult( success=False, error=f"Login failed: {login_error}" ) + logged_in = True - print(json.dumps({"status": "submitting"}), flush=True) + if not _practice: + print(json.dumps({"status": "submitting"}), flush=True) submit_url = ( f"{BASE_URL}/submit/{problem_id}" if contest_id == "PRACTICE" @@ -254,13 +290,13 @@ def _submit_headless_codechef( try: browser_cookies = session.context.cookies() - if browser_cookies and logged_in: - _COOKIE_PATH.write_text(json.dumps(browser_cookies)) + if any(c.get("name") == "userkey" for c in browser_cookies): + save_platform_cookies("codechef", browser_cookies) except Exception: pass if needs_relogin and not _retried: - _COOKIE_PATH.unlink(missing_ok=True) + clear_platform_cookies("codechef") return _submit_headless_codechef( contest_id, problem_id, @@ -270,14 +306,14 @@ def _submit_headless_codechef( _retried=True, ) - if submit_error == "PRACTICE_FALLBACK" and not _retried: + if submit_error == "PRACTICE_FALLBACK" and not _practice: return _submit_headless_codechef( "PRACTICE", problem_id, file_path, language_id, credentials, - _retried=True, + _practice=True, ) if submit_error: diff --git a/scrapers/codeforces.py b/scrapers/codeforces.py index d5b6161..5bbfa38 100644 --- a/scrapers/codeforces.py +++ b/scrapers/codeforces.py @@ -8,7 +8,13 @@ from typing import Any import requests from bs4 import BeautifulSoup, Tag -from .base import BaseScraper, extract_precision +from .base import ( + BaseScraper, + clear_platform_cookies, + extract_precision, + load_platform_cookies, + save_platform_cookies, +) from .models import ( ContestListResult, ContestSummary, @@ -331,9 +337,33 @@ class CodeforcesScraper(BaseScraper): return await asyncio.to_thread(_login_headless_cf, credentials) -def _login_headless_cf(credentials: dict[str, str]) -> LoginResult: - from pathlib import Path +def _cf_check_logged_in(page) -> bool: + return page.evaluate( + "() => Array.from(document.querySelectorAll('a'))" + ".some(a => a.textContent.includes('Logout'))" + ) + +def _cf_login_action(credentials: dict[str, str]): + login_error: str | None = None + + def login_action(page): + nonlocal login_error + try: + page.wait_for_selector('input[name="handleOrEmail"]', timeout=60000) + page.fill('input[name="handleOrEmail"]', credentials.get("username", "")) + page.fill('input[name="password"]', credentials.get("password", "")) + page.locator('#enterForm input[type="submit"]').click() + page.wait_for_url( + lambda url: "/enter" not in url, timeout=BROWSER_NAV_TIMEOUT + ) + except Exception as e: + login_error = str(e) + + return login_action, lambda: login_error + + +def _login_headless_cf(credentials: dict[str, str]) -> LoginResult: try: from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import] except ImportError: @@ -346,36 +376,32 @@ def _login_headless_cf(credentials: dict[str, str]) -> LoginResult: _ensure_browser() - cookie_cache = Path.home() / ".cache" / "cp-nvim" / "codeforces-cookies.json" - cookie_cache.parent.mkdir(parents=True, exist_ok=True) + saved_cookies = load_platform_cookies("codeforces") or [] - logged_in = False - login_error: str | None = None + if saved_cookies: + print(json.dumps({"status": "checking_login"}), flush=True) + logged_in = False - def check_login(page): - nonlocal logged_in - logged_in = page.evaluate( - "() => Array.from(document.querySelectorAll('a'))" - ".some(a => a.textContent.includes('Logout'))" - ) + def check_action(page): + nonlocal logged_in + logged_in = _cf_check_logged_in(page) - def login_action(page): - nonlocal login_error try: - page.fill( - 'input[name="handleOrEmail"]', - credentials.get("username", ""), - ) - page.fill( - 'input[name="password"]', - credentials.get("password", ""), - ) - page.locator('#enterForm input[type="submit"]').click() - page.wait_for_url( - lambda url: "/enter" not in url, timeout=BROWSER_NAV_TIMEOUT - ) - except Exception as e: - login_error = str(e) + with StealthySession( + headless=True, + timeout=BROWSER_SESSION_TIMEOUT, + google_search=False, + cookies=saved_cookies, + ) as session: + session.fetch( + f"{BASE_URL}/", page_action=check_action, solve_cloudflare=True + ) + if logged_in: + return LoginResult(success=True, error="") + except Exception: + pass + + login_action, get_error = _cf_login_action(credentials) try: with StealthySession( @@ -389,14 +415,17 @@ def _login_headless_cf(credentials: dict[str, str]) -> LoginResult: page_action=login_action, solve_cloudflare=True, ) + login_error = get_error() if login_error: return LoginResult(success=False, error=f"Login failed: {login_error}") - session.fetch( - f"{BASE_URL}/", - page_action=check_login, - network_idle=True, - ) + logged_in = False + + def verify_action(page): + nonlocal logged_in + logged_in = _cf_check_logged_in(page) + + session.fetch(f"{BASE_URL}/", page_action=verify_action, network_idle=True) if not logged_in: return LoginResult( success=False, error="Login failed (bad credentials?)" @@ -404,8 +433,8 @@ def _login_headless_cf(credentials: dict[str, str]) -> LoginResult: try: browser_cookies = session.context.cookies() - if any(c.get("name") == "X-User-Handle" for c in browser_cookies): - cookie_cache.write_text(json.dumps(browser_cookies)) + if any(c.get("name") == "X-User-Sha1" for c in browser_cookies): + save_platform_cookies("codeforces", browser_cookies) except Exception: pass @@ -438,44 +467,19 @@ def _submit_headless( _ensure_browser() - cookie_cache = Path.home() / ".cache" / "cp-nvim" / "codeforces-cookies.json" - cookie_cache.parent.mkdir(parents=True, exist_ok=True) saved_cookies: list[dict[str, Any]] = [] - if cookie_cache.exists(): - try: - saved_cookies = json.loads(cookie_cache.read_text()) - except Exception: - pass + if not _retried: + saved_cookies = load_platform_cookies("codeforces") or [] - logged_in = cookie_cache.exists() and not _retried - login_error: str | None = None + logged_in = bool(saved_cookies) submit_error: str | None = None needs_relogin = False def check_login(page): nonlocal logged_in - logged_in = page.evaluate( - "() => Array.from(document.querySelectorAll('a'))" - ".some(a => a.textContent.includes('Logout'))" - ) + logged_in = _cf_check_logged_in(page) - def login_action(page): - nonlocal login_error - try: - page.fill( - 'input[name="handleOrEmail"]', - credentials.get("username", ""), - ) - page.fill( - 'input[name="password"]', - credentials.get("password", ""), - ) - page.locator('#enterForm input[type="submit"]').click() - page.wait_for_url( - lambda url: "/enter" not in url, timeout=BROWSER_NAV_TIMEOUT - ) - except Exception as e: - login_error = str(e) + _login_action, _get_login_error = _cf_login_action(credentials) def submit_action(page): nonlocal submit_error, needs_relogin @@ -520,27 +524,27 @@ def _submit_headless( headless=True, timeout=BROWSER_SESSION_TIMEOUT, google_search=False, - cookies=saved_cookies if (cookie_cache.exists() and not _retried) else [], + cookies=saved_cookies if saved_cookies else [], ) as session: - if not (cookie_cache.exists() and not _retried): + if not _retried and saved_cookies: print(json.dumps({"status": "checking_login"}), flush=True) session.fetch( - f"{BASE_URL}/", - page_action=check_login, - network_idle=True, + f"{BASE_URL}/", page_action=check_login, solve_cloudflare=True ) if not logged_in: print(json.dumps({"status": "logging_in"}), flush=True) session.fetch( f"{BASE_URL}/enter", - page_action=login_action, + page_action=_login_action, solve_cloudflare=True, ) + login_error = _get_login_error() if login_error: return SubmitResult( success=False, error=f"Login failed: {login_error}" ) + logged_in = True print(json.dumps({"status": "submitting"}), flush=True) session.fetch( @@ -551,13 +555,13 @@ def _submit_headless( try: browser_cookies = session.context.cookies() - if any(c.get("name") == "X-User-Handle" for c in browser_cookies): - cookie_cache.write_text(json.dumps(browser_cookies)) + if any(c.get("name") == "X-User-Sha1" for c in browser_cookies): + save_platform_cookies("codeforces", browser_cookies) except Exception: pass if needs_relogin and not _retried: - cookie_cache.unlink(missing_ok=True) + clear_platform_cookies("codeforces") return _submit_headless( contest_id, problem_id, diff --git a/scrapers/kattis.py b/scrapers/kattis.py index 1177445..ac2c157 100644 --- a/scrapers/kattis.py +++ b/scrapers/kattis.py @@ -10,7 +10,13 @@ from pathlib import Path import httpx -from .base import BaseScraper, extract_precision +from .base import ( + BaseScraper, + clear_platform_cookies, + extract_precision, + load_platform_cookies, + save_platform_cookies, +) from .timeouts import HTTP_TIMEOUT from .models import ( ContestListResult, @@ -28,8 +34,6 @@ HEADERS = { } CONNECTIONS = 8 -_COOKIE_PATH = Path.home() / ".cache" / "cp-nvim" / "kattis-cookies.json" - TIME_RE = re.compile( r"CPU Time limit\s*]*>\s*(\d+)\s*seconds?\s*", re.DOTALL, @@ -209,20 +213,24 @@ async def _stream_single_problem(client: httpx.AsyncClient, slug: str) -> None: async def _load_kattis_cookies(client: httpx.AsyncClient) -> None: - if not _COOKIE_PATH.exists(): - return - try: - for k, v in json.loads(_COOKIE_PATH.read_text()).items(): + data = load_platform_cookies("kattis") + if isinstance(data, dict): + for k, v in data.items(): client.cookies.set(k, v) - except Exception: - pass async def _save_kattis_cookies(client: httpx.AsyncClient) -> None: - cookies = {k: v for k, v in client.cookies.items()} + cookies = dict(client.cookies.items()) if cookies: - _COOKIE_PATH.parent.mkdir(parents=True, exist_ok=True) - _COOKIE_PATH.write_text(json.dumps(cookies)) + save_platform_cookies("kattis", cookies) + + +async def _check_kattis_login(client: httpx.AsyncClient) -> bool: + try: + r = await client.get(BASE_URL, headers=HEADERS, timeout=HTTP_TIMEOUT) + return bool(r.headers.get("x-username")) + except Exception: + return False async def _do_kattis_login( @@ -329,9 +337,10 @@ class KattisScraper(BaseScraper): return self._submit_error("Missing credentials. Use :CP kattis login") async with httpx.AsyncClient(follow_redirects=True) as client: - print(json.dumps({"status": "checking_login"}), flush=True) await _load_kattis_cookies(client) - if not client.cookies: + if client.cookies: + print(json.dumps({"status": "checking_login"}), flush=True) + else: print(json.dumps({"status": "logging_in"}), flush=True) ok = await _do_kattis_login(client, username, password) if not ok: @@ -368,7 +377,7 @@ class KattisScraper(BaseScraper): return self._submit_error(f"Submit request failed: {e}") if r.status_code in (400, 403) or r.text == "Request validation failed": - _COOKIE_PATH.unlink(missing_ok=True) + clear_platform_cookies("kattis") print(json.dumps({"status": "logging_in"}), flush=True) ok = await _do_kattis_login(client, username, password) if not ok: @@ -399,6 +408,16 @@ class KattisScraper(BaseScraper): return self._login_error("Missing username or password") async with httpx.AsyncClient(follow_redirects=True) as client: + await _load_kattis_cookies(client) + if client.cookies: + print(json.dumps({"status": "checking_login"}), flush=True) + if await _check_kattis_login(client): + return LoginResult( + success=True, + error="", + credentials={"username": username, "password": password}, + ) + print(json.dumps({"status": "logging_in"}), flush=True) ok = await _do_kattis_login(client, username, password) if not ok: diff --git a/scrapers/usaco.py b/scrapers/usaco.py index b009cf0..b6e95d2 100644 --- a/scrapers/usaco.py +++ b/scrapers/usaco.py @@ -8,7 +8,12 @@ from typing import Any, cast import httpx -from .base import BaseScraper, extract_precision +from .base import ( + BaseScraper, + extract_precision, + load_platform_cookies, + save_platform_cookies, +) from .timeouts import HTTP_TIMEOUT from .models import ( ContestListResult, @@ -27,7 +32,6 @@ HEADERS = { } CONNECTIONS = 4 -_COOKIE_PATH = Path.home() / ".cache" / "cp-nvim" / "usaco-cookies.json" _LOGIN_PATH = "/current/tpcm/login-session.php" _SUBMIT_PATH = "/current/tpcm/submit-solution.php" @@ -202,20 +206,16 @@ def _parse_submit_form( async def _load_usaco_cookies(client: httpx.AsyncClient) -> None: - if not _COOKIE_PATH.exists(): - return - try: - for k, v in json.loads(_COOKIE_PATH.read_text()).items(): + data = load_platform_cookies("usaco") + if isinstance(data, dict): + for k, v in data.items(): client.cookies.set(k, v) - except Exception: - pass async def _save_usaco_cookies(client: httpx.AsyncClient) -> None: - cookies = {k: v for k, v in client.cookies.items()} + cookies = dict(client.cookies.items()) if cookies: - _COOKIE_PATH.parent.mkdir(parents=True, exist_ok=True) - _COOKIE_PATH.write_text(json.dumps(cookies)) + save_platform_cookies("usaco", cookies) async def _check_usaco_login(client: httpx.AsyncClient, username: str) -> bool: