diff --git a/scrapers/codechef.py b/scrapers/codechef.py index 200d5e2..19d94a9 100644 --- a/scrapers/codechef.py +++ b/scrapers/codechef.py @@ -3,13 +3,14 @@ import asyncio import json import re +from pathlib import Path from typing import Any import httpx from curl_cffi import requests as curl_requests from .base import BaseScraper, extract_precision -from .timeouts import HTTP_TIMEOUT +from .timeouts import BROWSER_NAV_TIMEOUT, BROWSER_SESSION_TIMEOUT, HTTP_TIMEOUT from .models import ( ContestListResult, ContestSummary, @@ -29,6 +30,20 @@ HEADERS = { "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" } CONNECTIONS = 8 + +_COOKIE_PATH = Path.home() / ".cache" / "cp-nvim" / "codechef-cookies.json" + +_CC_CHECK_LOGIN_JS = """() => { + const d = document.getElementById('__NEXT_DATA__'); + if (d) { + try { + const p = JSON.parse(d.textContent); + if (p?.props?.pageProps?.currentUser?.username) return true; + } catch(e) {} + } + return !!document.querySelector('a[href="/logout"]') || + !!document.querySelector('[class*="user-name"]'); +}""" MEMORY_LIMIT_RE = re.compile( r"Memory\s+[Ll]imit.*?([0-9.]+)\s*(MB|GB)", re.IGNORECASE | re.DOTALL ) @@ -57,6 +72,252 @@ def _fetch_html_sync(url: str) -> str: return response.text +def _login_headless_codechef(credentials: dict[str, str]) -> LoginResult: + try: + from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import] + except ImportError: + return LoginResult( + success=False, + error="scrapling is required for CodeChef login", + ) + + from .atcoder import _ensure_browser + + _ensure_browser() + + _COOKIE_PATH.parent.mkdir(parents=True, exist_ok=True) + saved_cookies: list[dict[str, Any]] = [] + if _COOKIE_PATH.exists(): + try: + saved_cookies = json.loads(_COOKIE_PATH.read_text()) + except Exception: + pass + + logged_in = False + login_error: str | None = None + + def check_login(page): + nonlocal logged_in + logged_in = page.evaluate(_CC_CHECK_LOGIN_JS) + + def login_action(page): + nonlocal login_error + try: + page.locator('input[type="email"], input[name="email"]').first.fill( + credentials.get("username", "") + ) + page.locator('input[type="password"], input[name="password"]').first.fill( + credentials.get("password", "") + ) + page.locator('button[type="submit"]').first.click() + page.wait_for_url( + lambda url: "/login" not in url, timeout=BROWSER_NAV_TIMEOUT + ) + except Exception as e: + login_error = str(e) + + try: + with StealthySession( + headless=True, + timeout=BROWSER_SESSION_TIMEOUT, + google_search=False, + cookies=saved_cookies if saved_cookies else [], + ) as session: + if saved_cookies: + print(json.dumps({"status": "checking_login"}), flush=True) + session.fetch(f"{BASE_URL}/", page_action=check_login, network_idle=True) + + if not logged_in: + print(json.dumps({"status": "logging_in"}), flush=True) + session.fetch(f"{BASE_URL}/login", page_action=login_action) + if login_error: + return LoginResult( + success=False, error=f"Login failed: {login_error}" + ) + + session.fetch(f"{BASE_URL}/", page_action=check_login, network_idle=True) + if not logged_in: + return LoginResult( + success=False, error="Login failed (bad credentials?)" + ) + + try: + browser_cookies = session.context.cookies() + if browser_cookies: + _COOKIE_PATH.write_text(json.dumps(browser_cookies)) + except Exception: + pass + + return LoginResult(success=True, error="") + except Exception as e: + return LoginResult(success=False, error=str(e)) + + +def _submit_headless_codechef( + contest_id: str, + problem_id: str, + file_path: str, + language_id: str, + credentials: dict[str, str], + _retried: bool = False, +) -> SubmitResult: + source_code = Path(file_path).read_text() + + try: + from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import] + except ImportError: + return SubmitResult( + success=False, + error="scrapling is required for CodeChef submit", + ) + + from .atcoder import _ensure_browser + + _ensure_browser() + + _COOKIE_PATH.parent.mkdir(parents=True, exist_ok=True) + saved_cookies: list[dict[str, Any]] = [] + if _COOKIE_PATH.exists() and not _retried: + try: + saved_cookies = json.loads(_COOKIE_PATH.read_text()) + except Exception: + pass + + logged_in = bool(saved_cookies) and not _retried + login_error: str | None = None + submit_error: str | None = None + needs_relogin = False + + def check_login(page): + nonlocal logged_in + logged_in = page.evaluate(_CC_CHECK_LOGIN_JS) + + def login_action(page): + nonlocal login_error + try: + page.locator('input[type="email"], input[name="email"]').first.fill( + credentials.get("username", "") + ) + page.locator('input[type="password"], input[name="password"]').first.fill( + credentials.get("password", "") + ) + page.locator('button[type="submit"]').first.click() + page.wait_for_url( + lambda url: "/login" not in url, timeout=BROWSER_NAV_TIMEOUT + ) + except Exception as e: + login_error = str(e) + + def submit_action(page): + nonlocal submit_error, needs_relogin + if "/login" in page.url: + needs_relogin = True + return + try: + selected = False + selects = page.locator("select") + for i in range(selects.count()): + try: + sel = selects.nth(i) + opts = sel.locator("option").all_inner_texts() + match = next( + (o for o in opts if language_id.lower() in o.lower()), None + ) + if match: + sel.select_option(label=match) + selected = True + break + except Exception: + pass + + if not selected: + lang_trigger = page.locator( + '[class*="language"] button, [data-testid*="language"] button' + ).first + lang_trigger.click() + page.wait_for_timeout(500) + page.locator( + f'[role="option"]:has-text("{language_id}"), ' + f'li:has-text("{language_id}")' + ).first.click() + + page.evaluate( + """(code) => { + if (typeof monaco !== 'undefined') { + const models = monaco.editor.getModels(); + if (models.length > 0) { models[0].setValue(code); return; } + } + const cm = document.querySelector('.CodeMirror'); + if (cm && cm.CodeMirror) { cm.CodeMirror.setValue(code); return; } + const ta = document.querySelector('textarea'); + if (ta) { ta.value = code; ta.dispatchEvent(new Event('input', {bubbles: true})); } + }""", + source_code, + ) + + page.locator( + 'button[type="submit"]:has-text("Submit"), button:has-text("Submit Code")' + ).first.click() + page.wait_for_url( + lambda url: "/submit/" not in url or "submission" in url, + timeout=BROWSER_NAV_TIMEOUT * 2, + ) + except Exception as e: + submit_error = str(e) + + try: + with StealthySession( + headless=True, + timeout=BROWSER_SESSION_TIMEOUT, + google_search=False, + cookies=saved_cookies if (saved_cookies and not _retried) else [], + ) as session: + if not logged_in: + print(json.dumps({"status": "checking_login"}), flush=True) + session.fetch(f"{BASE_URL}/", page_action=check_login, network_idle=True) + + if not logged_in: + print(json.dumps({"status": "logging_in"}), flush=True) + session.fetch(f"{BASE_URL}/login", page_action=login_action) + if login_error: + return SubmitResult( + success=False, error=f"Login failed: {login_error}" + ) + + print(json.dumps({"status": "submitting"}), flush=True) + session.fetch( + f"{BASE_URL}/{contest_id}/submit/{problem_id}", + page_action=submit_action, + ) + + try: + browser_cookies = session.context.cookies() + if browser_cookies and logged_in: + _COOKIE_PATH.write_text(json.dumps(browser_cookies)) + except Exception: + pass + + if needs_relogin and not _retried: + _COOKIE_PATH.unlink(missing_ok=True) + return _submit_headless_codechef( + contest_id, + problem_id, + file_path, + language_id, + credentials, + _retried=True, + ) + + if submit_error: + return SubmitResult(success=False, error=submit_error) + + return SubmitResult( + success=True, error="", submission_id="", verdict="submitted" + ) + except Exception as e: + return SubmitResult(success=False, error=str(e)) + + class CodeChefScraper(BaseScraper): @property def platform_name(self) -> str: @@ -261,15 +522,21 @@ class CodeChefScraper(BaseScraper): language_id: str, credentials: dict[str, str], ) -> SubmitResult: - return SubmitResult( - success=False, - error="CodeChef submit not yet implemented", - submission_id="", - verdict="", + if not credentials.get("username") or not credentials.get("password"): + return self._submit_error("Missing credentials. Use :CP codechef login") + return await asyncio.to_thread( + _submit_headless_codechef, + contest_id, + problem_id, + file_path, + language_id, + credentials, ) async def login(self, credentials: dict[str, str]) -> LoginResult: - return self._login_error("CodeChef login not yet implemented") + if not credentials.get("username") or not credentials.get("password"): + return self._login_error("Missing username or password") + return await asyncio.to_thread(_login_headless_codechef, credentials) if __name__ == "__main__": diff --git a/scrapers/language_ids.py b/scrapers/language_ids.py index d6a0ae4..6870aa3 100644 --- a/scrapers/language_ids.py +++ b/scrapers/language_ids.py @@ -11,6 +11,18 @@ LANGUAGE_IDS = { "cpp": "C++17", "python": "Python3", }, + "usaco": { + "cpp": "cpp", + "python": "python", + }, + "kattis": { + "cpp": "C++17", + "python": "Python 3", + }, + "codechef": { + "cpp": "C++ 17", + "python": "Python 3", + }, }