diff --git a/scrapers/atcoder.py b/scrapers/atcoder.py index 9b1a574..e0cec19 100644 --- a/scrapers/atcoder.py +++ b/scrapers/atcoder.py @@ -4,7 +4,9 @@ import asyncio import json import os import re +import subprocess import sys +import tempfile import time from typing import Any @@ -233,6 +235,185 @@ def _extract_samples(html: str) -> list[TestCase]: return cases +_TURNSTILE_JS = '() => { const el = document.querySelector(\'[name="cf-turnstile-response"]\'); return el && el.value.length > 0; }' + + +def _solve_turnstile(page) -> None: + for _ in range(6): + has_token = page.evaluate(_TURNSTILE_JS) + if has_token: + return + try: + box = page.locator( + 'iframe[src*="challenges.cloudflare.com"]' + ).first.bounding_box() + if box: + page.mouse.click( + box["x"] + box["width"] * 0.15, + box["y"] + box["height"] * 0.5, + ) + except Exception: + pass + try: + page.wait_for_function(_TURNSTILE_JS, timeout=5000) + return + except Exception: + pass + raise RuntimeError("Turnstile not solved after multiple attempts") + + +def _ensure_browser() -> None: + try: + from patchright._impl._driver import compute_driver_executable + + node, cli = compute_driver_executable() + browser_info = subprocess.run( + [node, cli, "install", "--dry-run", "chromium"], + capture_output=True, + text=True, + ) + for line in browser_info.stdout.splitlines(): + if "Install location:" in line: + install_dir = line.split(":", 1)[1].strip() + if not os.path.isdir(install_dir): + print(json.dumps({"status": "installing_browser"}), flush=True) + subprocess.run( + [node, cli, "install", "chromium"], + check=True, + ) + break + except Exception: + pass + + +def _submit_headless( + contest_id: str, + problem_id: str, + source_code: str, + language_id: str, + credentials: dict[str, str], +) -> "SubmitResult": + from pathlib import Path + + try: + from scrapling.fetchers import StealthySession + except ImportError: + return SubmitResult( + success=False, + error="scrapling is required for AtCoder submit. Install it: uv add 'scrapling[fetchers]>=0.4'", + ) + + _ensure_browser() + + cookie_cache = Path.home() / ".cache" / "cp-nvim" / "atcoder-cookies.json" + cookie_cache.parent.mkdir(parents=True, exist_ok=True) + saved_cookies: list[dict[str, Any]] = [] + if cookie_cache.exists(): + try: + saved_cookies = json.loads(cookie_cache.read_text()) + except Exception: + pass + + logged_in = False + login_error: str | None = None + submit_error: str | None = None + + def check_login(page): + nonlocal logged_in + logged_in = page.evaluate( + "() => Array.from(document.querySelectorAll('a')).some(a => a.textContent.trim() === 'Sign Out')" + ) + + def login_action(page): + nonlocal login_error + try: + _solve_turnstile(page) + page.fill('input[name="username"]', credentials.get("username", "")) + page.fill('input[name="password"]', credentials.get("password", "")) + page.click("#submit") + page.wait_for_url(lambda url: "/login" not in url, timeout=60000) + except Exception as e: + login_error = str(e) + + def submit_action(page): + nonlocal submit_error + try: + _solve_turnstile(page) + page.select_option( + 'select[name="data.TaskScreenName"]', + f"{contest_id}_{problem_id}", + ) + page.locator( + f'select[name="data.LanguageId"] option[value="{language_id}"]' + ).wait_for(state="attached", timeout=15000) + page.select_option('select[name="data.LanguageId"]', language_id) + with tempfile.NamedTemporaryFile( + mode="w", suffix=".cpp", delete=False, prefix="atcoder_" + ) as tf: + tf.write(source_code) + tmp_path = tf.name + try: + page.set_input_files("#input-open-file", tmp_path) + page.wait_for_timeout(500) + finally: + os.unlink(tmp_path) + page.locator('button[type="submit"]').click() + page.wait_for_url( + lambda url: "/submissions/me" in url, timeout=60000 + ) + except Exception as e: + submit_error = str(e) + + try: + with StealthySession( + headless=True, + timeout=60000, + google_search=False, + cookies=saved_cookies, + ) as session: + print(json.dumps({"status": "checking_login"}), flush=True) + session.fetch( + f"{BASE_URL}/home", + page_action=check_login, + network_idle=True, + ) + + if not logged_in: + print(json.dumps({"status": "logging_in"}), flush=True) + session.fetch( + f"{BASE_URL}/login", + page_action=login_action, + solve_cloudflare=True, + ) + if login_error: + return SubmitResult( + success=False, error=f"Login failed: {login_error}" + ) + + print(json.dumps({"status": "submitting"}), flush=True) + session.fetch( + f"{BASE_URL}/contests/{contest_id}/submit", + page_action=submit_action, + solve_cloudflare=True, + ) + + try: + browser_cookies = session.context.cookies() + if any(c["name"] == "REVEL_SESSION" for c in browser_cookies): + cookie_cache.write_text(json.dumps(browser_cookies)) + except Exception: + pass + + if submit_error: + return SubmitResult(success=False, error=submit_error) + + return SubmitResult( + success=True, error="", submission_id="", verdict="submitted" + ) + except Exception as e: + return SubmitResult(success=False, error=str(e)) + + def _scrape_tasks_sync(contest_id: str) -> list[dict[str, str]]: html = _fetch(f"{BASE_URL}/contests/{contest_id}/tasks") return _parse_tasks_list(html) @@ -379,83 +560,9 @@ class AtcoderScraper(BaseScraper): language_id: str, credentials: dict[str, str], ) -> SubmitResult: - def _submit_sync() -> SubmitResult: - try: - from camoufox.sync_api import Camoufox - except ImportError: - return SubmitResult( - success=False, - error="camoufox is required for AtCoder login. Install it: uv add 'camoufox[geoip]'", - ) - - from curl_cffi import requests as curl_requests - - try: - with Camoufox(headless=True) as browser: - page = browser.new_page() - page.goto(f"{BASE_URL}/login", wait_until="domcontentloaded") - page.wait_for_load_state("networkidle") - page.fill('input[name="username"]', credentials.get("username", "")) - page.fill('input[name="password"]', credentials.get("password", "")) - page.click("#submit") - page.wait_for_url(lambda url: "/login" not in url, timeout=30000) - cookies = page.context.cookies() - - session = curl_requests.Session(impersonate="chrome") - for cookie in cookies: - session.cookies.set( - cookie["name"], - cookie["value"], - domain=cookie.get("domain", ""), - ) - - submit_page = session.get( - f"{BASE_URL}/contests/{contest_id}/submit", - timeout=TIMEOUT_SECONDS, - ) - submit_page.raise_for_status() - soup = BeautifulSoup(submit_page.text, "html.parser") - csrf_input = soup.find("input", {"name": "csrf_token"}) - if not csrf_input or not hasattr(csrf_input, "get"): - return SubmitResult( - success=False, error="Could not find CSRF token on submit page" - ) - csrf_token = csrf_input.get("value", "") or "" # type: ignore[union-attr] - - task_screen_name = f"{contest_id}_{problem_id}" - submit_resp = session.post( - f"{BASE_URL}/contests/{contest_id}/submit", - data={ - "data.TaskScreenName": task_screen_name, - "data.LanguageId": language_id, - "sourceCode": source_code, - "csrf_token": csrf_token, - }, - timeout=TIMEOUT_SECONDS, - allow_redirects=False, - ) - if submit_resp.status_code in (301, 302): - location = submit_resp.headers.get("Location", "") - if "/submissions/me" in location: - return SubmitResult( - success=True, - error="", - submission_id="", - verdict="submitted", - ) - return SubmitResult( - success=False, - error=f"Submit may have failed: redirected to {location}", - ) - submit_resp.raise_for_status() - return SubmitResult( - success=False, - error="Unexpected response from submit (expected redirect)", - ) - except Exception as e: - return SubmitResult(success=False, error=str(e)) - - return await asyncio.to_thread(_submit_sync) + return await asyncio.to_thread( + _submit_headless, contest_id, problem_id, source_code, language_id, credentials + ) async def main_async() -> int: