#!/usr/bin/env python3
"""AtCoder scraper.

CLI modes (dispatched in main_async from argv):
  metadata <contest_id>                      -- list problems for a contest
  tests <contest_id>                         -- stream sample tests, one JSON line per problem
  contests                                   -- list archived + upcoming contests
  submit <contest_id> <problem_id> <language> -- log in (camoufox) and submit stdin source
"""

import asyncio
import json
import os
import re
import sys
import time
from datetime import datetime
from typing import Any

import backoff
import httpx
import requests
from bs4 import BeautifulSoup, Tag
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

from .base import BaseScraper, extract_precision
from .language_ids import get_language_id
from .models import (
    CombinedTest,
    ContestListResult,
    ContestSummary,
    MetadataResult,
    ProblemSummary,
    SubmitResult,
    TestCase,
    TestsResult,
)

# AtCoder states memory limits in MiB; downstream models expect MB.
MIB_TO_MB = 1.048576
BASE_URL = "https://atcoder.jp"
ARCHIVE_URL = f"{BASE_URL}/contests/archive"
TIMEOUT_SECONDS = 30
HEADERS = {
    "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
}
# Transient statuses worth retrying vs. statuses where a retry cannot help.
RETRY_STATUS = {429, 502, 503, 504}
FATAL_STATUS = {400, 401, 403, 404, 410}

# One shared session with a large connection pool. urllib3's built-in retry
# machinery is disabled (total=0) because retries are driven by backoff below.
_session = requests.Session()
_adapter = HTTPAdapter(
    pool_connections=100,
    pool_maxsize=100,
    max_retries=Retry(total=0),
)
_session.mount("https://", _adapter)
_session.mount("http://", _adapter)


def _give_up_requests(exc: Exception) -> bool:
    """Tell backoff to stop retrying when the status cannot change (4xx/410)."""
    if isinstance(exc, requests.HTTPError) and exc.response is not None:
        return exc.response.status_code in FATAL_STATUS
    return False


def _retry_after_requests(details: dict[str, Any]) -> None:
    """Backoff handler: honor a numeric Retry-After header before retrying.

    NOTE(review): not every backoff release includes "exception" in the
    handler details dict; under such versions this handler is a silent
    no-op -- confirm against the pinned backoff version.
    """
    exc = details.get("exception")
    if isinstance(exc, requests.HTTPError) and exc.response is not None:
        ra = exc.response.headers.get("Retry-After")
        if ra:
            try:
                time.sleep(max(0.0, float(ra)))
            except ValueError:
                # HTTP-date form of Retry-After: skip rather than parse.
                pass


@backoff.on_exception(
    backoff.expo,
    (requests.ConnectionError, requests.Timeout, requests.HTTPError),
    max_tries=5,
    jitter=backoff.full_jitter,
    giveup=_give_up_requests,
    on_backoff=_retry_after_requests,
)
def _fetch(url: str) -> str:
    """GET *url* with retries; raise HTTPError for retryable/fatal statuses."""
    r = _session.get(url, headers=HEADERS, timeout=TIMEOUT_SECONDS)
    if r.status_code in RETRY_STATUS:
        # Surface throttling/5xx as HTTPError so the backoff decorator retries.
        raise requests.HTTPError(response=r)
    r.raise_for_status()
    return r.text


def _giveup_httpx(exc: Exception) -> bool:
    """Stop retrying async requests on statuses that will not change."""
    return (
        isinstance(exc, httpx.HTTPStatusError)
        and exc.response is not None
        and (exc.response.status_code in FATAL_STATUS)
    )


@backoff.on_exception(
    backoff.expo,
    (httpx.ConnectError, httpx.ReadTimeout, httpx.HTTPStatusError),
    max_tries=5,
    jitter=backoff.full_jitter,
    giveup=_giveup_httpx,
)
async def _get_async(client: httpx.AsyncClient, url: str) -> str:
    """Async GET with retries; raises for non-2xx responses."""
    r = await client.get(url, headers=HEADERS, timeout=TIMEOUT_SECONDS)
    r.raise_for_status()
    return r.text


def _text_from_pre(pre: Tag) -> str:
    """Extract sample text from a <pre>, normalizing CR and NBSP characters."""
    return (
        pre.get_text(separator="\n", strip=False)
        .replace("\r", "")
        .replace("\xa0", " ")
        .rstrip("\n")
    )


def _parse_last_page(html: str) -> int:
    """Return the highest page number shown in the archive's pagination bar."""
    soup = BeautifulSoup(html, "html.parser")
    nav = soup.select_one("ul.pagination")
    if not nav:
        return 1
    nums = []
    for a in nav.select("a"):
        s = a.get_text(strip=True)
        if s.isdigit():
            nums.append(int(s))
    return max(nums) if nums else 1


def _parse_start_time(tr: Tag) -> int | None:
    """Parse a contest row's start time into a unix timestamp, or None."""
    tds = tr.select("td")
    if not tds:
        return None
    time_el = tds[0].select_one("time.fixtime-full")
    if not time_el:
        return None
    text = time_el.get_text(strip=True)
    try:
        dt = datetime.strptime(text, "%Y-%m-%d %H:%M:%S%z")
        return int(dt.timestamp())
    except (ValueError, TypeError):
        return None


def _parse_archive_contests(html: str) -> list[ContestSummary]:
    """Extract contest id/name/start-time rows from an archive (or index) page."""
    soup = BeautifulSoup(html, "html.parser")
    tbody = soup.select_one("table.table-default tbody") or soup.select_one("tbody")
    if not tbody:
        return []
    out: list[ContestSummary] = []
    for tr in tbody.select("tr"):
        a = tr.select_one("a[href^='/contests/']")
        if not a:
            continue
        href_attr = a.get("href")
        if not isinstance(href_attr, str):
            continue
        m = re.search(r"/contests/([^/?#]+)", href_attr)
        if not m:
            continue
        cid = m.group(1)
        name = a.get_text(strip=True)
        start_time = _parse_start_time(tr)
        out.append(
            ContestSummary(id=cid, name=name, display_name=name, start_time=start_time)
        )
    return out


def _parse_tasks_list(html: str) -> list[dict[str, str]]:
    """Parse the contest tasks table into {letter, title, slug} rows."""
    soup = BeautifulSoup(html, "html.parser")
    tbody = soup.select_one("table tbody")
    if not tbody:
        return []
    rows: list[dict[str, str]] = []
    for tr in tbody.select("tr"):
        tds = tr.select("td")
        if len(tds) < 2:
            continue
        letter = tds[0].get_text(strip=True)
        a = tds[1].select_one("a[href*='/tasks/']")
        if not a:
            continue
        href_attr = a.get("href")
        if not isinstance(href_attr, str):
            continue
        m = re.search(r"/contests/[^/]+/tasks/([^/?#]+)", href_attr)
        if not m:
            continue
        slug = m.group(1)
        title = a.get_text(strip=True)
        rows.append({"letter": letter, "title": title, "slug": slug})
    return rows


def _extract_problem_info(html: str) -> tuple[int, float, bool, float | None]:
    """Parse limits and statement flags from a task page.

    Returns (timeout_ms, memory_mb, interactive, precision).
    """
    soup = BeautifulSoup(html, "html.parser")
    txt = soup.get_text(" ", strip=True)
    timeout_ms = 0
    memory_mb = 0.0
    ts = re.search(r"Time\s*Limit:\s*([\d.]+)\s*sec", txt, flags=re.I)
    if ts:
        timeout_ms = int(float(ts.group(1)) * 1000)
    ms = re.search(r"Memory\s*Limit:\s*(\d+)\s*MiB", txt, flags=re.I)
    if ms:
        memory_mb = float(ms.group(1)) * MIB_TO_MB
    # Fix: AtCoder task pages wrap the statement in #task-statement (the same
    # container _extract_samples uses); the previous #problem-statement
    # selector never matched, so the whole-page fallback was always taken.
    div = soup.select_one("#task-statement")
    body = div.get_text(" ", strip=True) if div else txt
    interactive = "This is an interactive" in body
    precision = extract_precision(body)
    return timeout_ms, memory_mb, interactive, precision


def _extract_samples(html: str) -> list[TestCase]:
    """Pair up "Sample Input N" / "Sample Output N" <pre> blocks into TestCases."""
    soup = BeautifulSoup(html, "html.parser")
    root = soup.select_one("#task-statement") or soup
    inputs: dict[str, str] = {}
    outputs: dict[str, str] = {}
    for h in root.find_all(re.compile(r"h[2-4]")):
        title = h.get_text(" ", strip=True)
        pre = h.find_next("pre")
        if not pre:
            continue
        t = _text_from_pre(pre)
        mi = re.search(r"Sample\s*Input\s*(\d+)", title, flags=re.I)
        mo = re.search(r"Sample\s*Output\s*(\d+)", title, flags=re.I)
        if mi:
            inputs[mi.group(1)] = t.strip()
        elif mo:
            outputs[mo.group(1)] = t.strip()
    cases: list[TestCase] = []
    # Only emit samples that have both halves, in numeric order.
    for k in sorted(set(inputs) & set(outputs), key=lambda s: int(s)):
        cases.append(TestCase(input=inputs[k], expected=outputs[k]))
    return cases


def _scrape_tasks_sync(contest_id: str) -> list[dict[str, str]]:
    """Blocking fetch + parse of a contest's task list."""
    html = _fetch(f"{BASE_URL}/contests/{contest_id}/tasks")
    return _parse_tasks_list(html)


def _scrape_problem_page_sync(contest_id: str, slug: str) -> dict[str, Any]:
    """Blocking fetch + parse of a single task page (samples, limits, flags)."""
    html = _fetch(f"{BASE_URL}/contests/{contest_id}/tasks/{slug}")
    try:
        tests = _extract_samples(html)
    except Exception:
        # Best-effort: a malformed statement should not sink the whole problem.
        tests = []
    timeout_ms, memory_mb, interactive, precision = _extract_problem_info(html)
    return {
        "tests": tests,
        "timeout_ms": timeout_ms,
        "memory_mb": memory_mb,
        "interactive": interactive,
        "precision": precision,
    }


def _to_problem_summaries(rows: list[dict[str, str]]) -> list[ProblemSummary]:
    """Map task-table rows into ProblemSummary objects (letter -> lowercase id)."""
    out: list[ProblemSummary] = []
    for r in rows:
        letter = (r.get("letter") or "").strip().upper()
        title = r.get("title") or ""
        if not letter:
            continue
        pid = letter.lower()
        out.append(ProblemSummary(id=pid, name=title))
    return out


async def _fetch_upcoming_contests_async(
    client: httpx.AsyncClient,
) -> list[ContestSummary]:
    """Scrape the contests index page; returns [] on any failure (best-effort)."""
    try:
        html = await _get_async(client, f"{BASE_URL}/contests/")
        return _parse_archive_contests(html)
    except Exception:
        return []


async def _fetch_all_contests_async() -> list[ContestSummary]:
    """Concurrently fetch every archive page plus upcoming contests, deduped by id."""
    async with httpx.AsyncClient(
        limits=httpx.Limits(max_connections=100, max_keepalive_connections=100),
    ) as client:
        upcoming = await _fetch_upcoming_contests_async(client)
        first_html = await _get_async(client, ARCHIVE_URL)
        last = _parse_last_page(first_html)
        out = _parse_archive_contests(first_html)
        if last <= 1:
            seen = {c.id for c in out}
            for c in upcoming:
                if c.id not in seen:
                    out.append(c)
            return out
        tasks = [
            asyncio.create_task(_get_async(client, f"{ARCHIVE_URL}?page={p}"))
            for p in range(2, last + 1)
        ]
        for coro in asyncio.as_completed(tasks):
            html = await coro
            out.extend(_parse_archive_contests(html))
        seen = {c.id for c in out}
        for c in upcoming:
            if c.id not in seen:
                out.append(c)
        return out


class AtcoderScraper(BaseScraper):
    """Scraper implementation for atcoder.jp."""

    @property
    def platform_name(self) -> str:
        return "atcoder"

    async def scrape_contest_metadata(self, contest_id: str) -> MetadataResult:
        """Fetch a contest's task list and return it as problem summaries."""
        try:
            rows = await asyncio.to_thread(_scrape_tasks_sync, contest_id)
            problems = _to_problem_summaries(rows)
            if not problems:
                return self._metadata_error(
                    f"No problems found for contest {contest_id}"
                )
            return MetadataResult(
                success=True,
                error="",
                contest_id=contest_id,
                problems=problems,
                # %s template: callers fill in the problem letter.
                url=f"{BASE_URL}/contests/{contest_id}/tasks/{contest_id}_%s",
            )
        except Exception as e:
            return self._metadata_error(str(e))

    async def scrape_contest_list(self) -> ContestListResult:
        """Fetch the complete contest list (archive + upcoming)."""
        try:
            contests = await _fetch_all_contests_async()
            if not contests:
                return self._contests_error("No contests found")
            return ContestListResult(success=True, error="", contests=contests)
        except Exception as e:
            return self._contests_error(str(e))

    async def stream_tests_for_category_async(self, category_id: str) -> None:
        """Print one JSON line of sample tests per problem in *category_id*.

        Problem pages are fetched concurrently; each result is emitted as a
        single flushed print so lines stay whole.
        """
        rows = await asyncio.to_thread(_scrape_tasks_sync, category_id)

        async def emit(row: dict[str, str]) -> None:
            letter = (row.get("letter") or "").strip().lower()
            slug = row.get("slug") or ""
            if not letter or not slug:
                return
            data = await asyncio.to_thread(_scrape_problem_page_sync, category_id, slug)
            tests: list[TestCase] = data.get("tests", [])
            combined_input = "\n".join(t.input for t in tests) if tests else ""
            combined_expected = "\n".join(t.expected for t in tests) if tests else ""
            print(
                json.dumps(
                    {
                        "problem_id": letter,
                        "combined": {
                            "input": combined_input,
                            "expected": combined_expected,
                        },
                        "tests": [
                            {"input": t.input, "expected": t.expected} for t in tests
                        ],
                        "timeout_ms": data.get("timeout_ms", 0),
                        "memory_mb": data.get("memory_mb", 0),
                        "interactive": bool(data.get("interactive")),
                        "multi_test": False,
                        "precision": data.get("precision"),
                    }
                ),
                flush=True,
            )

        await asyncio.gather(*(emit(r) for r in rows))

    async def submit(
        self,
        contest_id: str,
        problem_id: str,
        source_code: str,
        language_id: str,
        credentials: dict[str, str],
    ) -> SubmitResult:
        """Log in with a headless browser, then POST the submission.

        The browser handles AtCoder's login (and any JS challenges); its
        cookies are copied into a curl_cffi session that performs the actual
        form submission with the page's CSRF token.
        """

        def _submit_sync() -> SubmitResult:
            try:
                from camoufox.sync_api import Camoufox
            except ImportError:
                return SubmitResult(
                    success=False,
                    error="camoufox is required for AtCoder login. Install it: uv add 'camoufox[geoip]'",
                )
            from curl_cffi import requests as curl_requests

            try:
                with Camoufox(headless=True) as browser:
                    page = browser.new_page()
                    page.goto(f"{BASE_URL}/login", wait_until="domcontentloaded")
                    page.wait_for_load_state("networkidle")
                    page.fill('input[name="username"]', credentials.get("username", ""))
                    page.fill('input[name="password"]', credentials.get("password", ""))
                    page.click("#submit")
                    # Successful login navigates away from /login.
                    page.wait_for_url(lambda url: "/login" not in url, timeout=30000)
                    cookies = page.context.cookies()
                session = curl_requests.Session(impersonate="chrome")
                for cookie in cookies:
                    session.cookies.set(
                        cookie["name"],
                        cookie["value"],
                        domain=cookie.get("domain", ""),
                    )
                submit_page = session.get(
                    f"{BASE_URL}/contests/{contest_id}/submit",
                    timeout=TIMEOUT_SECONDS,
                )
                submit_page.raise_for_status()
                soup = BeautifulSoup(submit_page.text, "html.parser")
                csrf_input = soup.find("input", {"name": "csrf_token"})
                if not csrf_input or not hasattr(csrf_input, "get"):
                    return SubmitResult(
                        success=False, error="Could not find CSRF token on submit page"
                    )
                csrf_token = csrf_input.get("value", "") or ""  # type: ignore[union-attr]
                task_screen_name = f"{contest_id}_{problem_id}"
                submit_resp = session.post(
                    f"{BASE_URL}/contests/{contest_id}/submit",
                    data={
                        "data.TaskScreenName": task_screen_name,
                        "data.LanguageId": language_id,
                        "sourceCode": source_code,
                        "csrf_token": csrf_token,
                    },
                    timeout=TIMEOUT_SECONDS,
                    allow_redirects=False,
                )
                if submit_resp.status_code in (301, 302):
                    # AtCoder redirects to the submissions page on success.
                    location = submit_resp.headers.get("Location", "")
                    if "/submissions/me" in location:
                        return SubmitResult(
                            success=True,
                            error="",
                            submission_id="",
                            verdict="submitted",
                        )
                    return SubmitResult(
                        success=False,
                        error=f"Submit may have failed: redirected to {location}",
                    )
                submit_resp.raise_for_status()
                return SubmitResult(
                    success=False,
                    error="Unexpected response from submit (expected redirect)",
                )
            except Exception as e:
                return SubmitResult(success=False, error=str(e))

        return await asyncio.to_thread(_submit_sync)


async def main_async() -> int:
    """CLI entry point: dispatch on argv and print a result as JSON.

    Returns a process exit code (0 on success, 1 otherwise).
    """
    if len(sys.argv) < 2:
        result = MetadataResult(
            success=False,
            error="Usage: atcoder.py metadata <contest_id> OR atcoder.py tests <contest_id> OR atcoder.py contests",
            url="",
        )
        print(result.model_dump_json())
        return 1
    mode: str = sys.argv[1]
    scraper = AtcoderScraper()
    if mode == "metadata":
        if len(sys.argv) != 3:
            result = MetadataResult(
                success=False,
                error="Usage: atcoder.py metadata <contest_id>",
                url="",
            )
            print(result.model_dump_json())
            return 1
        contest_id = sys.argv[2]
        result = await scraper.scrape_contest_metadata(contest_id)
        print(result.model_dump_json())
        return 0 if result.success else 1
    if mode == "tests":
        if len(sys.argv) != 3:
            tests_result = TestsResult(
                success=False,
                error="Usage: atcoder.py tests <contest_id>",
                problem_id="",
                combined=CombinedTest(input="", expected=""),
                tests=[],
                timeout_ms=0,
                memory_mb=0,
            )
            print(tests_result.model_dump_json())
            return 1
        contest_id = sys.argv[2]
        await scraper.stream_tests_for_category_async(contest_id)
        return 0
    if mode == "contests":
        if len(sys.argv) != 2:
            contest_result = ContestListResult(
                success=False, error="Usage: atcoder.py contests"
            )
            print(contest_result.model_dump_json())
            return 1
        contest_result = await scraper.scrape_contest_list()
        print(contest_result.model_dump_json())
        return 0 if contest_result.success else 1
    if mode == "submit":
        if len(sys.argv) != 5:
            print(
                SubmitResult(
                    success=False,
                    error="Usage: atcoder.py submit <contest_id> <problem_id> <language>",
                ).model_dump_json()
            )
            return 1
        # Source code arrives on stdin; credentials via CP_CREDENTIALS env JSON.
        source_code = sys.stdin.read()
        creds_raw = os.environ.get("CP_CREDENTIALS", "{}")
        try:
            credentials = json.loads(creds_raw)
        except json.JSONDecodeError:
            credentials = {}
        language_id = get_language_id("atcoder", sys.argv[4]) or sys.argv[4]
        submit_result = await scraper.submit(
            sys.argv[2], sys.argv[3], source_code, language_id, credentials
        )
        print(submit_result.model_dump_json())
        return 0 if submit_result.success else 1
    result = MetadataResult(
        success=False,
        error="Unknown mode. Use 'metadata <contest_id>', 'tests <contest_id>', 'contests', or 'submit <contest_id> <problem_id> <language>'",
        url="",
    )
    print(result.model_dump_json())
    return 1


def main() -> None:
    """Synchronous wrapper: run main_async and exit with its return code."""
    sys.exit(asyncio.run(main_async()))


if __name__ == "__main__":
    main()