diff --git a/scrapers/codechef.py b/scrapers/codechef.py index 200d5e2..2a48dd5 100644 --- a/scrapers/codechef.py +++ b/scrapers/codechef.py @@ -3,13 +3,14 @@ import asyncio import json import re +from pathlib import Path from typing import Any import httpx from curl_cffi import requests as curl_requests from .base import BaseScraper, extract_precision -from .timeouts import HTTP_TIMEOUT +from .timeouts import BROWSER_NAV_TIMEOUT, BROWSER_SESSION_TIMEOUT, HTTP_TIMEOUT from .models import ( ContestListResult, ContestSummary, @@ -29,6 +30,20 @@ HEADERS = { "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" } CONNECTIONS = 8 + +_COOKIE_PATH = Path.home() / ".cache" / "cp-nvim" / "codechef-cookies.json" + +_CC_CHECK_LOGIN_JS = """() => { + const d = document.getElementById('__NEXT_DATA__'); + if (d) { + try { + const p = JSON.parse(d.textContent); + if (p?.props?.pageProps?.currentUser?.username) return true; + } catch(e) {} + } + return !!document.querySelector('a[href="/logout"]') || + !!document.querySelector('[class*="user-name"]'); +}""" MEMORY_LIMIT_RE = re.compile( r"Memory\s+[Ll]imit.*?([0-9.]+)\s*(MB|GB)", re.IGNORECASE | re.DOTALL ) @@ -57,6 +72,258 @@ def _fetch_html_sync(url: str) -> str: return response.text +def _login_headless_codechef(credentials: dict[str, str]) -> LoginResult: + try: + from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import] + except ImportError: + return LoginResult( + success=False, + error="scrapling is required for CodeChef login", + ) + + from .atcoder import _ensure_browser + + _ensure_browser() + + _COOKIE_PATH.parent.mkdir(parents=True, exist_ok=True) + saved_cookies: list[dict[str, Any]] = [] + if _COOKIE_PATH.exists(): + try: + saved_cookies = json.loads(_COOKIE_PATH.read_text()) + except Exception: + pass + + logged_in = False + login_error: str | None = None + + def check_login(page): + nonlocal logged_in + logged_in = 
page.evaluate(_CC_CHECK_LOGIN_JS) + + def login_action(page): + nonlocal login_error + try: + page.locator('input[type="email"], input[name="email"]').first.fill( + credentials.get("username", "") + ) + page.locator('input[type="password"], input[name="password"]').first.fill( + credentials.get("password", "") + ) + page.locator('button[type="submit"]').first.click() + page.wait_for_url( + lambda url: "/login" not in url, timeout=BROWSER_NAV_TIMEOUT + ) + except Exception as e: + login_error = str(e) + + try: + with StealthySession( + headless=True, + timeout=BROWSER_SESSION_TIMEOUT, + google_search=False, + cookies=saved_cookies if saved_cookies else [], + ) as session: + if saved_cookies: + print(json.dumps({"status": "checking_login"}), flush=True) + session.fetch( + f"{BASE_URL}/", page_action=check_login, network_idle=True + ) + + if not logged_in: + print(json.dumps({"status": "logging_in"}), flush=True) + session.fetch(f"{BASE_URL}/login", page_action=login_action) + if login_error: + return LoginResult( + success=False, error=f"Login failed: {login_error}" + ) + + session.fetch( + f"{BASE_URL}/", page_action=check_login, network_idle=True + ) + if not logged_in: + return LoginResult( + success=False, error="Login failed (bad credentials?)" + ) + + try: + browser_cookies = session.context.cookies() + if browser_cookies: + _COOKIE_PATH.write_text(json.dumps(browser_cookies)) + except Exception: + pass + + return LoginResult(success=True, error="") + except Exception as e: + return LoginResult(success=False, error=str(e)) + + +def _submit_headless_codechef( + contest_id: str, + problem_id: str, + file_path: str, + language_id: str, + credentials: dict[str, str], + _retried: bool = False, +) -> SubmitResult: + source_code = Path(file_path).read_text() + + try: + from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import] + except ImportError: + return SubmitResult( + success=False, + error="scrapling is required for CodeChef 
submit", + ) + + from .atcoder import _ensure_browser + + _ensure_browser() + + _COOKIE_PATH.parent.mkdir(parents=True, exist_ok=True) + saved_cookies: list[dict[str, Any]] = [] + if _COOKIE_PATH.exists() and not _retried: + try: + saved_cookies = json.loads(_COOKIE_PATH.read_text()) + except Exception: + pass + + logged_in = bool(saved_cookies) and not _retried + login_error: str | None = None + submit_error: str | None = None + needs_relogin = False + + def check_login(page): + nonlocal logged_in + logged_in = page.evaluate(_CC_CHECK_LOGIN_JS) + + def login_action(page): + nonlocal login_error + try: + page.locator('input[type="email"], input[name="email"]').first.fill( + credentials.get("username", "") + ) + page.locator('input[type="password"], input[name="password"]').first.fill( + credentials.get("password", "") + ) + page.locator('button[type="submit"]').first.click() + page.wait_for_url( + lambda url: "/login" not in url, timeout=BROWSER_NAV_TIMEOUT + ) + except Exception as e: + login_error = str(e) + + def submit_action(page): + nonlocal submit_error, needs_relogin + if "/login" in page.url: + needs_relogin = True + return + try: + selected = False + selects = page.locator("select") + for i in range(selects.count()): + try: + sel = selects.nth(i) + opts = sel.locator("option").all_inner_texts() + match = next( + (o for o in opts if language_id.lower() in o.lower()), None + ) + if match: + sel.select_option(label=match) + selected = True + break + except Exception: + pass + + if not selected: + lang_trigger = page.locator( + '[class*="language"] button, [data-testid*="language"] button' + ).first + lang_trigger.click() + page.wait_for_timeout(500) + page.locator( + f'[role="option"]:has-text("{language_id}"), ' + f'li:has-text("{language_id}")' + ).first.click() + + page.evaluate( + """(code) => { + if (typeof monaco !== 'undefined') { + const models = monaco.editor.getModels(); + if (models.length > 0) { models[0].setValue(code); return; } + } + const cm 
= document.querySelector('.CodeMirror'); + if (cm && cm.CodeMirror) { cm.CodeMirror.setValue(code); return; } + const ta = document.querySelector('textarea'); + if (ta) { ta.value = code; ta.dispatchEvent(new Event('input', {bubbles: true})); } + }""", + source_code, + ) + + page.locator( + 'button[type="submit"]:has-text("Submit"), button:has-text("Submit Code")' + ).first.click() + page.wait_for_url( + lambda url: "/submit/" not in url or "submission" in url, + timeout=BROWSER_NAV_TIMEOUT * 2, + ) + except Exception as e: + submit_error = str(e) + + try: + with StealthySession( + headless=True, + timeout=BROWSER_SESSION_TIMEOUT, + google_search=False, + cookies=saved_cookies if (saved_cookies and not _retried) else [], + ) as session: + if not logged_in: + print(json.dumps({"status": "checking_login"}), flush=True) + session.fetch( + f"{BASE_URL}/", page_action=check_login, network_idle=True + ) + + if not logged_in: + print(json.dumps({"status": "logging_in"}), flush=True) + session.fetch(f"{BASE_URL}/login", page_action=login_action) + if login_error: + return SubmitResult( + success=False, error=f"Login failed: {login_error}" + ) + + print(json.dumps({"status": "submitting"}), flush=True) + session.fetch( + f"{BASE_URL}/{contest_id}/submit/{problem_id}", + page_action=submit_action, + ) + + try: + browser_cookies = session.context.cookies() + if browser_cookies and logged_in: + _COOKIE_PATH.write_text(json.dumps(browser_cookies)) + except Exception: + pass + + if needs_relogin and not _retried: + _COOKIE_PATH.unlink(missing_ok=True) + return _submit_headless_codechef( + contest_id, + problem_id, + file_path, + language_id, + credentials, + _retried=True, + ) + + if submit_error: + return SubmitResult(success=False, error=submit_error) + + return SubmitResult( + success=True, error="", submission_id="", verdict="submitted" + ) + except Exception as e: + return SubmitResult(success=False, error=str(e)) + + class CodeChefScraper(BaseScraper): @property def 
platform_name(self) -> str: @@ -261,15 +528,21 @@ class CodeChefScraper(BaseScraper): language_id: str, credentials: dict[str, str], ) -> SubmitResult: - return SubmitResult( - success=False, - error="CodeChef submit not yet implemented", - submission_id="", - verdict="", + if not credentials.get("username") or not credentials.get("password"): + return self._submit_error("Missing credentials. Use :CP codechef login") + return await asyncio.to_thread( + _submit_headless_codechef, + contest_id, + problem_id, + file_path, + language_id, + credentials, ) async def login(self, credentials: dict[str, str]) -> LoginResult: - return self._login_error("CodeChef login not yet implemented") + if not credentials.get("username") or not credentials.get("password"): + return self._login_error("Missing username or password") + return await asyncio.to_thread(_login_headless_codechef, credentials) if __name__ == "__main__": diff --git a/scrapers/kattis.py b/scrapers/kattis.py index 9b11395..43ce1f3 100644 --- a/scrapers/kattis.py +++ b/scrapers/kattis.py @@ -6,6 +6,7 @@ import json import re import zipfile from datetime import datetime +from pathlib import Path import httpx @@ -27,6 +28,8 @@ HEADERS = { } CONNECTIONS = 8 +_COOKIE_PATH = Path.home() / ".cache" / "cp-nvim" / "kattis-cookies.json" + TIME_RE = re.compile( r"CPU Time limit\s*]*>\s*(\d+)\s*seconds?\s*", re.DOTALL, @@ -201,6 +204,44 @@ async def _stream_single_problem(client: httpx.AsyncClient, slug: str) -> None: ) +async def _load_kattis_cookies(client: httpx.AsyncClient) -> None: + if not _COOKIE_PATH.exists(): + return + try: + for k, v in json.loads(_COOKIE_PATH.read_text()).items(): + client.cookies.set(k, v) + except Exception: + pass + + +async def _save_kattis_cookies(client: httpx.AsyncClient) -> None: + cookies = {k: v for k, v in client.cookies.items()} + if cookies: + _COOKIE_PATH.parent.mkdir(parents=True, exist_ok=True) + _COOKIE_PATH.write_text(json.dumps(cookies)) + + +async def _check_kattis_login(client: 
httpx.AsyncClient) -> bool: + try: + r = await client.get(BASE_URL + "/", headers=HEADERS, timeout=HTTP_TIMEOUT) + text = r.text.lower() + return "sign out" in text or "logout" in text or "my profile" in text + except Exception: + return False + + +async def _do_kattis_login( + client: httpx.AsyncClient, username: str, password: str +) -> bool: + r = await client.post( + f"{BASE_URL}/login/email", + data={"user": username, "password": password, "script": "true"}, + headers=HEADERS, + timeout=HTTP_TIMEOUT, + ) + return r.status_code == 200 and "login failed" not in r.text.lower() + + class KattisScraper(BaseScraper): @property def platform_name(self) -> str: @@ -245,7 +286,10 @@ class KattisScraper(BaseScraper): async def scrape_contest_list(self) -> ContestListResult: try: async with httpx.AsyncClient() as client: - html = await _fetch_text(client, f"{BASE_URL}/contests") + html = await _fetch_text( + client, + f"{BASE_URL}/contests?kattis_original=on&kattis_recycled=off&user_created=off", + ) contests = _parse_contests_page(html) if not contests: return self._contests_error("No contests found") @@ -278,15 +322,81 @@ class KattisScraper(BaseScraper): language_id: str, credentials: dict[str, str], ) -> SubmitResult: - return SubmitResult( - success=False, - error="Kattis submit not yet implemented", - submission_id="", - verdict="", - ) + source = Path(file_path).read_bytes() + username = credentials.get("username", "") + password = credentials.get("password", "") + if not username or not password: + return self._submit_error("Missing credentials. 
Use :CP kattis login") + + async with httpx.AsyncClient(follow_redirects=True) as client: + await _load_kattis_cookies(client) + print(json.dumps({"status": "checking_login"}), flush=True) + logged_in = bool(client.cookies) and await _check_kattis_login(client) + if not logged_in: + print(json.dumps({"status": "logging_in"}), flush=True) + ok = await _do_kattis_login(client, username, password) + if not ok: + return self._submit_error("Login failed (bad credentials?)") + await _save_kattis_cookies(client) + + print(json.dumps({"status": "submitting"}), flush=True) + ext = "py" if "python" in language_id.lower() else "cpp" + data: dict[str, str] = { + "submit": "true", + "script": "true", + "language": language_id, + "problem": problem_id, + "mainclass": "", + "submit_ctr": "2", + } + if contest_id != problem_id: + data["contest"] = contest_id + try: + r = await client.post( + f"{BASE_URL}/submit", + data=data, + files={"sub_file[]": (f"solution.{ext}", source, "text/plain")}, + headers=HEADERS, + timeout=HTTP_TIMEOUT, + ) + r.raise_for_status() + except Exception as e: + return self._submit_error(f"Submit request failed: {e}") + + sid_m = re.search(r"Submission ID:\s*(\d+)", r.text, re.IGNORECASE) + sid = sid_m.group(1) if sid_m else "" + return SubmitResult( + success=True, error="", submission_id=sid, verdict="submitted" + ) async def login(self, credentials: dict[str, str]) -> LoginResult: - return self._login_error("Kattis login not yet implemented") + username = credentials.get("username", "") + password = credentials.get("password", "") + if not username or not password: + return self._login_error("Missing username or password") + + async with httpx.AsyncClient(follow_redirects=True) as client: + await _load_kattis_cookies(client) + if client.cookies: + print(json.dumps({"status": "checking_login"}), flush=True) + if await _check_kattis_login(client): + return LoginResult( + success=True, + error="", + credentials={"username": username, "password": password}, 
+                    )
+
+            print(json.dumps({"status": "logging_in"}), flush=True)
+            ok = await _do_kattis_login(client, username, password)
+            if not ok:
+                return self._login_error("Login failed (bad credentials?)")
+
+            await _save_kattis_cookies(client)
+            return LoginResult(
+                success=True,
+                error="",
+                credentials={"username": username, "password": password},
+            )
 
 
 if __name__ == "__main__":
diff --git a/scrapers/language_ids.py b/scrapers/language_ids.py
index d6a0ae4..6870aa3 100644
--- a/scrapers/language_ids.py
+++ b/scrapers/language_ids.py
@@ -11,6 +11,18 @@ LANGUAGE_IDS = {
         "cpp": "C++17",
         "python": "Python3",
     },
+    "usaco": {
+        "cpp": "cpp",
+        "python": "python",
+    },
+    "kattis": {
+        "cpp": "C++17",
+        "python": "Python 3",
+    },
+    "codechef": {
+        "cpp": "C++ 17",
+        "python": "Python 3",
+    },
 }
 
 
diff --git a/scrapers/usaco.py b/scrapers/usaco.py
index 221811c..73ec6b1 100644
--- a/scrapers/usaco.py
+++ b/scrapers/usaco.py
@@ -3,6 +3,7 @@
 import asyncio
 import json
 import re
+from pathlib import Path
 from typing import Any, cast
 
 import httpx
@@ -20,11 +21,22 @@ from .models import (
 )
 
 BASE_URL = "http://www.usaco.org"
+_AUTH_BASE = "https://usaco.org"
 HEADERS = {
     "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
 }
 CONNECTIONS = 4
 
+_COOKIE_PATH = Path.home() / ".cache" / "cp-nvim" / "usaco-cookies.json"
+_LOGIN_PATH = "/current/tpcm/login-session.php"
+_SUBMIT_PATH = "/current/tpcm/submitproblem.php"
+
+_LANG_KEYWORDS: dict[str, list[str]] = {
+    "cpp": ["c++17", "c++ 17", "g++17", "c++", "cpp"],
+    "python": ["python3", "python 3", "python"],
+    "java": ["java"],
+}
+
 MONTHS = [
     "dec",
     "jan",
@@ -127,6 +139,110 @@ def _parse_problem_page(html: str) -> dict[str, Any]:
     }
+
+
+def _pick_lang_option(select_body: str, language_id: str) -> str | None:
+    keywords = _LANG_KEYWORDS.get(language_id.lower(), [language_id.lower()])
+    for m in re.finditer(
+        r'<option[^>]*\bvalue=["\']([^"\']*)["\'][^>]*>([^<]+)',
+        select_body,
+        re.IGNORECASE,
+    ):
+        val, text = m.group(1), m.group(2).strip().lower()
+        for kw in keywords:
+            if kw in text:
+                return val
+    return None
+
+
+def _parse_submit_form(
+    html: str, language_id: str
+) -> tuple[str, dict[str, str], str | None]:
+    form_action = _AUTH_BASE + _SUBMIT_PATH
+    hidden: dict[str, str] = {}
+    lang_val: str | None = None
+    for form_m in re.finditer(
+        r'<form[^>]*action=["\']([^"\']+)["\'][^>]*>(.*?)</form>',
+        html,
+        re.DOTALL | re.IGNORECASE,
+    ):
+        action, body = form_m.group(1), form_m.group(2)
+        if "sub_file" not in body.lower():
+            continue
+        if action.startswith("http"):
+            form_action = action
+        elif action.startswith("/"):
+            form_action = _AUTH_BASE + action
+        else:
+            form_action = _AUTH_BASE + "/" + action
+        for input_m in re.finditer(
+            r'<input[^>]*\btype=["\']hidden["\'][^>]*/?>',
+            body,
+            re.IGNORECASE,
+        ):
+            tag = input_m.group(0)
+            name_m = re.search(r'\bname=["\']([^"\']+)["\']', tag, re.IGNORECASE)
+            val_m = re.search(r'\bvalue=["\']([^"\']*)["\']', tag, re.IGNORECASE)
+            if name_m and val_m:
+                hidden[name_m.group(1)] = val_m.group(2)
+        for sel_m in re.finditer(
+            r'<select[^>]*\bname=["\']([^"\']+)["\'][^>]*>(.*?)</select>',
+            body,
+            re.DOTALL | re.IGNORECASE,
+        ):
+            name, sel_body = sel_m.group(1), sel_m.group(2)
+            if "lang" in name.lower():
+                lang_val = _pick_lang_option(sel_body, language_id)
+                break
+        break
+    return form_action, hidden, lang_val
+
+
+async def _load_usaco_cookies(client: httpx.AsyncClient) -> None:
+    if not _COOKIE_PATH.exists():
+        return
+    try:
+        for k, v in json.loads(_COOKIE_PATH.read_text()).items():
+            client.cookies.set(k, v)
+    except Exception:
+        pass
+
+
+async def _save_usaco_cookies(client: httpx.AsyncClient) -> None:
+    cookies = {k: v for k, v in client.cookies.items()}
+    if cookies:
+        _COOKIE_PATH.parent.mkdir(parents=True, exist_ok=True)
+        _COOKIE_PATH.write_text(json.dumps(cookies))
+
+
+async def _check_usaco_login(client: httpx.AsyncClient, username: str) -> bool:
+    try:
+        r = await client.get(
+            f"{_AUTH_BASE}/index.php",
headers=HEADERS, + timeout=HTTP_TIMEOUT, + ) + text = r.text.lower() + return username.lower() in text or "logout" in text + except Exception: + return False + + +async def _do_usaco_login( + client: httpx.AsyncClient, username: str, password: str +) -> bool: + r = await client.post( + f"{_AUTH_BASE}{_LOGIN_PATH}", + data={"user": username, "password": password}, + headers=HEADERS, + timeout=HTTP_TIMEOUT, + ) + r.raise_for_status() + try: + data = r.json() + return bool(data.get("success") or data.get("status") == "success") + except Exception: + return r.status_code == 200 and "error" not in r.text.lower() + + class USACOScraper(BaseScraper): @property def platform_name(self) -> str: @@ -293,15 +409,99 @@ class USACOScraper(BaseScraper): language_id: str, credentials: dict[str, str], ) -> SubmitResult: - return SubmitResult( - success=False, - error="USACO submit not yet implemented", - submission_id="", - verdict="", - ) + source = Path(file_path).read_bytes() + username = credentials.get("username", "") + password = credentials.get("password", "") + if not username or not password: + return self._submit_error("Missing credentials. 
Use :CP usaco login") + + async with httpx.AsyncClient(follow_redirects=True) as client: + await _load_usaco_cookies(client) + print(json.dumps({"status": "checking_login"}), flush=True) + logged_in = bool(client.cookies) and await _check_usaco_login( + client, username + ) + if not logged_in: + print(json.dumps({"status": "logging_in"}), flush=True) + try: + ok = await _do_usaco_login(client, username, password) + except Exception as e: + return self._submit_error(f"Login failed: {e}") + if not ok: + return self._submit_error("Login failed (bad credentials?)") + await _save_usaco_cookies(client) + + print(json.dumps({"status": "submitting"}), flush=True) + try: + page_r = await client.get( + f"{_AUTH_BASE}/index.php?page=viewproblem2&cpid={problem_id}", + headers=HEADERS, + timeout=HTTP_TIMEOUT, + ) + form_url, hidden_fields, lang_val = _parse_submit_form( + page_r.text, language_id + ) + except Exception: + form_url = _AUTH_BASE + _SUBMIT_PATH + hidden_fields = {} + lang_val = None + + data: dict[str, str] = {"cpid": problem_id, **hidden_fields} + data["language"] = lang_val if lang_val is not None else language_id + ext = "py" if "python" in language_id.lower() else "cpp" + try: + r = await client.post( + form_url, + data=data, + files={"sub_file[]": (f"solution.{ext}", source, "text/plain")}, + headers=HEADERS, + timeout=HTTP_TIMEOUT, + ) + r.raise_for_status() + except Exception as e: + return self._submit_error(f"Submit request failed: {e}") + + try: + resp = r.json() + sid = str(resp.get("submission_id", resp.get("id", ""))) + except Exception: + sid = "" + return SubmitResult( + success=True, error="", submission_id=sid, verdict="submitted" + ) async def login(self, credentials: dict[str, str]) -> LoginResult: - return self._login_error("USACO login not yet implemented") + username = credentials.get("username", "") + password = credentials.get("password", "") + if not username or not password: + return self._login_error("Missing username or password") + + 
async with httpx.AsyncClient(follow_redirects=True) as client: + await _load_usaco_cookies(client) + if client.cookies: + print(json.dumps({"status": "checking_login"}), flush=True) + if await _check_usaco_login(client, username): + return LoginResult( + success=True, + error="", + credentials={"username": username, "password": password}, + ) + + print(json.dumps({"status": "logging_in"}), flush=True) + try: + ok = await _do_usaco_login(client, username, password) + except Exception as e: + return self._login_error(f"Login request failed: {e}") + + if not ok: + return self._login_error("Login failed (bad credentials?)") + + await _save_usaco_cookies(client) + return LoginResult( + success=True, + error="", + credentials={"username": username, "password": password}, + ) if __name__ == "__main__":