feat(scrapers/atcoder): atcoder scraper

This commit is contained in:
Barrett Ruth 2025-10-03 23:26:09 -04:00
parent 179b333505
commit f929c8e826
3 changed files with 330 additions and 362 deletions

View file

@ -5,6 +5,7 @@ description = "Add your description here"
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
"backoff>=2.2.1",
"beautifulsoup4>=4.13.5",
"curl-cffi>=0.13.0",
"httpx>=0.28.1",

View file

@ -1,14 +1,19 @@
#!/usr/bin/env python3
import concurrent.futures
import asyncio
import json
import re
import sys
import time
from dataclasses import asdict
from typing import Any
import backoff
import httpx
import requests
from bs4 import BeautifulSoup, Tag
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from .base import BaseScraper
from .models import (
@ -20,398 +25,352 @@ from .models import (
TestsResult,
)
MIB_TO_MB = 1.048576
BASE_URL = "https://atcoder.jp"
ARCHIVE_URL = f"{BASE_URL}/contests/archive"
TIMEOUT_SECONDS = 30
HEADERS = {
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
}
RETRY_STATUS = {429, 502, 503, 504}
FATAL_STATUS = {400, 401, 403, 404, 410}
def _make_request(url: str, timeout: int = 10) -> requests.Response:
headers = {
"User-Agent": (
"Mozilla/5.0 (X11; Linux x86_64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/120.0.0.0 Safari/537.36"
)
}
_session = requests.Session()
_adapter = HTTPAdapter(
pool_connections=100,
pool_maxsize=100,
max_retries=Retry(total=0),
)
_session.mount("https://", _adapter)
_session.mount("http://", _adapter)
@backoff.on_exception(
backoff.expo,
(requests.exceptions.RequestException, requests.exceptions.HTTPError),
max_tries=5,
jitter=backoff.random_jitter,
on_backoff=lambda details: print(
f"Request error on {url} (attempt {details['tries']}), "
f"retrying in {details['wait']:.1f}s: {details['exception']}",
file=sys.stderr,
),
def _give_up_requests(exc: Exception) -> bool:
if isinstance(exc, requests.HTTPError) and exc.response is not None:
return exc.response.status_code in FATAL_STATUS
return False
def _retry_after_requests(details):
exc = details.get("exception")
if isinstance(exc, requests.HTTPError) and exc.response is not None:
ra = exc.response.headers.get("Retry-After")
if ra:
try:
time.sleep(max(0.0, float(ra)))
except ValueError:
pass
@backoff.on_exception(
backoff.expo,
(requests.ConnectionError, requests.Timeout, requests.HTTPError),
max_tries=5,
jitter=backoff.full_jitter,
giveup=_give_up_requests,
on_backoff=_retry_after_requests,
)
def _fetch(url: str) -> str:
r = _session.get(url, headers=HEADERS, timeout=TIMEOUT_SECONDS)
if r.status_code in RETRY_STATUS:
raise requests.HTTPError(response=r)
r.raise_for_status()
return r.text
def _giveup_httpx(exc: Exception) -> bool:
return (
isinstance(exc, httpx.HTTPStatusError)
and exc.response is not None
and (exc.response.status_code in FATAL_STATUS)
)
@backoff.on_predicate(
backoff.expo,
lambda resp: resp.status_code == 429,
max_tries=5,
jitter=backoff.random_jitter,
on_backoff=lambda details: print(
f"Rate limited on {url}, retrying in {details['wait']:.1f}s",
file=sys.stderr,
),
@backoff.on_exception(
backoff.expo,
(httpx.ConnectError, httpx.ReadTimeout, httpx.HTTPStatusError),
max_tries=5,
jitter=backoff.full_jitter,
giveup=_giveup_httpx,
)
async def _get_async(client: httpx.AsyncClient, url: str) -> str:
r = await client.get(url, headers=HEADERS, timeout=TIMEOUT_SECONDS)
r.raise_for_status()
return r.text
def _text_from_pre(pre: Tag) -> str:
return (
pre.get_text(separator="\n", strip=False)
.replace("\r", "")
.replace("\xa0", " ")
.rstrip("\n")
)
def _req():
return requests.get(url, headers=headers, timeout=timeout)
resp = _req()
resp.raise_for_status()
return resp
def extract_problem_limits(soup: BeautifulSoup) -> tuple[int, float]:
timeout_ms = None
memory_mb = None
def _parse_last_page(html: str) -> int:
soup = BeautifulSoup(html, "html.parser")
nav = soup.select_one("ul.pagination")
if not nav:
return 1
nums = []
for a in nav.select("a"):
s = a.get_text(strip=True)
if s.isdigit():
nums.append(int(s))
return max(nums) if nums else 1
paragraphs = soup.find_all("p")
for p in paragraphs:
text = p.get_text()
if "Time Limit:" in text and "Memory Limit:" in text:
time_match = re.search(r"Time Limit:\s*(\d+)\s*sec", text)
if time_match:
seconds = int(time_match.group(1))
timeout_ms = seconds * 1000
memory_match = re.search(r"Memory Limit:\s*(\d+)\s*MiB", text)
if memory_match:
memory_mib = int(memory_match.group(1))
memory_mb = round(memory_mib * 1.048576, 2)
break
def _parse_archive_contests(html: str) -> list[ContestSummary]:
soup = BeautifulSoup(html, "html.parser")
tbody = soup.select_one("table.table-default tbody") or soup.select_one("tbody")
if not tbody:
return []
out: list[ContestSummary] = []
for tr in tbody.select("tr"):
a = tr.select_one("a[href^='/contests/']")
if not a:
continue
href_attr = a.get("href")
if not isinstance(href_attr, str):
continue
m = re.search(r"/contests/([^/?#]+)", href_attr)
if not m:
continue
cid = m.group(1)
name = a.get_text(strip=True)
out.append(ContestSummary(id=cid, name=name, display_name=name))
return out
if timeout_ms is None:
raise ValueError("Could not find valid timeout in problem constraints")
if memory_mb is None:
raise ValueError("Could not find valid memory limit in problem constraints")
def _parse_tasks_list(html: str) -> list[dict[str, str]]:
soup = BeautifulSoup(html, "html.parser")
tbody = soup.select_one("table tbody")
if not tbody:
return []
rows: list[dict[str, str]] = []
for tr in tbody.select("tr"):
tds = tr.select("td")
if len(tds) < 2:
continue
letter = tds[0].get_text(strip=True)
a = tds[1].select_one("a[href*='/tasks/']")
if not a:
continue
href_attr = a.get("href")
if not isinstance(href_attr, str):
continue
m = re.search(r"/contests/[^/]+/tasks/([^/?#]+)", href_attr)
if not m:
continue
slug = m.group(1)
title = a.get_text(strip=True)
rows.append({"letter": letter, "title": title, "slug": slug})
return rows
def _extract_limits(html: str) -> tuple[int, float]:
soup = BeautifulSoup(html, "html.parser")
txt = soup.get_text(" ", strip=True)
timeout_ms = 0
memory_mb = 0.0
ts = re.search(r"Time\s*Limit:\s*([\d.]+)\s*sec", txt, flags=re.I)
if ts:
timeout_ms = int(float(ts.group(1)) * 1000)
ms = re.search(r"Memory\s*Limit:\s*(\d+)\s*MiB", txt, flags=re.I)
if ms:
memory_mb = float(ms.group(1)) * MIB_TO_MB
return timeout_ms, memory_mb
def parse_problem_url(contest_id: str, problem_letter: str) -> str:
task_id: str = f"{contest_id}_{problem_letter}"
return f"https://atcoder.jp/contests/{contest_id}/tasks/{task_id}"
def _extract_samples(html: str) -> list[TestCase]:
soup = BeautifulSoup(html, "html.parser")
root = soup.select_one("#task-statement") or soup
inputs: dict[str, str] = {}
outputs: dict[str, str] = {}
for h in root.find_all(re.compile(r"h[2-4]")):
title = h.get_text(" ", strip=True)
pre = h.find_next("pre")
if not pre:
continue
t = _text_from_pre(pre)
mi = re.search(r"Sample\s*Input\s*(\d+)", title, flags=re.I)
mo = re.search(r"Sample\s*Output\s*(\d+)", title, flags=re.I)
if mi:
inputs[mi.group(1)] = t
elif mo:
outputs[mo.group(1)] = t
cases: list[TestCase] = []
for k in sorted(set(inputs) & set(outputs), key=lambda s: int(s)):
cases.append(TestCase(input=inputs[k], expected=outputs[k]))
return cases
def extract_problem_from_row(row, contest_id: str) -> ProblemSummary | None:
cells = row.find_all("td")
if len(cells) < 2:
return None
task_link = cells[1].find("a")
if not task_link:
return None
task_name = task_link.get_text(strip=True)
task_href = task_link.get("href", "")
if not task_href:
return None
task_id = task_href.split("/")[-1]
if not task_id.startswith(contest_id + "_"):
return None
problem_letter = task_id[len(contest_id) + 1 :]
if not problem_letter or not task_name:
return None
return ProblemSummary(id=problem_letter.lower(), name=task_name)
def _scrape_tasks_sync(contest_id: str) -> list[dict[str, str]]:
html = _fetch(f"{BASE_URL}/contests/{contest_id}/tasks")
return _parse_tasks_list(html)
def scrape_contest_problems(contest_id: str) -> list[ProblemSummary]:
try:
contest_url = f"https://atcoder.jp/contests/{contest_id}/tasks"
response = _make_request(contest_url)
soup = BeautifulSoup(response.text, "html.parser")
task_table = soup.find("table", class_="table")
if not task_table or not isinstance(task_table, Tag):
return []
rows = task_table.find_all("tr")[1:]
problems: list[ProblemSummary] = []
for row in rows:
problem = extract_problem_from_row(row, contest_id)
if problem:
problems.append(problem)
return problems
except Exception as e:
print(f"Failed to scrape AtCoder contest problems: {e}", file=sys.stderr)
return []
def _scrape_problem_page_sync(contest_id: str, slug: str) -> dict[str, Any]:
html = _fetch(f"{BASE_URL}/contests/{contest_id}/tasks/{slug}")
tests = _extract_samples(html)
timeout_ms, memory_mb = _extract_limits(html)
return {
"tests": tests,
"timeout_ms": timeout_ms,
"memory_mb": memory_mb,
"interactive": False,
}
def extract_test_case_from_headers(sample_headers, i: int) -> tuple[str, str] | None:
if i >= len(sample_headers):
return None
header = sample_headers[i]
if "input" not in header.get_text().lower():
return None
input_pre = header.find_next("pre")
if not input_pre or i + 1 >= len(sample_headers):
return None
next_header = sample_headers[i + 1]
if "output" not in next_header.get_text().lower():
return None
output_pre = next_header.find_next("pre")
if not output_pre:
return None
input_text = input_pre.get_text().strip().replace("\r", "")
output_text = output_pre.get_text().strip().replace("\r", "")
if not input_text or not output_text:
return None
return (input_text, output_text)
def _to_problem_summaries(rows: list[dict[str, str]]) -> list[ProblemSummary]:
out: list[ProblemSummary] = []
seen: set[str] = set()
for r in rows:
letter = (r.get("letter") or "").strip().upper()
title = r.get("title") or ""
if not letter:
continue
pid = letter.lower()
if pid in seen:
continue
seen.add(pid)
out.append(ProblemSummary(id=pid, name=title))
return out
def scrape(url: str) -> list[TestCase]:
try:
response = _make_request(url)
soup = BeautifulSoup(response.text, "html.parser")
sample_headers = soup.find_all(
"h3", string=lambda x: x and "sample" in x.lower() if x else False
)
tests: list[TestCase] = []
i = 0
while i < len(sample_headers):
test_case = extract_test_case_from_headers(sample_headers, i)
if test_case:
input_text, output_text = test_case
tests.append(TestCase(input=input_text, expected=output_text))
i += 2
else:
i += 1
return tests
except Exception as e:
print(f"Error scraping AtCoder: {e}", file=sys.stderr)
return []
async def _fetch_all_contests_async() -> list[ContestSummary]:
async with httpx.AsyncClient(
limits=httpx.Limits(max_connections=100, max_keepalive_connections=100)
) as client:
first_html = await _get_async(client, ARCHIVE_URL)
last = _parse_last_page(first_html)
out = _parse_archive_contests(first_html)
if last <= 1:
return out
tasks = [
asyncio.create_task(_get_async(client, f"{ARCHIVE_URL}?page={p}"))
for p in range(2, last + 1)
]
for coro in asyncio.as_completed(tasks):
html = await coro
out.extend(_parse_archive_contests(html))
return out
def scrape_contests() -> list[ContestSummary]:
def get_max_pages() -> int:
try:
response = _make_request("https://atcoder.jp/contests/archive")
soup = BeautifulSoup(response.text, "html.parser")
pagination = soup.find("ul", class_="pagination")
if not pagination or not isinstance(pagination, Tag):
return 15
lis = pagination.find_all("li")
if lis and isinstance(lis[-1], Tag):
last_li_text = lis[-1].get_text().strip()
try:
return int(last_li_text)
except ValueError:
return 15
return 15
except Exception:
return 15
def scrape_page(page: int) -> list[ContestSummary]:
try:
response = _make_request(f"https://atcoder.jp/contests/archive?page={page}")
except Exception:
return []
soup = BeautifulSoup(response.text, "html.parser")
table = soup.find("table", class_="table")
if not table:
return []
tbody = table.find("tbody")
if not tbody or not isinstance(tbody, Tag):
return []
rows = tbody.find_all("tr")
if not rows:
return []
contests = []
for row in rows:
cells = row.find_all("td")
if len(cells) < 2:
continue
contest_cell = cells[1]
link = contest_cell.find("a")
if not link or not link.get("href"):
continue
href = link.get("href")
contest_id = href.split("/")[-1]
name = link.get_text().strip()
try:
name = name.encode().decode("unicode_escape")
except (UnicodeDecodeError, UnicodeEncodeError):
pass
name = (
name.replace("\uff08", "(")
.replace("\uff09", ")")
.replace("\u3000", " ")
)
name = re.sub(
r"[\uff01-\uff5e]", lambda m: chr(ord(m.group()) - 0xFEE0), name
)
if not (
contest_id.startswith("ahc") or name.lower().find("heuristic") != -1
):
contests.append(
ContestSummary(id=contest_id, name=name, display_name=name)
)
return contests
max_pages = get_max_pages()
page_results = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
future_to_page = {
executor.submit(scrape_page, page): page for page in range(1, max_pages + 1)
}
for future in concurrent.futures.as_completed(future_to_page):
page = future_to_page[future]
page_contests = future.result()
page_results[page] = page_contests
all_contests = []
for page in sorted(page_results.keys()):
all_contests.extend(page_results[page])
return all_contests
class AtCoderScraper(BaseScraper):
class AtcoderScraper(BaseScraper):
@property
def platform_name(self) -> str:
return "atcoder"
def scrape_contest_metadata(self, contest_id: str) -> MetadataResult:
return self._safe_execute("metadata", self._scrape_metadata_impl, contest_id)
def scrape_problem_tests(self, contest_id: str, problem_id: str) -> TestsResult:
return self._safe_execute(
"tests", self._scrape_tests_impl, contest_id, problem_id
)
def scrape_contest_list(self) -> ContestListResult:
return self._safe_execute("contests", self._scrape_contests_impl)
def _safe_execute(self, operation: str, func, *args):
try:
return func(*args)
except Exception as e:
error_msg = f"{self.platform_name}: {str(e)}"
if operation == "metadata":
return MetadataResult(success=False, error=error_msg)
elif operation == "tests":
return TestsResult(
success=False,
error=error_msg,
problem_id="",
url="",
tests=[],
timeout_ms=0,
memory_mb=0,
async def scrape_contest_metadata(self, contest_id: str) -> MetadataResult:
async def impl(cid: str) -> MetadataResult:
rows = await asyncio.to_thread(_scrape_tasks_sync, cid)
problems = _to_problem_summaries(rows)
if not problems:
return self._create_metadata_error(
f"No problems found for contest {cid}", cid
)
elif operation == "contests":
return ContestListResult(success=False, error=error_msg)
def _scrape_metadata_impl(self, contest_id: str) -> MetadataResult:
problems = scrape_contest_problems(contest_id)
if not problems:
return MetadataResult(
success=False,
error=f"{self.platform_name}: No problems found for contest {contest_id}",
)
return MetadataResult(
success=True, error="", contest_id=contest_id, problems=problems
)
def _scrape_tests_impl(self, contest_id: str, problem_id: str) -> TestsResult:
problem_letter = problem_id.upper()
url = parse_problem_url(contest_id, problem_letter)
tests = scrape(url)
response = _make_request(url)
soup = BeautifulSoup(response.text, "html.parser")
timeout_ms, memory_mb = extract_problem_limits(soup)
if not tests:
return TestsResult(
success=False,
error=f"{self.platform_name}: No tests found for {contest_id} {problem_letter}",
problem_id=f"{contest_id}_{problem_id.lower()}",
url=url,
tests=[],
timeout_ms=timeout_ms,
memory_mb=memory_mb,
success=True, error="", contest_id=cid, problems=problems
)
return TestsResult(
success=True,
error="",
problem_id=f"{contest_id}_{problem_id.lower()}",
url=url,
tests=tests,
timeout_ms=timeout_ms,
memory_mb=memory_mb,
)
return await self._safe_execute("metadata", impl, contest_id)
def _scrape_contests_impl(self) -> ContestListResult:
contests = scrape_contests()
if not contests:
return ContestListResult(
success=False, error=f"{self.platform_name}: No contests found"
)
return ContestListResult(success=True, error="", contests=contests)
async def scrape_contest_list(self) -> ContestListResult:
async def impl() -> ContestListResult:
try:
contests = await _fetch_all_contests_async()
except Exception as e:
return self._create_contests_error(str(e))
if not contests:
return self._create_contests_error("No contests found")
return ContestListResult(success=True, error="", contests=contests)
return await self._safe_execute("contests", impl)
async def stream_tests_for_category_async(self, category_id: str) -> None:
rows = await asyncio.to_thread(_scrape_tasks_sync, category_id)
async def emit(row: dict[str, str]) -> None:
letter = (row.get("letter") or "").strip().lower()
slug = row.get("slug") or ""
if not letter or not slug:
return
try:
data = await asyncio.to_thread(
_scrape_problem_page_sync, category_id, slug
)
tests: list[TestCase] = data["tests"]
if not tests:
print(
json.dumps(
{
"problem_id": letter,
"error": f"{self.platform_name}: no tests found",
}
),
flush=True,
)
return
print(
json.dumps(
{
"problem_id": letter,
"tests": [
{"input": t.input, "expected": t.expected}
for t in tests
],
"timeout_ms": data["timeout_ms"],
"memory_mb": data["memory_mb"],
"interactive": bool(data["interactive"]),
}
),
flush=True,
)
except Exception as e:
print(
json.dumps(
{
"problem_id": letter,
"error": f"{self.platform_name}: {str(e)}",
}
),
flush=True,
)
await asyncio.gather(*(emit(r) for r in rows))
def main() -> None:
async def main_async() -> int:
if len(sys.argv) < 2:
result = MetadataResult(
success=False,
error="Usage: atcoder.py metadata <contest_id> OR atcoder.py tests <contest_id> <problem_letter> OR atcoder.py contests",
error="Usage: atcoder.py metadata <contest_id> OR atcoder.py tests <contest_id> OR atcoder.py contests",
)
print(json.dumps(asdict(result)))
sys.exit(1)
return 1
mode: str = sys.argv[1]
scraper = AtCoderScraper()
scraper = AtcoderScraper()
if mode == "metadata":
if len(sys.argv) != 3:
result = MetadataResult(
success=False,
error="Usage: atcoder.py metadata <contest_id>",
success=False, error="Usage: atcoder.py metadata <contest_id>"
)
print(json.dumps(asdict(result)))
sys.exit(1)
contest_id: str = sys.argv[2]
result = scraper.scrape_contest_metadata(contest_id)
return 1
contest_id = sys.argv[2]
result = await scraper.scrape_contest_metadata(contest_id)
print(json.dumps(asdict(result)))
if not result.success:
sys.exit(1)
return 0 if result.success else 1
elif mode == "tests":
if len(sys.argv) != 4:
if mode == "tests":
if len(sys.argv) != 3:
tests_result = TestsResult(
success=False,
error="Usage: atcoder.py tests <contest_id> <problem_letter>",
error="Usage: atcoder.py tests <contest_id>",
problem_id="",
url="",
tests=[],
@ -419,35 +378,32 @@ def main() -> None:
memory_mb=0,
)
print(json.dumps(asdict(tests_result)))
sys.exit(1)
return 1
contest_id = sys.argv[2]
await scraper.stream_tests_for_category_async(contest_id)
return 0
test_contest_id: str = sys.argv[2]
problem_letter: str = sys.argv[3]
tests_result = scraper.scrape_problem_tests(test_contest_id, problem_letter)
print(json.dumps(asdict(tests_result)))
if not tests_result.success:
sys.exit(1)
elif mode == "contests":
if mode == "contests":
if len(sys.argv) != 2:
contest_result = ContestListResult(
success=False, error="Usage: atcoder.py contests"
)
print(json.dumps(asdict(contest_result)))
sys.exit(1)
contest_result = scraper.scrape_contest_list()
return 1
contest_result = await scraper.scrape_contest_list()
print(json.dumps(asdict(contest_result)))
if not contest_result.success:
sys.exit(1)
return 0 if contest_result.success else 1
else:
result = MetadataResult(
success=False,
error=f"Unknown mode: {mode}. Use 'metadata', 'tests', or 'contests'",
)
print(json.dumps(asdict(result)))
sys.exit(1)
result = MetadataResult(
success=False,
error="Unknown mode. Use 'metadata <contest_id>', 'tests <contest_id>', or 'contests'",
)
print(json.dumps(asdict(result)))
return 1
def main() -> None:
sys.exit(asyncio.run(main_async()))
if __name__ == "__main__":

11
uv.lock generated
View file

@ -115,6 +115,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/77/06/bb80f5f86020c4551da315d78b3ab75e8228f89f0162f2c3a819e407941a/attrs-25.3.0-py3-none-any.whl", hash = "sha256:427318ce031701fea540783410126f03899a97ffc6f61596ad581ac2e40e3bc3", size = 63815, upload-time = "2025-03-13T11:10:21.14Z" },
]
[[package]]
name = "backoff"
version = "2.2.1"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/47/d7/5bbeb12c44d7c4f2fb5b56abce497eb5ed9f34d85701de869acedd602619/backoff-2.2.1.tar.gz", hash = "sha256:03f829f5bb1923180821643f8753b0502c3b682293992485b0eef2807afa5cba", size = 17001, upload-time = "2022-10-05T19:19:32.061Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/df/73/b6e24bd22e6720ca8ee9a85a0c4a2971af8497d8f3193fa05390cbd46e09/backoff-2.2.1-py3-none-any.whl", hash = "sha256:63579f9a0628e06278f7e47b7d7d5b6ce20dc65c5e96a6f3ca99a6adca0396e8", size = 15148, upload-time = "2022-10-05T19:19:30.546Z" },
]
[[package]]
name = "basedpyright"
version = "1.31.6"
@ -1446,6 +1455,7 @@ name = "scrapers"
version = "0.1.0"
source = { virtual = "." }
dependencies = [
{ name = "backoff" },
{ name = "beautifulsoup4" },
{ name = "curl-cffi" },
{ name = "httpx" },
@ -1467,6 +1477,7 @@ dev = [
[package.metadata]
requires-dist = [
{ name = "backoff", specifier = ">=2.2.1" },
{ name = "beautifulsoup4", specifier = ">=4.13.5" },
{ name = "curl-cffi", specifier = ">=0.13.0" },
{ name = "httpx", specifier = ">=0.28.1" },