diff --git a/scrapers/usaco.py b/scrapers/usaco.py index 221811c..73ec6b1 100644 --- a/scrapers/usaco.py +++ b/scrapers/usaco.py @@ -3,6 +3,7 @@ import asyncio import json import re +from pathlib import Path from typing import Any, cast import httpx @@ -20,11 +21,22 @@ from .models import ( ) BASE_URL = "http://www.usaco.org" +_AUTH_BASE = "https://usaco.org" HEADERS = { "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" } CONNECTIONS = 4 +_COOKIE_PATH = Path.home() / ".cache" / "cp-nvim" / "usaco-cookies.json" +_LOGIN_PATH = "/current/tpcm/login-session.php" +_SUBMIT_PATH = "/current/tpcm/submitproblem.php" + +_LANG_KEYWORDS: dict[str, list[str]] = { + "cpp": ["c++17", "c++ 17", "g++17", "c++", "cpp"], + "python": ["python3", "python 3", "python"], + "java": ["java"], +} + MONTHS = [ "dec", "jan", @@ -127,6 +139,110 @@ def _parse_problem_page(html: str) -> dict[str, Any]: } +def _pick_lang_option(select_body: str, language_id: str) -> str | None: + keywords = _LANG_KEYWORDS.get(language_id.lower(), [language_id.lower()]) + for m in re.finditer( + r']*\bvalue=["\']([^"\']*)["\'][^>]*>([^<]+)', + select_body, + re.IGNORECASE, + ): + val, text = m.group(1), m.group(2).strip().lower() + for kw in keywords: + if kw in text: + return val + return None + + +def _parse_submit_form( + html: str, language_id: str +) -> tuple[str, dict[str, str], str | None]: + form_action = _AUTH_BASE + _SUBMIT_PATH + hidden: dict[str, str] = {} + lang_val: str | None = None + for form_m in re.finditer( + r']*action=["\']([^"\']+)["\'][^>]*>(.*?)', + html, + re.DOTALL | re.IGNORECASE, + ): + action, body = form_m.group(1), form_m.group(2) + if "sub_file" not in body.lower(): + continue + if action.startswith("http"): + form_action = action + elif action.startswith("/"): + form_action = _AUTH_BASE + action + else: + form_action = _AUTH_BASE + "/" + action + for input_m in re.finditer( + r']*\btype=["\']hidden["\'][^>]*/?>', + body, + re.IGNORECASE, + ): + tag = input_m.group(0) + name_m = re.search(r'\bname=["\']([^"\']+)["\']', tag, re.IGNORECASE) + val_m = re.search(r'\bvalue=["\']([^"\']*)["\']', tag, re.IGNORECASE) + if name_m and val_m: + hidden[name_m.group(1)] = val_m.group(2) + for sel_m in re.finditer( + r']*\bname=["\']([^"\']+)["\'][^>]*>(.*?)', + body, + re.DOTALL | re.IGNORECASE, + ): + name, sel_body = sel_m.group(1), sel_m.group(2) + if "lang" in name.lower(): + lang_val = _pick_lang_option(sel_body, language_id) + break + break + return form_action, hidden, lang_val + + +async def _load_usaco_cookies(client: httpx.AsyncClient) -> None: + if not _COOKIE_PATH.exists(): + return + try: + for k, v in json.loads(_COOKIE_PATH.read_text()).items(): + client.cookies.set(k, v) + except Exception: + pass + + +async def _save_usaco_cookies(client: httpx.AsyncClient) -> None: + cookies = {k: v for k, v in client.cookies.items()} + if cookies: + _COOKIE_PATH.parent.mkdir(parents=True, exist_ok=True) + _COOKIE_PATH.write_text(json.dumps(cookies)) + + +async def _check_usaco_login(client: httpx.AsyncClient, username: str) -> bool: + try: + r = await client.get( + f"{_AUTH_BASE}/index.php", + headers=HEADERS, + timeout=HTTP_TIMEOUT, + ) + text = r.text.lower() + return username.lower() in text or "logout" in text + except Exception: + return False + + +async def _do_usaco_login( + client: httpx.AsyncClient, username: str, password: str +) -> bool: + r = await client.post( + f"{_AUTH_BASE}{_LOGIN_PATH}", + data={"user": username, "password": password}, + headers=HEADERS, + timeout=HTTP_TIMEOUT, + ) + r.raise_for_status() + try: + data = r.json() + return bool(data.get("success") or data.get("status") == "success") + except Exception: + return r.status_code == 200 and "error" not in r.text.lower() + + class USACOScraper(BaseScraper): @property def platform_name(self) -> str: @@ -293,15 +409,99 @@ class USACOScraper(BaseScraper): language_id: str, credentials: dict[str, str], ) -> SubmitResult: - return SubmitResult( - success=False, - error="USACO submit not yet implemented", - submission_id="", - verdict="", - ) + source = Path(file_path).read_bytes() + username = credentials.get("username", "") + password = credentials.get("password", "") + if not username or not password: + return self._submit_error("Missing credentials. Use :CP usaco login") + + async with httpx.AsyncClient(follow_redirects=True) as client: + await _load_usaco_cookies(client) + print(json.dumps({"status": "checking_login"}), flush=True) + logged_in = bool(client.cookies) and await _check_usaco_login( + client, username + ) + if not logged_in: + print(json.dumps({"status": "logging_in"}), flush=True) + try: + ok = await _do_usaco_login(client, username, password) + except Exception as e: + return self._submit_error(f"Login failed: {e}") + if not ok: + return self._submit_error("Login failed (bad credentials?)") + await _save_usaco_cookies(client) + + print(json.dumps({"status": "submitting"}), flush=True) + try: + page_r = await client.get( + f"{_AUTH_BASE}/index.php?page=viewproblem2&cpid={problem_id}", + headers=HEADERS, + timeout=HTTP_TIMEOUT, + ) + form_url, hidden_fields, lang_val = _parse_submit_form( + page_r.text, language_id + ) + except Exception: + form_url = _AUTH_BASE + _SUBMIT_PATH + hidden_fields = {} + lang_val = None + + data: dict[str, str] = {"cpid": problem_id, **hidden_fields} + data["language"] = lang_val if lang_val is not None else language_id + ext = "py" if "python" in language_id.lower() else "cpp" + try: + r = await client.post( + form_url, + data=data, + files={"sub_file[]": (f"solution.{ext}", source, "text/plain")}, + headers=HEADERS, + timeout=HTTP_TIMEOUT, + ) + r.raise_for_status() + except Exception as e: + return self._submit_error(f"Submit request failed: {e}") + + try: + resp = r.json() + sid = str(resp.get("submission_id", resp.get("id", ""))) + except Exception: + sid = "" + return SubmitResult( + success=True, error="", submission_id=sid, verdict="submitted" + ) async def login(self, credentials: dict[str, str]) -> LoginResult: - return self._login_error("USACO login not yet implemented") + username = credentials.get("username", "") + password = credentials.get("password", "") + if not username or not password: + return self._login_error("Missing username or password") + + async with httpx.AsyncClient(follow_redirects=True) as client: + await _load_usaco_cookies(client) + if client.cookies: + print(json.dumps({"status": "checking_login"}), flush=True) + if await _check_usaco_login(client, username): + return LoginResult( + success=True, + error="", + credentials={"username": username, "password": password}, + ) + + print(json.dumps({"status": "logging_in"}), flush=True) + try: + ok = await _do_usaco_login(client, username, password) + except Exception as e: + return self._login_error(f"Login request failed: {e}") + + if not ok: + return self._login_error("Login failed (bad credentials?)") + + await _save_usaco_cookies(client) + return LoginResult( + success=True, + error="", + credentials={"username": username, "password": password}, + ) if __name__ == "__main__":