diff --git a/lua/cp/constants.lua b/lua/cp/constants.lua index 3a2e27d..ec3c384 100644 --- a/lua/cp/constants.lua +++ b/lua/cp/constants.lua @@ -219,6 +219,4 @@ M.LANGUAGE_VERSIONS = { M.DEFAULT_VERSIONS = { cpp = 'c++20', python = 'python3' } -M.COOKIE_FILE = vim.fn.expand('~/.cache/cp-nvim/cookies.json') - return M diff --git a/lua/cp/credentials.lua b/lua/cp/credentials.lua index cf3d29e..637ed23 100644 --- a/lua/cp/credentials.lua +++ b/lua/cp/credentials.lua @@ -38,7 +38,6 @@ local function prompt_and_login(platform, display) end, function(result) vim.schedule(function() if result.success then - cache.set_credentials(platform, credentials) logger.log( display .. ' login successful', { level = vim.log.levels.INFO, override = true } @@ -106,14 +105,6 @@ function M.logout(platform) local display = constants.PLATFORM_DISPLAY_NAMES[platform] or platform cache.load() cache.clear_credentials(platform) - local cookie_file = constants.COOKIE_FILE - if vim.fn.filereadable(cookie_file) == 1 then - local ok, data = pcall(vim.fn.json_decode, vim.fn.readfile(cookie_file, 'b')) - if ok and type(data) == 'table' then - data[platform] = nil - vim.fn.writefile({ vim.fn.json_encode(data) }, cookie_file) - end - end logger.log(display .. ' credentials cleared', { level = vim.log.levels.INFO, override = true }) end diff --git a/scrapers/atcoder.py b/scrapers/atcoder.py index c52190b..a9876cf 100644 --- a/scrapers/atcoder.py +++ b/scrapers/atcoder.py @@ -6,6 +6,7 @@ import os import re import subprocess import time +from pathlib import Path from typing import Any import backoff @@ -15,13 +16,7 @@ from bs4 import BeautifulSoup, Tag from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry -from .base import ( - BaseScraper, - clear_platform_cookies, - extract_precision, - load_platform_cookies, - save_platform_cookies, -) +from .base import BaseScraper, extract_precision from .models import ( ContestListResult, ContestSummary, @@ -384,15 +379,26 @@ def _ensure_browser() -> None: break -def _at_check_logged_in(page) -> bool: - return page.evaluate( - "() => Array.from(document.querySelectorAll('a')).some(a => a.textContent.trim() === 'Sign Out')" - ) +def _login_headless(credentials: dict[str, str]) -> LoginResult: + try: + from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import] + except ImportError: + return LoginResult( + success=False, + error="scrapling is required for AtCoder login. Install it: uv add 'scrapling[fetchers]>=0.4'", + ) + _ensure_browser() -def _at_login_action(credentials: dict[str, str]): + logged_in = False login_error: str | None = None + def check_login(page): + nonlocal logged_in + logged_in = page.evaluate( + "() => Array.from(document.querySelectorAll('a')).some(a => a.textContent.trim() === 'Sign Out')" + ) + def login_action(page): nonlocal login_error try: @@ -406,46 +412,100 @@ def _at_login_action(credentials: dict[str, str]): except Exception as e: login_error = str(e) - return login_action, lambda: login_error + try: + with StealthySession( + headless=True, + timeout=BROWSER_SESSION_TIMEOUT, + google_search=False, + ) as session: + print(json.dumps({"status": "logging_in"}), flush=True) + session.fetch( + f"{BASE_URL}/login", + page_action=login_action, + solve_cloudflare=True, + ) + if login_error: + return LoginResult(success=False, error=f"Login failed: {login_error}") + + session.fetch( + f"{BASE_URL}/home", page_action=check_login, network_idle=True + ) + if not logged_in: + return LoginResult( + success=False, error="Login failed (bad credentials?)" + ) + + return LoginResult(success=True, error="") + except Exception as e: + return LoginResult(success=False, error=str(e)) -def _login_headless(credentials: dict[str, str]) -> LoginResult: +def _submit_headless( + contest_id: str, + problem_id: str, + file_path: str, + language_id: str, + credentials: dict[str, str], +) -> "SubmitResult": try: from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import] except ImportError: - return LoginResult( + return SubmitResult( success=False, - error="scrapling is required for AtCoder login. Install it: uv add 'scrapling[fetchers]>=0.4'", + error="scrapling is required for AtCoder submit. Install it: uv add 'scrapling[fetchers]>=0.4'", ) _ensure_browser() - saved_cookies = load_platform_cookies("atcoder") or [] - - if saved_cookies: - print(json.dumps({"status": "checking_login"}), flush=True) - logged_in = False - - def check_action(page): - nonlocal logged_in - logged_in = _at_check_logged_in(page) + login_error: str | None = None + submit_error: str | None = None + def login_action(page): + nonlocal login_error try: - with StealthySession( - headless=True, - timeout=BROWSER_SESSION_TIMEOUT, - google_search=False, - cookies=saved_cookies, - ) as session: - session.fetch( - f"{BASE_URL}/home", page_action=check_action, network_idle=True - ) - if logged_in: - return LoginResult(success=True, error="") - except Exception: - pass + _solve_turnstile(page) + page.fill('input[name="username"]', credentials.get("username", "")) + page.fill('input[name="password"]', credentials.get("password", "")) + page.click("#submit") + page.wait_for_url( + lambda url: "/login" not in url, timeout=BROWSER_NAV_TIMEOUT + ) + except Exception as e: + login_error = str(e) - login_action, get_error = _at_login_action(credentials) + def submit_action(page): + nonlocal submit_error + if "/login" in page.url: + submit_error = "Not logged in after login step" + return + try: + _solve_turnstile(page) + page.select_option( + 'select[name="data.TaskScreenName"]', + f"{contest_id}_{problem_id}", + ) + page.locator( + f'select[name="data.LanguageId"] option[value="{language_id}"]' + ).wait_for(state="attached", timeout=BROWSER_ELEMENT_WAIT) + page.select_option('select[name="data.LanguageId"]', language_id) + ext = _LANGUAGE_ID_EXTENSION.get( + language_id, Path(file_path).suffix.lstrip(".") or "txt" + ) + page.set_input_files( + "#input-open-file", + { + "name": f"solution.{ext}", + "mimeType": "text/plain", + "buffer": Path(file_path).read_bytes(), + }, + ) + page.locator('button[type="submit"]').click(no_wait_after=True) + page.wait_for_url( + lambda url: "/submissions/me" in url, + timeout=BROWSER_SUBMIT_NAV_TIMEOUT["atcoder"], + ) + except Exception as e: + submit_error = str(e) try: with StealthySession( @@ -459,128 +519,8 @@ def _login_headless(credentials: dict[str, str]) -> LoginResult: page_action=login_action, solve_cloudflare=True, ) - login_error = get_error() if login_error: - return LoginResult(success=False, error=f"Login failed: {login_error}") - - logged_in = False - - def verify_action(page): - nonlocal logged_in - logged_in = _at_check_logged_in(page) - - session.fetch( - f"{BASE_URL}/home", page_action=verify_action, network_idle=True - ) - if not logged_in: - return LoginResult( - success=False, error="Login failed (bad credentials?)" - ) - - try: - browser_cookies = session.context.cookies() - if browser_cookies: - save_platform_cookies("atcoder", browser_cookies) - except Exception: - pass - - return LoginResult(success=True, error="") - except Exception as e: - return LoginResult(success=False, error=str(e)) - - -def _submit_headless( - contest_id: str, - problem_id: str, - file_path: str, - language_id: str, - credentials: dict[str, str], - _retried: bool = False, -) -> "SubmitResult": - try: - from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import] - except ImportError: - return SubmitResult( - success=False, - error="scrapling is required for AtCoder submit. Install it: uv add 'scrapling[fetchers]>=0.4'", - ) - - _ensure_browser() - - saved_cookies: list[dict[str, Any]] = [] - if not _retried: - saved_cookies = load_platform_cookies("atcoder") or [] - - logged_in = bool(saved_cookies) - submit_error: str | None = None - needs_relogin = False - - def check_login(page): - nonlocal logged_in - logged_in = _at_check_logged_in(page) - - login_action, get_login_error = _at_login_action(credentials) - - def submit_action(page): - nonlocal submit_error, needs_relogin - if "/login" in page.url: - needs_relogin = True - return - try: - _solve_turnstile(page) - page.select_option( - 'select[name="data.TaskScreenName"]', - f"{contest_id}_{problem_id}", - ) - page.locator( - f'select[name="data.LanguageId"] option[value="{language_id}"]' - ).wait_for(state="attached", timeout=BROWSER_ELEMENT_WAIT) - page.select_option('select[name="data.LanguageId"]', language_id) - page.set_input_files("#input-open-file", file_path) - page.wait_for_function( - "() => { const ta = document.getElementById('plain-textarea'); return ta && ta.value.length > 0; }", - timeout=BROWSER_ELEMENT_WAIT, - ) - page.evaluate("document.getElementById('submit').click()") - page.wait_for_url( - lambda url: "/submissions/me" in url, - timeout=BROWSER_SUBMIT_NAV_TIMEOUT["atcoder"], - ) - except Exception as e: - submit_error = str(e) - - try: - with StealthySession( - headless=True, - timeout=BROWSER_SESSION_TIMEOUT, - google_search=False, - cookies=saved_cookies if saved_cookies else [], - ) as session: - if not _retried and saved_cookies: - print(json.dumps({"status": "checking_login"}), flush=True) - session.fetch( - f"{BASE_URL}/home", page_action=check_login, network_idle=True - ) - - if not logged_in: - print(json.dumps({"status": "logging_in"}), flush=True) - session.fetch( - f"{BASE_URL}/login", - page_action=login_action, - solve_cloudflare=True, - ) - login_error = get_login_error() - if login_error: - return SubmitResult( - success=False, error=f"Login failed: {login_error}" - ) - logged_in = True - try: - browser_cookies = session.context.cookies() - if browser_cookies: - save_platform_cookies("atcoder", browser_cookies) - except Exception: - pass + return SubmitResult(success=False, error=f"Login failed: {login_error}") print(json.dumps({"status": "submitting"}), flush=True) session.fetch( @@ -589,17 +529,6 @@ def _submit_headless( solve_cloudflare=True, ) - if needs_relogin and not _retried: - clear_platform_cookies("atcoder") - return _submit_headless( - contest_id, - problem_id, - file_path, - language_id, - credentials, - _retried=True, - ) - if submit_error: return SubmitResult(success=False, error=submit_error) diff --git a/scrapers/base.py b/scrapers/base.py index 035495a..11ab8c6 100644 --- a/scrapers/base.py +++ b/scrapers/base.py @@ -4,8 +4,6 @@ import os import re import sys from abc import ABC, abstractmethod -from pathlib import Path -from typing import Any from .language_ids import get_language_id from .models import ( @@ -17,36 +15,6 @@ from .models import ( TestsResult, ) -_COOKIE_FILE = Path.home() / ".cache" / "cp-nvim" / "cookies.json" - - -def load_platform_cookies(platform: str) -> Any | None: - try: - data = json.loads(_COOKIE_FILE.read_text()) - return data.get(platform) - except Exception: - return None - - -def save_platform_cookies(platform: str, data: Any) -> None: - _COOKIE_FILE.parent.mkdir(parents=True, exist_ok=True) - try: - existing = json.loads(_COOKIE_FILE.read_text()) - except Exception: - existing = {} - existing[platform] = data - _COOKIE_FILE.write_text(json.dumps(existing)) - - -def clear_platform_cookies(platform: str) -> None: - try: - existing = json.loads(_COOKIE_FILE.read_text()) - existing.pop(platform, None) - _COOKIE_FILE.write_text(json.dumps(existing)) - except Exception: - pass - - _PRECISION_ABS_REL_RE = re.compile( r"(?:absolute|relative)\s+error[^.]*?10\s*[\^{]\s*\{?\s*[-\u2212]\s*(\d+)\s*\}?", re.IGNORECASE, diff --git a/scrapers/codechef.py b/scrapers/codechef.py index b64fdf5..0427d5d 100644 --- a/scrapers/codechef.py +++ b/scrapers/codechef.py @@ -9,18 +9,8 @@ from typing import Any import httpx -from .base import ( - BaseScraper, - clear_platform_cookies, - load_platform_cookies, - save_platform_cookies, -) -from .timeouts import ( - BROWSER_ELEMENT_WAIT, - BROWSER_NAV_TIMEOUT, - BROWSER_SESSION_TIMEOUT, - HTTP_TIMEOUT, -) +from .base import BaseScraper +from .timeouts import BROWSER_SESSION_TIMEOUT, HTTP_TIMEOUT from .models import ( ContestListResult, ContestSummary, @@ -41,6 +31,7 @@ HEADERS = { } CONNECTIONS = 8 +_COOKIE_PATH = Path.home() / ".cache" / "cp-nvim" / "codechef-cookies.json" _CC_CHECK_LOGIN_JS = "() => !!document.querySelector('a[href*=\"/users/\"]')" @@ -63,29 +54,6 @@ async def fetch_json(client: httpx.AsyncClient, path: str) -> dict[str, Any]: return r.json() -def _cc_check_logged_in(page) -> bool: - return "dashboard" in page.url or page.evaluate(_CC_CHECK_LOGIN_JS) - - -def _cc_login_action(credentials: dict[str, str]): - login_error: str | None = None - - def login_action(page): - nonlocal login_error - try: - page.locator('input[name="name"]').fill(credentials.get("username", "")) - page.locator('input[name="pass"]').fill(credentials.get("password", "")) - page.locator("input.cc-login-btn").click() - page.wait_for_function( - "() => !window.location.pathname.includes('/login')", - timeout=BROWSER_NAV_TIMEOUT, - ) - except Exception as e: - login_error = str(e) - - return login_action, lambda: login_error - - def _login_headless_codechef(credentials: dict[str, str]) -> LoginResult: try: from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import] @@ -99,32 +67,28 @@ def _login_headless_codechef(credentials: dict[str, str]) -> LoginResult: _ensure_browser() - saved_cookies = load_platform_cookies("codechef") or [] + _COOKIE_PATH.parent.mkdir(parents=True, exist_ok=True) - if saved_cookies: - print(json.dumps({"status": "checking_login"}), flush=True) - logged_in = False + logged_in = False + login_error: str | None = None - def check_action(page): - nonlocal logged_in - logged_in = _cc_check_logged_in(page) + def check_login(page): + nonlocal logged_in + logged_in = "dashboard" in page.url or page.evaluate(_CC_CHECK_LOGIN_JS) + def login_action(page): + nonlocal login_error try: - with StealthySession( - headless=True, - timeout=BROWSER_SESSION_TIMEOUT, - google_search=False, - cookies=saved_cookies, - ) as session: - session.fetch( - f"{BASE_URL}/", page_action=check_action, network_idle=True - ) - if logged_in: - return LoginResult(success=True, error="") - except Exception: - pass - - login_action, get_error = _cc_login_action(credentials) + page.locator('input[name="name"]').fill(credentials.get("username", "")) + page.locator('input[name="pass"]').fill(credentials.get("password", "")) + page.locator("input.cc-login-btn").click() + try: + page.wait_for_url(lambda url: "/login" not in url, timeout=3000) + except Exception: + login_error = "Login failed (bad credentials?)" + return + except Exception as e: + login_error = str(e) try: with StealthySession( @@ -133,20 +97,11 @@ def _login_headless_codechef(credentials: dict[str, str]) -> LoginResult: google_search=False, ) as session: print(json.dumps({"status": "logging_in"}), flush=True) - session.fetch( - f"{BASE_URL}/login", page_action=login_action, network_idle=True - ) - login_error = get_error() + session.fetch(f"{BASE_URL}/login", page_action=login_action) if login_error: return LoginResult(success=False, error=f"Login failed: {login_error}") - logged_in = False - - def verify_action(page): - nonlocal logged_in - logged_in = _cc_check_logged_in(page) - - session.fetch(f"{BASE_URL}/", page_action=verify_action, network_idle=True) + session.fetch(f"{BASE_URL}/", page_action=check_login, network_idle=True) if not logged_in: return LoginResult( success=False, error="Login failed (bad credentials?)" @@ -154,8 +109,8 @@ def _login_headless_codechef(credentials: dict[str, str]) -> LoginResult: try: browser_cookies = session.context.cookies() - if any(c.get("name") == "userkey" for c in browser_cookies): - save_platform_cookies("codechef", browser_cookies) + if browser_cookies: + _COOKIE_PATH.write_text(json.dumps(browser_cookies)) except Exception: pass @@ -171,7 +126,6 @@ def _submit_headless_codechef( language_id: str, credentials: dict[str, str], _retried: bool = False, - _practice: bool = False, ) -> SubmitResult: source_code = Path(file_path).read_text() @@ -187,19 +141,36 @@ def _submit_headless_codechef( _ensure_browser() + _COOKIE_PATH.parent.mkdir(parents=True, exist_ok=True) saved_cookies: list[dict[str, Any]] = [] - if not _retried: - saved_cookies = load_platform_cookies("codechef") or [] + if _COOKIE_PATH.exists() and not _retried: + try: + saved_cookies = json.loads(_COOKIE_PATH.read_text()) + except Exception: + pass - logged_in = bool(saved_cookies) + logged_in = bool(saved_cookies) and not _retried + login_error: str | None = None submit_error: str | None = None needs_relogin = False def check_login(page): nonlocal logged_in - logged_in = _cc_check_logged_in(page) + logged_in = "dashboard" in page.url or page.evaluate(_CC_CHECK_LOGIN_JS) - _login_action, _get_login_error = _cc_login_action(credentials) + def login_action(page): + nonlocal login_error + try: + page.locator('input[name="name"]').fill(credentials.get("username", "")) + page.locator('input[name="pass"]').fill(credentials.get("password", "")) + page.locator("input.cc-login-btn").click() + try: + page.wait_for_url(lambda url: "/login" not in url, timeout=3000) + except Exception: + login_error = "Login failed (bad credentials?)" + return + except Exception as e: + login_error = str(e) def submit_action(page): nonlocal submit_error, needs_relogin @@ -207,13 +178,12 @@ def _submit_headless_codechef( needs_relogin = True return try: - page.wait_for_selector( - '[aria-haspopup="listbox"]', timeout=BROWSER_ELEMENT_WAIT - ) + page.wait_for_selector('[aria-haspopup="listbox"]', timeout=10000) page.locator('[aria-haspopup="listbox"]').click() - page.wait_for_selector('[role="option"]', timeout=BROWSER_ELEMENT_WAIT) + page.wait_for_selector('[role="option"]', timeout=5000) page.locator(f'[role="option"][data-value="{language_id}"]').click() + page.wait_for_timeout(250) page.locator(".ace_editor").click() page.keyboard.press("Control+a") @@ -228,6 +198,7 @@ def _submit_headless_codechef( }""", source_code, ) + page.wait_for_timeout(125) page.evaluate( "() => document.getElementById('submit_btn').scrollIntoView({block:'center'})" @@ -242,9 +213,7 @@ def _submit_headless_codechef( const d = document.querySelector('[role="dialog"], .swal2-popup'); return d ? d.textContent.trim() : null; }""") - if dialog_text and "login" in dialog_text.lower(): - needs_relogin = True - elif dialog_text and ( + if dialog_text and ( "not available for accepting solutions" in dialog_text or "not available for submission" in dialog_text ): @@ -259,9 +228,9 @@ def _submit_headless_codechef( headless=True, timeout=BROWSER_SESSION_TIMEOUT, google_search=False, - cookies=saved_cookies if saved_cookies else [], + cookies=saved_cookies if (saved_cookies and not _retried) else [], ) as session: - if not _retried and not _practice and saved_cookies: + if not logged_in: print(json.dumps({"status": "checking_login"}), flush=True) session.fetch( f"{BASE_URL}/", page_action=check_login, network_idle=True @@ -269,18 +238,13 @@ def _submit_headless_codechef( if not logged_in: print(json.dumps({"status": "logging_in"}), flush=True) - session.fetch( - f"{BASE_URL}/login", page_action=_login_action, network_idle=True - ) - login_error = _get_login_error() + session.fetch(f"{BASE_URL}/login", page_action=login_action) if login_error: return SubmitResult( success=False, error=f"Login failed: {login_error}" ) - logged_in = True - if not _practice: - print(json.dumps({"status": "submitting"}), flush=True) + print(json.dumps({"status": "submitting"}), flush=True) submit_url = ( f"{BASE_URL}/submit/{problem_id}" if contest_id == "PRACTICE" @@ -290,13 +254,13 @@ def _submit_headless_codechef( try: browser_cookies = session.context.cookies() - if any(c.get("name") == "userkey" for c in browser_cookies): - save_platform_cookies("codechef", browser_cookies) + if browser_cookies and logged_in: + _COOKIE_PATH.write_text(json.dumps(browser_cookies)) except Exception: pass if needs_relogin and not _retried: - clear_platform_cookies("codechef") + _COOKIE_PATH.unlink(missing_ok=True) return _submit_headless_codechef( contest_id, problem_id, @@ -306,14 +270,14 @@ def _submit_headless_codechef( _retried=True, ) - if submit_error == "PRACTICE_FALLBACK" and not _practice: + if submit_error == "PRACTICE_FALLBACK" and not _retried: return _submit_headless_codechef( "PRACTICE", problem_id, file_path, language_id, credentials, - _practice=True, + _retried=True, ) if submit_error: diff --git a/scrapers/codeforces.py b/scrapers/codeforces.py index 5bbfa38..d5b6161 100644 --- a/scrapers/codeforces.py +++ b/scrapers/codeforces.py @@ -8,13 +8,7 @@ from typing import Any import requests from bs4 import BeautifulSoup, Tag -from .base import ( - BaseScraper, - clear_platform_cookies, - extract_precision, - load_platform_cookies, - save_platform_cookies, -) +from .base import BaseScraper, extract_precision from .models import ( ContestListResult, ContestSummary, @@ -337,33 +331,9 @@ class CodeforcesScraper(BaseScraper): return await asyncio.to_thread(_login_headless_cf, credentials) -def _cf_check_logged_in(page) -> bool: - return page.evaluate( - "() => Array.from(document.querySelectorAll('a'))" - ".some(a => a.textContent.includes('Logout'))" - ) - - -def _cf_login_action(credentials: dict[str, str]): - login_error: str | None = None - - def login_action(page): - nonlocal login_error - try: - page.wait_for_selector('input[name="handleOrEmail"]', timeout=60000) - page.fill('input[name="handleOrEmail"]', credentials.get("username", "")) - page.fill('input[name="password"]', credentials.get("password", "")) - page.locator('#enterForm input[type="submit"]').click() - page.wait_for_url( - lambda url: "/enter" not in url, timeout=BROWSER_NAV_TIMEOUT - ) - except Exception as e: - login_error = str(e) - - return login_action, lambda: login_error - - def _login_headless_cf(credentials: dict[str, str]) -> LoginResult: + from pathlib import Path + try: from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import] except ImportError: @@ -376,32 +346,36 @@ def _login_headless_cf(credentials: dict[str, str]) -> LoginResult: _ensure_browser() - saved_cookies = load_platform_cookies("codeforces") or [] + cookie_cache = Path.home() / ".cache" / "cp-nvim" / "codeforces-cookies.json" + cookie_cache.parent.mkdir(parents=True, exist_ok=True) - if saved_cookies: - print(json.dumps({"status": "checking_login"}), flush=True) - logged_in = False + logged_in = False + login_error: str | None = None - def check_action(page): - nonlocal logged_in - logged_in = _cf_check_logged_in(page) + def check_login(page): + nonlocal logged_in + logged_in = page.evaluate( + "() => Array.from(document.querySelectorAll('a'))" + ".some(a => a.textContent.includes('Logout'))" + ) + def login_action(page): + nonlocal login_error try: - with StealthySession( - headless=True, - timeout=BROWSER_SESSION_TIMEOUT, - google_search=False, - cookies=saved_cookies, - ) as session: - session.fetch( - f"{BASE_URL}/", page_action=check_action, solve_cloudflare=True - ) - if logged_in: - return LoginResult(success=True, error="") - except Exception: - pass - - login_action, get_error = _cf_login_action(credentials) + page.fill( + 'input[name="handleOrEmail"]', + credentials.get("username", ""), + ) + page.fill( + 'input[name="password"]', + credentials.get("password", ""), + ) + page.locator('#enterForm input[type="submit"]').click() + page.wait_for_url( + lambda url: "/enter" not in url, timeout=BROWSER_NAV_TIMEOUT + ) + except Exception as e: + login_error = str(e) try: with StealthySession( @@ -415,17 +389,14 @@ def _login_headless_cf(credentials: dict[str, str]) -> LoginResult: page_action=login_action, solve_cloudflare=True, ) - login_error = get_error() if login_error: return LoginResult(success=False, error=f"Login failed: {login_error}") - logged_in = False - - def verify_action(page): - nonlocal logged_in - logged_in = _cf_check_logged_in(page) - - session.fetch(f"{BASE_URL}/", page_action=verify_action, network_idle=True) + session.fetch( + f"{BASE_URL}/", + page_action=check_login, + network_idle=True, + ) if not logged_in: return LoginResult( success=False, error="Login failed (bad credentials?)" @@ -433,8 +404,8 @@ def _login_headless_cf(credentials: dict[str, str]) -> LoginResult: try: browser_cookies = session.context.cookies() - if any(c.get("name") == "X-User-Sha1" for c in browser_cookies): - save_platform_cookies("codeforces", browser_cookies) + if any(c.get("name") == "X-User-Handle" for c in browser_cookies): + cookie_cache.write_text(json.dumps(browser_cookies)) except Exception: pass @@ -467,19 +438,44 @@ def _submit_headless( _ensure_browser() + cookie_cache = Path.home() / ".cache" / "cp-nvim" / "codeforces-cookies.json" + cookie_cache.parent.mkdir(parents=True, exist_ok=True) saved_cookies: list[dict[str, Any]] = [] - if not _retried: - saved_cookies = load_platform_cookies("codeforces") or [] + if cookie_cache.exists(): + try: + saved_cookies = json.loads(cookie_cache.read_text()) + except Exception: + pass - logged_in = bool(saved_cookies) + logged_in = cookie_cache.exists() and not _retried + login_error: str | None = None submit_error: str | None = None needs_relogin = False def check_login(page): nonlocal logged_in - logged_in = _cf_check_logged_in(page) + logged_in = page.evaluate( + "() => Array.from(document.querySelectorAll('a'))" + ".some(a => a.textContent.includes('Logout'))" + ) - _login_action, _get_login_error = _cf_login_action(credentials) + def login_action(page): + nonlocal login_error + try: + page.fill( + 'input[name="handleOrEmail"]', + credentials.get("username", ""), + ) + page.fill( + 'input[name="password"]', + credentials.get("password", ""), + ) + page.locator('#enterForm input[type="submit"]').click() + page.wait_for_url( + lambda url: "/enter" not in url, timeout=BROWSER_NAV_TIMEOUT + ) + except Exception as e: + login_error = str(e) def submit_action(page): nonlocal submit_error, needs_relogin @@ -524,27 +520,27 @@ def _submit_headless( headless=True, timeout=BROWSER_SESSION_TIMEOUT, google_search=False, - cookies=saved_cookies if saved_cookies else [], + cookies=saved_cookies if (cookie_cache.exists() and not _retried) else [], ) as session: - if not _retried and saved_cookies: + if not (cookie_cache.exists() and not _retried): print(json.dumps({"status": "checking_login"}), flush=True) session.fetch( - f"{BASE_URL}/", page_action=check_login, solve_cloudflare=True + f"{BASE_URL}/", + page_action=check_login, + network_idle=True, ) if not logged_in: print(json.dumps({"status": "logging_in"}), flush=True) session.fetch( f"{BASE_URL}/enter", - page_action=_login_action, + page_action=login_action, solve_cloudflare=True, ) - login_error = _get_login_error() if login_error: return SubmitResult( success=False, error=f"Login failed: {login_error}" ) - logged_in = True print(json.dumps({"status": "submitting"}), flush=True) session.fetch( @@ -555,13 +551,13 @@ def _submit_headless( try: browser_cookies = session.context.cookies() - if any(c.get("name") == "X-User-Sha1" for c in browser_cookies): - save_platform_cookies("codeforces", browser_cookies) + if any(c.get("name") == "X-User-Handle" for c in browser_cookies): + cookie_cache.write_text(json.dumps(browser_cookies)) except Exception: pass if needs_relogin and not _retried: - clear_platform_cookies("codeforces") + cookie_cache.unlink(missing_ok=True) return _submit_headless( contest_id, problem_id, diff --git a/scrapers/kattis.py b/scrapers/kattis.py index ac2c157..1177445 100644 --- a/scrapers/kattis.py +++ b/scrapers/kattis.py @@ -10,13 +10,7 @@ from pathlib import Path import httpx -from .base import ( - BaseScraper, - clear_platform_cookies, - extract_precision, - load_platform_cookies, - save_platform_cookies, -) +from .base import BaseScraper, extract_precision from .timeouts import HTTP_TIMEOUT from .models import ( ContestListResult, @@ -34,6 +28,8 @@ HEADERS = { } CONNECTIONS = 8 +_COOKIE_PATH = Path.home() / ".cache" / "cp-nvim" / "kattis-cookies.json" + TIME_RE = re.compile( r"CPU Time limit\s*]*>\s*(\d+)\s*seconds?\s*", re.DOTALL, @@ -213,24 +209,20 @@ async def _stream_single_problem(client: httpx.AsyncClient, slug: str) -> None: async def _load_kattis_cookies(client: httpx.AsyncClient) -> None: - data = load_platform_cookies("kattis") - if isinstance(data, dict): - for k, v in data.items(): + if not _COOKIE_PATH.exists(): + return + try: + for k, v in json.loads(_COOKIE_PATH.read_text()).items(): client.cookies.set(k, v) + except Exception: + pass async def _save_kattis_cookies(client: httpx.AsyncClient) -> None: - cookies = dict(client.cookies.items()) + cookies = {k: v for k, v in client.cookies.items()} if cookies: - save_platform_cookies("kattis", cookies) - - -async def _check_kattis_login(client: httpx.AsyncClient) -> bool: - try: - r = await client.get(BASE_URL, headers=HEADERS, timeout=HTTP_TIMEOUT) - return bool(r.headers.get("x-username")) - except Exception: - return False + _COOKIE_PATH.parent.mkdir(parents=True, exist_ok=True) + _COOKIE_PATH.write_text(json.dumps(cookies)) async def _do_kattis_login( @@ -337,10 +329,9 @@ class KattisScraper(BaseScraper): return self._submit_error("Missing credentials. Use :CP kattis login") async with httpx.AsyncClient(follow_redirects=True) as client: + print(json.dumps({"status": "checking_login"}), flush=True) await _load_kattis_cookies(client) - if client.cookies: - print(json.dumps({"status": "checking_login"}), flush=True) - else: + if not client.cookies: print(json.dumps({"status": "logging_in"}), flush=True) ok = await _do_kattis_login(client, username, password) if not ok: @@ -377,7 +368,7 @@ class KattisScraper(BaseScraper): return self._submit_error(f"Submit request failed: {e}") if r.status_code in (400, 403) or r.text == "Request validation failed": - clear_platform_cookies("kattis") + _COOKIE_PATH.unlink(missing_ok=True) print(json.dumps({"status": "logging_in"}), flush=True) ok = await _do_kattis_login(client, username, password) if not ok: @@ -408,16 +399,6 @@ class KattisScraper(BaseScraper): return self._login_error("Missing username or password") async with httpx.AsyncClient(follow_redirects=True) as client: - await _load_kattis_cookies(client) - if client.cookies: - print(json.dumps({"status": "checking_login"}), flush=True) - if await _check_kattis_login(client): - return LoginResult( - success=True, - error="", - credentials={"username": username, "password": password}, - ) - print(json.dumps({"status": "logging_in"}), flush=True) ok = await _do_kattis_login(client, username, password) if not ok: diff --git a/scrapers/usaco.py b/scrapers/usaco.py index b6e95d2..b009cf0 100644 --- a/scrapers/usaco.py +++ b/scrapers/usaco.py @@ -8,12 +8,7 @@ from typing import Any, cast import httpx -from .base import ( - BaseScraper, - extract_precision, - load_platform_cookies, - save_platform_cookies, -) +from .base import BaseScraper, extract_precision from .timeouts import HTTP_TIMEOUT from .models import ( ContestListResult, @@ -32,6 +27,7 @@ HEADERS = { } CONNECTIONS = 4 +_COOKIE_PATH = Path.home() / ".cache" / "cp-nvim" / "usaco-cookies.json" _LOGIN_PATH = "/current/tpcm/login-session.php" _SUBMIT_PATH = "/current/tpcm/submit-solution.php" @@ -206,16 +202,20 @@ def _parse_submit_form( async def _load_usaco_cookies(client: httpx.AsyncClient) -> None: - data = load_platform_cookies("usaco") - if isinstance(data, dict): - for k, v in data.items(): + if not _COOKIE_PATH.exists(): + return + try: + for k, v in json.loads(_COOKIE_PATH.read_text()).items(): client.cookies.set(k, v) + except Exception: + pass async def _save_usaco_cookies(client: httpx.AsyncClient) -> None: - cookies = dict(client.cookies.items()) + cookies = {k: v for k, v in client.cookies.items()} if cookies: - save_platform_cookies("usaco", cookies) + _COOKIE_PATH.parent.mkdir(parents=True, exist_ok=True) + _COOKIE_PATH.write_text(json.dumps(cookies)) async def _check_usaco_login(client: httpx.AsyncClient, username: str) -> bool: