refactor(atcoder): extract submit helpers to module level; emit status events

Problem: `_submit_sync` was a deeply nested closure containing
`_solve_turnstile` and the browser-install block as further nesting.
Status events went to stderr, which `run_scraper()` silently discards.

Solution: Extract `_TURNSTILE_JS`, `_solve_turnstile`, `_ensure_browser`,
and `_submit_headless` to module level. Status events (`installing_browser`,
`checking_login`, `logging_in`, `submitting`) now print to stdout as NDJSON.
`submit()` delegates to `asyncio.to_thread(_submit_headless, ...)`.
This commit is contained in:
Barrett Ruth 2026-03-04 18:59:31 -05:00
parent 886a99eb95
commit 6faf9c7537
Signed by: barrett
GPG key ID: A6C96C9349D2FC81

View file

@ -4,7 +4,9 @@ import asyncio
import json import json
import os import os
import re import re
import subprocess
import sys import sys
import tempfile
import time import time
from typing import Any from typing import Any
@ -233,6 +235,185 @@ def _extract_samples(html: str) -> list[TestCase]:
return cases return cases
_TURNSTILE_JS = '() => { const el = document.querySelector(\'[name="cf-turnstile-response"]\'); return el && el.value.length > 0; }'
def _solve_turnstile(page) -> None:
for _ in range(6):
has_token = page.evaluate(_TURNSTILE_JS)
if has_token:
return
try:
box = page.locator(
'iframe[src*="challenges.cloudflare.com"]'
).first.bounding_box()
if box:
page.mouse.click(
box["x"] + box["width"] * 0.15,
box["y"] + box["height"] * 0.5,
)
except Exception:
pass
try:
page.wait_for_function(_TURNSTILE_JS, timeout=5000)
return
except Exception:
pass
raise RuntimeError("Turnstile not solved after multiple attempts")
def _ensure_browser() -> None:
try:
from patchright._impl._driver import compute_driver_executable
node, cli = compute_driver_executable()
browser_info = subprocess.run(
[node, cli, "install", "--dry-run", "chromium"],
capture_output=True,
text=True,
)
for line in browser_info.stdout.splitlines():
if "Install location:" in line:
install_dir = line.split(":", 1)[1].strip()
if not os.path.isdir(install_dir):
print(json.dumps({"status": "installing_browser"}), flush=True)
subprocess.run(
[node, cli, "install", "chromium"],
check=True,
)
break
except Exception:
pass
def _submit_headless(
contest_id: str,
problem_id: str,
source_code: str,
language_id: str,
credentials: dict[str, str],
) -> "SubmitResult":
from pathlib import Path
try:
from scrapling.fetchers import StealthySession
except ImportError:
return SubmitResult(
success=False,
error="scrapling is required for AtCoder submit. Install it: uv add 'scrapling[fetchers]>=0.4'",
)
_ensure_browser()
cookie_cache = Path.home() / ".cache" / "cp-nvim" / "atcoder-cookies.json"
cookie_cache.parent.mkdir(parents=True, exist_ok=True)
saved_cookies: list[dict[str, Any]] = []
if cookie_cache.exists():
try:
saved_cookies = json.loads(cookie_cache.read_text())
except Exception:
pass
logged_in = False
login_error: str | None = None
submit_error: str | None = None
def check_login(page):
nonlocal logged_in
logged_in = page.evaluate(
"() => Array.from(document.querySelectorAll('a')).some(a => a.textContent.trim() === 'Sign Out')"
)
def login_action(page):
nonlocal login_error
try:
_solve_turnstile(page)
page.fill('input[name="username"]', credentials.get("username", ""))
page.fill('input[name="password"]', credentials.get("password", ""))
page.click("#submit")
page.wait_for_url(lambda url: "/login" not in url, timeout=60000)
except Exception as e:
login_error = str(e)
def submit_action(page):
nonlocal submit_error
try:
_solve_turnstile(page)
page.select_option(
'select[name="data.TaskScreenName"]',
f"{contest_id}_{problem_id}",
)
page.locator(
f'select[name="data.LanguageId"] option[value="{language_id}"]'
).wait_for(state="attached", timeout=15000)
page.select_option('select[name="data.LanguageId"]', language_id)
with tempfile.NamedTemporaryFile(
mode="w", suffix=".cpp", delete=False, prefix="atcoder_"
) as tf:
tf.write(source_code)
tmp_path = tf.name
try:
page.set_input_files("#input-open-file", tmp_path)
page.wait_for_timeout(500)
finally:
os.unlink(tmp_path)
page.locator('button[type="submit"]').click()
page.wait_for_url(
lambda url: "/submissions/me" in url, timeout=60000
)
except Exception as e:
submit_error = str(e)
try:
with StealthySession(
headless=True,
timeout=60000,
google_search=False,
cookies=saved_cookies,
) as session:
print(json.dumps({"status": "checking_login"}), flush=True)
session.fetch(
f"{BASE_URL}/home",
page_action=check_login,
network_idle=True,
)
if not logged_in:
print(json.dumps({"status": "logging_in"}), flush=True)
session.fetch(
f"{BASE_URL}/login",
page_action=login_action,
solve_cloudflare=True,
)
if login_error:
return SubmitResult(
success=False, error=f"Login failed: {login_error}"
)
print(json.dumps({"status": "submitting"}), flush=True)
session.fetch(
f"{BASE_URL}/contests/{contest_id}/submit",
page_action=submit_action,
solve_cloudflare=True,
)
try:
browser_cookies = session.context.cookies()
if any(c["name"] == "REVEL_SESSION" for c in browser_cookies):
cookie_cache.write_text(json.dumps(browser_cookies))
except Exception:
pass
if submit_error:
return SubmitResult(success=False, error=submit_error)
return SubmitResult(
success=True, error="", submission_id="", verdict="submitted"
)
except Exception as e:
return SubmitResult(success=False, error=str(e))
def _scrape_tasks_sync(contest_id: str) -> list[dict[str, str]]: def _scrape_tasks_sync(contest_id: str) -> list[dict[str, str]]:
html = _fetch(f"{BASE_URL}/contests/{contest_id}/tasks") html = _fetch(f"{BASE_URL}/contests/{contest_id}/tasks")
return _parse_tasks_list(html) return _parse_tasks_list(html)
@ -379,83 +560,9 @@ class AtcoderScraper(BaseScraper):
language_id: str, language_id: str,
credentials: dict[str, str], credentials: dict[str, str],
) -> SubmitResult: ) -> SubmitResult:
def _submit_sync() -> SubmitResult: return await asyncio.to_thread(
try: _submit_headless, contest_id, problem_id, source_code, language_id, credentials
from camoufox.sync_api import Camoufox )
except ImportError:
return SubmitResult(
success=False,
error="camoufox is required for AtCoder login. Install it: uv add 'camoufox[geoip]'",
)
from curl_cffi import requests as curl_requests
try:
with Camoufox(headless=True) as browser:
page = browser.new_page()
page.goto(f"{BASE_URL}/login", wait_until="domcontentloaded")
page.wait_for_load_state("networkidle")
page.fill('input[name="username"]', credentials.get("username", ""))
page.fill('input[name="password"]', credentials.get("password", ""))
page.click("#submit")
page.wait_for_url(lambda url: "/login" not in url, timeout=30000)
cookies = page.context.cookies()
session = curl_requests.Session(impersonate="chrome")
for cookie in cookies:
session.cookies.set(
cookie["name"],
cookie["value"],
domain=cookie.get("domain", ""),
)
submit_page = session.get(
f"{BASE_URL}/contests/{contest_id}/submit",
timeout=TIMEOUT_SECONDS,
)
submit_page.raise_for_status()
soup = BeautifulSoup(submit_page.text, "html.parser")
csrf_input = soup.find("input", {"name": "csrf_token"})
if not csrf_input or not hasattr(csrf_input, "get"):
return SubmitResult(
success=False, error="Could not find CSRF token on submit page"
)
csrf_token = csrf_input.get("value", "") or "" # type: ignore[union-attr]
task_screen_name = f"{contest_id}_{problem_id}"
submit_resp = session.post(
f"{BASE_URL}/contests/{contest_id}/submit",
data={
"data.TaskScreenName": task_screen_name,
"data.LanguageId": language_id,
"sourceCode": source_code,
"csrf_token": csrf_token,
},
timeout=TIMEOUT_SECONDS,
allow_redirects=False,
)
if submit_resp.status_code in (301, 302):
location = submit_resp.headers.get("Location", "")
if "/submissions/me" in location:
return SubmitResult(
success=True,
error="",
submission_id="",
verdict="submitted",
)
return SubmitResult(
success=False,
error=f"Submit may have failed: redirected to {location}",
)
submit_resp.raise_for_status()
return SubmitResult(
success=False,
error="Unexpected response from submit (expected redirect)",
)
except Exception as e:
return SubmitResult(success=False, error=str(e))
return await asyncio.to_thread(_submit_sync)
async def main_async() -> int: async def main_async() -> int: