diff --git a/scrapers/codeforces.py b/scrapers/codeforces.py index c0495d8..7fc5c1c 100644 --- a/scrapers/codeforces.py +++ b/scrapers/codeforces.py @@ -2,7 +2,9 @@ import asyncio import json +import os import re +import tempfile from typing import Any import requests @@ -10,6 +12,7 @@ from bs4 import BeautifulSoup, Tag from curl_cffi import requests as curl_requests from .base import BaseScraper, extract_precision +from .language_ids import get_language_id from .models import ( ContestListResult, ContestSummary, @@ -289,13 +292,167 @@ class CodeforcesScraper(BaseScraper): language_id: str, credentials: dict[str, str], ) -> SubmitResult: + return await asyncio.to_thread( + _submit_headless, + contest_id, + problem_id, + source_code, + language_id, + credentials, + ) + + +def _submit_headless( + contest_id: str, + problem_id: str, + source_code: str, + language_id: str, + credentials: dict[str, str], +) -> SubmitResult: + from pathlib import Path + + try: + from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import] + except ImportError: return SubmitResult( success=False, - error="Codeforces submit not yet implemented", - submission_id="", - verdict="", + error="scrapling is required for Codeforces submit", ) + from .atcoder import _ensure_browser, _solve_turnstile + + _ensure_browser() + + cookie_cache = ( + Path.home() / ".cache" / "cp-nvim" / "codeforces-cookies.json" + ) + cookie_cache.parent.mkdir(parents=True, exist_ok=True) + saved_cookies: list[dict[str, Any]] = [] + if cookie_cache.exists(): + try: + saved_cookies = json.loads(cookie_cache.read_text()) + except Exception: + pass + + login_error: str | None = None + submit_error: str | None = None + + def do_login_and_submit(page): + nonlocal login_error, submit_error + + has_submit_form = page.evaluate( + "() => !!document.querySelector('form.submit-form')" + ) + + if not has_submit_form: + if "/enter" not in page.url: + page.goto( + f"{BASE_URL}/enter", + wait_until="domcontentloaded", + timeout=10000, + ) + + try: + _solve_turnstile(page) + except Exception: + pass + + print(json.dumps({"status": "logging_in"}), flush=True) + try: + page.fill( + 'input[name="handleOrEmail"]', + credentials.get("username", ""), + ) + page.fill( + 'input[name="password"]', + credentials.get("password", ""), + ) + page.locator( + '#enterForm input[type="submit"]' + ).click() + page.wait_for_url( + lambda url: "/enter" not in url, timeout=10000 + ) + except Exception as e: + login_error = str(e) + return + + page.goto( + f"{BASE_URL}/contest/{contest_id}/submit", + wait_until="domcontentloaded", + timeout=10000, + ) + + print(json.dumps({"status": "submitting"}), flush=True) + try: + page.select_option( + 'select[name="submittedProblemIndex"]', + problem_id.upper(), + ) + page.select_option( + 'select[name="programTypeId"]', language_id + ) + with tempfile.NamedTemporaryFile( + mode="w", suffix=".cpp", delete=False, prefix="cf_" + ) as tf: + tf.write(source_code) + tmp_path = tf.name + try: + page.set_input_files( + 'input[name="sourceFile"]', tmp_path + ) + page.wait_for_timeout(500) + except Exception: + page.fill('textarea[name="source"]', source_code) + finally: + os.unlink(tmp_path) + page.locator('form.submit-form input.submit').click() + page.wait_for_url( + lambda url: "/my" in url or "/status" in url, + timeout=10000, + ) + except Exception as e: + submit_error = str(e) + + try: + with StealthySession( + headless=True, + timeout=15000, + google_search=False, + cookies=saved_cookies, + ) as session: + print(json.dumps({"status": "checking_login"}), flush=True) + session.fetch( + f"{BASE_URL}/contest/{contest_id}/submit", + page_action=do_login_and_submit, + solve_cloudflare=True, + ) + + try: + browser_cookies = session.context.cookies() + if any( + c["name"] == "JSESSIONID" for c in browser_cookies + ): + cookie_cache.write_text(json.dumps(browser_cookies)) + except Exception: + pass + + if login_error: + return SubmitResult( + success=False, error=f"Login failed: {login_error}" + ) + if submit_error: + return SubmitResult(success=False, error=submit_error) + + return SubmitResult( + success=True, + error="", + submission_id="", + verdict="submitted", + ) + except Exception as e: + return SubmitResult(success=False, error=str(e)) + if __name__ == "__main__": CodeforcesScraper().run_cli()