From f48acb4672cd894183e4ae6e8a5f97bd3a2726ee Mon Sep 17 00:00:00 2001 From: Barrett Ruth Date: Fri, 3 Oct 2025 21:06:20 -0400 Subject: [PATCH] fix(scrapers/codeforces): scrape time --- scrapers/base.py | 5 - scrapers/codeforces.py | 305 +++++++++++++++++++++++++++++++++++++++++ scrapers/cses.py | 32 ----- 3 files changed, 305 insertions(+), 37 deletions(-) create mode 100644 scrapers/codeforces.py diff --git a/scrapers/base.py b/scrapers/base.py index 398ab6c..7cd3714 100644 --- a/scrapers/base.py +++ b/scrapers/base.py @@ -25,11 +25,6 @@ class BaseScraper(ABC): @abstractmethod async def scrape_contest_metadata(self, contest_id: str) -> MetadataResult: ... - @abstractmethod - async def scrape_problem_tests( - self, contest_id: str, problem_id: str - ) -> TestsResult: ... - @abstractmethod async def scrape_contest_list(self) -> ContestListResult: ... diff --git a/scrapers/codeforces.py b/scrapers/codeforces.py new file mode 100644 index 0000000..93b8afa --- /dev/null +++ b/scrapers/codeforces.py @@ -0,0 +1,305 @@ +#!/usr/bin/env python3 + +import asyncio +import json +import re +import sys +from dataclasses import asdict +from typing import Any + +import requests +from bs4 import BeautifulSoup, Tag +from scrapling.fetchers import StealthyFetcher + +from .base import BaseScraper +from .models import ( + ContestListResult, + ContestSummary, + MetadataResult, + ProblemSummary, + TestCase, + TestsResult, +) + +BASE_URL = "https://codeforces.com" +API_CONTEST_LIST_URL = f"{BASE_URL}/api/contest.list" +TIMEOUT_SECONDS = 30 +HEADERS = { + "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36" +} + + +def _text_from_pre(pre: Tag) -> str: + return ( + pre.get_text(separator="\n", strip=False) + .replace("\r", "") + .replace("\xa0", " ") + .rstrip("\n") + ) + + +def _extract_limits(block: Tag) -> tuple[int, float]: + tdiv = block.find("div", class_="time-limit") + mdiv = block.find("div", class_="memory-limit") + timeout_ms = 0 + memory_mb = 0.0 + if tdiv: + ttxt = tdiv.get_text(" ", strip=True) + ts = re.search(r"(\d+)\s*seconds?", ttxt) + if ts: + timeout_ms = int(ts.group(1)) * 1000 + if mdiv: + mtxt = mdiv.get_text(" ", strip=True) + ms = re.search(r"(\d+)\s*megabytes?", mtxt) + if ms: + memory_mb = float(ms.group(1)) + return timeout_ms, memory_mb + + +def _extract_title(block: Tag) -> tuple[str, str]: + t = block.find("div", class_="title") + if not t: + return "", "" + s = t.get_text(" ", strip=True) + parts = s.split(".", 1) + if len(parts) != 2: + return "", s.strip() + return parts[0].strip().upper(), parts[1].strip() + + +def _extract_samples(block: Tag) -> list[TestCase]: + st = block.find("div", class_="sample-test") + if not st: + return [] + + inputs = [ + _text_from_pre(pre) + for inp in st.find_all("div", class_="input") # type: ignore[union-attr] + for pre in [inp.find("pre")] + if isinstance(pre, Tag) + ] + outputs = [ + _text_from_pre(pre) + for out in st.find_all("div", class_="output") # type: ignore[union-attr] + for pre in [out.find("pre")] + if isinstance(pre, Tag) + ] + + n = min(len(inputs), len(outputs)) + return [TestCase(input=inputs[i], expected=outputs[i]) for i in range(n)] + + +def _is_interactive(block: Tag) -> bool: + ps = block.find("div", class_="problem-statement") + txt = ps.get_text(" ", strip=True) if ps else block.get_text(" ", strip=True) + return "This is an interactive problem" in txt + + +def _fetch_problems_html(contest_id: str) -> str: + url = f"{BASE_URL}/contest/{contest_id}/problems" + page = StealthyFetcher.fetch( + url, + headless=True, + solve_cloudflare=True, + ) + return page.html_content + + +def _parse_all_blocks(html: str) -> list[dict[str, Any]]: + soup = BeautifulSoup(html, "html.parser") + blocks = soup.find_all("div", class_="problem-statement") + out: list[dict[str, Any]] = [] + for b in blocks: + letter, name = _extract_title(b) + if not letter: + continue + tests = _extract_samples(b) + timeout_ms, memory_mb = _extract_limits(b) + interactive = _is_interactive(b) + out.append( + { + "letter": letter, + "name": name, + "tests": tests, + "timeout_ms": timeout_ms, + "memory_mb": memory_mb, + "interactive": interactive, + } + ) + return out + + +def _scrape_contest_problems_sync(contest_id: str) -> list[ProblemSummary]: + html = _fetch_problems_html(contest_id) + blocks = _parse_all_blocks(html) + problems: list[ProblemSummary] = [] + seen: set[str] = set() + for b in blocks: + pid = b["letter"].upper() + if pid in seen: + continue + seen.add(pid) + problems.append(ProblemSummary(id=pid.lower(), name=b["name"])) + return problems + + +def _scrape_contests_sync() -> list[ContestSummary]: + r = requests.get(API_CONTEST_LIST_URL, headers=HEADERS, timeout=TIMEOUT_SECONDS) + r.raise_for_status() + data = r.json() + if data.get("status") != "OK": + return [] + out: list[ContestSummary] = [] + for c in data["result"]: + cid = str(c["id"]) + name = c["name"] + out.append(ContestSummary(id=cid, name=name, display_name=name)) + return out + + +class CodeforcesScraper(BaseScraper): + @property + def platform_name(self) -> str: + return "codeforces" + + async def scrape_contest_metadata(self, contest_id: str) -> MetadataResult: + async def impl(cid: str) -> MetadataResult: + problems = await asyncio.to_thread(_scrape_contest_problems_sync, cid) + if not problems: + return self._create_metadata_error( + f"No problems found for contest {cid}", cid + ) + return MetadataResult( + success=True, error="", contest_id=cid, problems=problems + ) + + return await self._safe_execute("metadata", impl, contest_id) + + async def scrape_contest_list(self) -> ContestListResult: + async def impl() -> ContestListResult: + try: + r = requests.get(API_CONTEST_LIST_URL, timeout=TIMEOUT_SECONDS) + r.raise_for_status() + data = r.json() + if data.get("status") != "OK": + return self._create_contests_error("Invalid API response") + + contests: list[ContestSummary] = [] + for c in data["result"]: + if c.get("phase") == "FINISHED": # only FINISHED contests + cid = str(c["id"]) + name = c["name"] + contests.append( + ContestSummary(id=cid, name=name, display_name=name) + ) + + if not contests: + return self._create_contests_error("No contests found") + + return ContestListResult(success=True, error="", contests=contests) + except Exception as e: + return self._create_contests_error(str(e)) + + return await self._safe_execute("contests", impl) + + async def stream_tests_for_category_async(self, category_id: str) -> None: + html = await asyncio.to_thread(_fetch_problems_html, category_id) + blocks = await asyncio.to_thread(_parse_all_blocks, html) + + for b in blocks: + pid = f"{category_id}{b['letter'].lower()}" + tests: list[TestCase] = b["tests"] + if not tests: + print( + json.dumps( + { + "problem_id": pid, + "error": f"{self.platform_name}: no tests found", + } + ), + flush=True, + ) + continue + + print( + json.dumps( + { + "problem_id": pid, + "tests": [ + {"input": t.input, "expected": t.expected} for t in tests + ], + "timeout_ms": b["timeout_ms"], + "memory_mb": b["memory_mb"], + "interactive": bool(b["interactive"]), + } + ), + flush=True, + ) + + +async def main_async() -> int: + if len(sys.argv) < 2: + result = MetadataResult( + success=False, + error="Usage: codeforces.py metadata OR codeforces.py tests OR codeforces.py contests", + ) + print(json.dumps(asdict(result))) + return 1 + + mode: str = sys.argv[1] + scraper = CodeforcesScraper() + + if mode == "metadata": + if len(sys.argv) != 3: + result = MetadataResult( + success=False, error="Usage: codeforces.py metadata " + ) + print(json.dumps(asdict(result))) + return 1 + contest_id = sys.argv[2] + result = await scraper.scrape_contest_metadata(contest_id) + print(json.dumps(asdict(result))) + return 0 if result.success else 1 + + if mode == "tests": + if len(sys.argv) != 3: + tests_result = TestsResult( + success=False, + error="Usage: codeforces.py tests ", + problem_id="", + url="", + tests=[], + timeout_ms=0, + memory_mb=0, + ) + print(json.dumps(asdict(tests_result))) + return 1 + contest_id = sys.argv[2] + await scraper.stream_tests_for_category_async(contest_id) + return 0 + + if mode == "contests": + if len(sys.argv) != 2: + contest_result = ContestListResult( + success=False, error="Usage: codeforces.py contests" + ) + print(json.dumps(asdict(contest_result))) + return 1 + contest_result = await scraper.scrape_contest_list() + print(json.dumps(asdict(contest_result))) + return 0 if contest_result.success else 1 + + result = MetadataResult( + success=False, + error="Unknown mode. Use 'metadata ', 'tests ', or 'contests'", + ) + print(json.dumps(asdict(result))) + return 1 + + +def main() -> None: + sys.exit(asyncio.run(main_async())) + + +if __name__ == "__main__": + main() diff --git a/scrapers/cses.py b/scrapers/cses.py index 9535762..73c5964 100644 --- a/scrapers/cses.py +++ b/scrapers/cses.py @@ -29,10 +29,6 @@ TIMEOUT_S = 15.0 CONNECTIONS = 8 -def _run(coro): - return asyncio.run(coro) - - def normalize_category_name(category_name: str) -> str: return category_name.lower().replace(" ", "_").replace("&", "and") @@ -198,34 +194,6 @@ class CSESScraper(BaseScraper): success=True, error="", contest_id=contest_id, problems=problems ) - async def scrape_problem_tests( - self, contest_id: str, problem_id: str - ) -> TestsResult: - path = task_path(problem_id) - async with httpx.AsyncClient() as client: - html = await fetch_text(client, path) - tests = parse_tests(html) - timeout_ms, memory_mb = parse_limits(html) - if not tests: - return TestsResult( - success=False, - error=f"{self.platform_name}: No tests found for {problem_id}", - problem_id=problem_id if problem_id.isdigit() else "", - url=BASE_URL + path, - tests=[], - timeout_ms=timeout_ms, - memory_mb=memory_mb, - ) - return TestsResult( - success=True, - error="", - problem_id=problem_id if problem_id.isdigit() else "", - url=BASE_URL + path, - tests=tests, - timeout_ms=timeout_ms, - memory_mb=memory_mb, - ) - async def scrape_contest_list(self) -> ContestListResult: async with httpx.AsyncClient() as client: html = await fetch_text(client, INDEX_PATH)