Merge pull request #215 from barrettruth/fix/scraper-refactor

refactor scrapers
This commit is contained in:
Barrett Ruth 2026-01-27 15:06:05 -06:00 committed by GitHub
commit dcadf7447d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 140 additions and 355 deletions

View file

@ -266,43 +266,31 @@ class AtcoderScraper(BaseScraper):
return "atcoder" return "atcoder"
async def scrape_contest_metadata(self, contest_id: str) -> MetadataResult: async def scrape_contest_metadata(self, contest_id: str) -> MetadataResult:
async def impl(cid: str) -> MetadataResult:
try: try:
rows = await asyncio.to_thread(_scrape_tasks_sync, cid) rows = await asyncio.to_thread(_scrape_tasks_sync, contest_id)
except requests.HTTPError as e:
if e.response is not None and e.response.status_code == 404:
return self._create_metadata_error(
f"No problems found for contest {cid}", cid
)
raise
problems = _to_problem_summaries(rows) problems = _to_problem_summaries(rows)
if not problems: if not problems:
return self._create_metadata_error( return self._metadata_error(
f"No problems found for contest {cid}", cid f"No problems found for contest {contest_id}"
) )
return MetadataResult( return MetadataResult(
success=True, success=True,
error="", error="",
contest_id=cid, contest_id=contest_id,
problems=problems, problems=problems,
url=f"https://atcoder.jp/contests/{contest_id}/tasks/{contest_id}_%s", url=f"https://atcoder.jp/contests/{contest_id}/tasks/{contest_id}_%s",
) )
except Exception as e:
return await self._safe_execute("metadata", impl, contest_id) return self._metadata_error(str(e))
async def scrape_contest_list(self) -> ContestListResult: async def scrape_contest_list(self) -> ContestListResult:
async def impl() -> ContestListResult:
try: try:
contests = await _fetch_all_contests_async() contests = await _fetch_all_contests_async()
except Exception as e:
return self._create_contests_error(str(e))
if not contests: if not contests:
return self._create_contests_error("No contests found") return self._contests_error("No contests found")
return ContestListResult(success=True, error="", contests=contests) return ContestListResult(success=True, error="", contests=contests)
except Exception as e:
return await self._safe_execute("contests", impl) return self._contests_error(str(e))
async def stream_tests_for_category_async(self, category_id: str) -> None: async def stream_tests_for_category_async(self, category_id: str) -> None:
rows = await asyncio.to_thread(_scrape_tasks_sync, category_id) rows = await asyncio.to_thread(_scrape_tasks_sync, category_id)

View file

@ -1,9 +1,8 @@
import asyncio
import sys
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Any, Awaitable, Callable, ParamSpec, cast
from .models import ContestListResult, MetadataResult, TestsResult from .models import CombinedTest, ContestListResult, MetadataResult, TestsResult
P = ParamSpec("P")
class BaseScraper(ABC): class BaseScraper(ABC):
@ -20,57 +19,65 @@ class BaseScraper(ABC):
@abstractmethod @abstractmethod
async def stream_tests_for_category_async(self, category_id: str) -> None: ... async def stream_tests_for_category_async(self, category_id: str) -> None: ...
def _create_metadata_error( def _usage(self) -> str:
self, error_msg: str, contest_id: str = "" name = self.platform_name
) -> MetadataResult: return f"Usage: {name}.py metadata <id> | tests <id> | contests"
return MetadataResult(
success=False,
error=f"{self.platform_name}: {error_msg}",
contest_id=contest_id,
problems=[],
url="",
)
def _create_tests_error( def _metadata_error(self, msg: str) -> MetadataResult:
self, error_msg: str, problem_id: str = "", url: str = "" return MetadataResult(success=False, error=msg, url="")
) -> TestsResult:
from .models import CombinedTest
def _tests_error(self, msg: str) -> TestsResult:
return TestsResult( return TestsResult(
success=False, success=False,
error=f"{self.platform_name}: {error_msg}", error=msg,
problem_id=problem_id, problem_id="",
combined=CombinedTest(input="", expected=""), combined=CombinedTest(input="", expected=""),
tests=[], tests=[],
timeout_ms=0, timeout_ms=0,
memory_mb=0, memory_mb=0,
interactive=False,
) )
def _create_contests_error(self, error_msg: str) -> ContestListResult: def _contests_error(self, msg: str) -> ContestListResult:
return ContestListResult( return ContestListResult(success=False, error=msg)
success=False,
error=f"{self.platform_name}: {error_msg}",
contests=[],
)
async def _safe_execute( async def _run_cli_async(self, args: list[str]) -> int:
self, if len(args) < 2:
operation: str, print(self._metadata_error(self._usage()).model_dump_json())
func: Callable[P, Awaitable[Any]], return 1
*args: P.args,
**kwargs: P.kwargs, mode = args[1]
):
try: match mode:
return await func(*args, **kwargs) case "metadata":
except Exception as e: if len(args) != 3:
if operation == "metadata": print(self._metadata_error(self._usage()).model_dump_json())
contest_id = cast(str, args[0]) if args else "" return 1
return self._create_metadata_error(str(e), contest_id) result = await self.scrape_contest_metadata(args[2])
elif operation == "tests": print(result.model_dump_json())
problem_id = cast(str, args[1]) if len(args) > 1 else "" return 0 if result.success else 1
return self._create_tests_error(str(e), problem_id)
elif operation == "contests": case "tests":
return self._create_contests_error(str(e)) if len(args) != 3:
else: print(self._tests_error(self._usage()).model_dump_json())
raise return 1
await self.stream_tests_for_category_async(args[2])
return 0
case "contests":
if len(args) != 2:
print(self._contests_error(self._usage()).model_dump_json())
return 1
result = await self.scrape_contest_list()
print(result.model_dump_json())
return 0 if result.success else 1
case _:
print(
self._metadata_error(
f"Unknown mode: {mode}. {self._usage()}"
).model_dump_json()
)
return 1
def run_cli(self) -> None:
sys.exit(asyncio.run(self._run_cli_async(sys.argv)))

View file

@ -1,8 +1,8 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import asyncio import asyncio
import json import json
import re import re
import sys
from typing import Any from typing import Any
import httpx import httpx
@ -10,13 +10,11 @@ from scrapling.fetchers import Fetcher
from .base import BaseScraper from .base import BaseScraper
from .models import ( from .models import (
CombinedTest,
ContestListResult, ContestListResult,
ContestSummary, ContestSummary,
MetadataResult, MetadataResult,
ProblemSummary, ProblemSummary,
TestCase, TestCase,
TestsResult,
) )
BASE_URL = "https://www.codechef.com" BASE_URL = "https://www.codechef.com"
@ -62,18 +60,14 @@ class CodeChefScraper(BaseScraper):
return "codechef" return "codechef"
async def scrape_contest_metadata(self, contest_id: str) -> MetadataResult: async def scrape_contest_metadata(self, contest_id: str) -> MetadataResult:
async with httpx.AsyncClient() as client:
try: try:
async with httpx.AsyncClient() as client:
data = await fetch_json( data = await fetch_json(
client, API_CONTEST.format(contest_id=contest_id) client, API_CONTEST.format(contest_id=contest_id)
) )
except httpx.HTTPStatusError as e:
return self._create_metadata_error(
f"Failed to fetch contest {contest_id}: {e}", contest_id
)
if not data.get("problems"): if not data.get("problems"):
return self._create_metadata_error( return self._metadata_error(
f"No problems found for contest {contest_id}", contest_id f"No problems found for contest {contest_id}"
) )
problems = [] problems = []
for problem_code, problem_data in data["problems"].items(): for problem_code, problem_data in data["problems"].items():
@ -91,13 +85,15 @@ class CodeChefScraper(BaseScraper):
problems=problems, problems=problems,
url=f"{BASE_URL}/{contest_id}", url=f"{BASE_URL}/{contest_id}",
) )
except Exception as e:
return self._metadata_error(f"Failed to fetch contest {contest_id}: {e}")
async def scrape_contest_list(self) -> ContestListResult: async def scrape_contest_list(self) -> ContestListResult:
async with httpx.AsyncClient() as client: async with httpx.AsyncClient() as client:
try: try:
data = await fetch_json(client, API_CONTESTS_ALL) data = await fetch_json(client, API_CONTESTS_ALL)
except httpx.HTTPStatusError as e: except httpx.HTTPStatusError as e:
return self._create_contests_error(f"Failed to fetch contests: {e}") return self._contests_error(f"Failed to fetch contests: {e}")
all_contests = data.get("future_contests", []) + data.get( all_contests = data.get("future_contests", []) + data.get(
"past_contests", [] "past_contests", []
) )
@ -110,7 +106,7 @@ class CodeChefScraper(BaseScraper):
num = int(match.group(1)) num = int(match.group(1))
max_num = max(max_num, num) max_num = max(max_num, num)
if max_num == 0: if max_num == 0:
return self._create_contests_error("No Starters contests found") return self._contests_error("No Starters contests found")
contests = [] contests = []
sem = asyncio.Semaphore(CONNECTIONS) sem = asyncio.Semaphore(CONNECTIONS)
@ -252,68 +248,5 @@ class CodeChefScraper(BaseScraper):
print(json.dumps(payload), flush=True) print(json.dumps(payload), flush=True)
async def main_async() -> int:
if len(sys.argv) < 2:
result = MetadataResult(
success=False,
error="Usage: codechef.py metadata <contest_id> OR codechef.py tests <contest_id> OR codechef.py contests",
url="",
)
print(result.model_dump_json())
return 1
mode: str = sys.argv[1]
scraper = CodeChefScraper()
if mode == "metadata":
if len(sys.argv) != 3:
result = MetadataResult(
success=False,
error="Usage: codechef.py metadata <contest_id>",
url="",
)
print(result.model_dump_json())
return 1
contest_id = sys.argv[2]
result = await scraper.scrape_contest_metadata(contest_id)
print(result.model_dump_json())
return 0 if result.success else 1
if mode == "tests":
if len(sys.argv) != 3:
tests_result = TestsResult(
success=False,
error="Usage: codechef.py tests <contest_id>",
problem_id="",
combined=CombinedTest(input="", expected=""),
tests=[],
timeout_ms=0,
memory_mb=0,
)
print(tests_result.model_dump_json())
return 1
contest_id = sys.argv[2]
await scraper.stream_tests_for_category_async(contest_id)
return 0
if mode == "contests":
if len(sys.argv) != 2:
contest_result = ContestListResult(
success=False, error="Usage: codechef.py contests"
)
print(contest_result.model_dump_json())
return 1
contest_result = await scraper.scrape_contest_list()
print(contest_result.model_dump_json())
return 0 if contest_result.success else 1
result = MetadataResult(
success=False,
error=f"Unknown mode: {mode}. Use 'metadata <contest_id>', 'tests <contest_id>', or 'contests'",
url="",
)
print(result.model_dump_json())
return 1
def main() -> None:
sys.exit(asyncio.run(main_async()))
if __name__ == "__main__": if __name__ == "__main__":
main() CodeChefScraper().run_cli()

View file

@ -4,7 +4,6 @@ import asyncio
import json import json
import logging import logging
import re import re
import sys
from typing import Any from typing import Any
import requests import requests
@ -13,13 +12,11 @@ from scrapling.fetchers import Fetcher
from .base import BaseScraper from .base import BaseScraper
from .models import ( from .models import (
CombinedTest,
ContestListResult, ContestListResult,
ContestSummary, ContestSummary,
MetadataResult, MetadataResult,
ProblemSummary, ProblemSummary,
TestCase, TestCase,
TestsResult,
) )
# suppress scrapling logging - https://github.com/D4Vinci/Scrapling/issues/31) # suppress scrapling logging - https://github.com/D4Vinci/Scrapling/issues/31)
@ -89,14 +86,14 @@ def _extract_samples(block: Tag) -> tuple[list[TestCase], bool]:
if not st: if not st:
return [], False return [], False
input_pres: list[Tag] = [ # type: ignore[misc] input_pres: list[Tag] = [
inp.find("pre") # type: ignore[misc] inp.find("pre")
for inp in st.find_all("div", class_="input") # type: ignore[union-attr] for inp in st.find_all("div", class_="input")
if isinstance(inp, Tag) and inp.find("pre") if isinstance(inp, Tag) and inp.find("pre")
] ]
output_pres: list[Tag] = [ output_pres: list[Tag] = [
out.find("pre") # type: ignore[misc] out.find("pre")
for out in st.find_all("div", class_="output") # type: ignore[union-attr] for out in st.find_all("div", class_="output")
if isinstance(out, Tag) and out.find("pre") if isinstance(out, Tag) and out.find("pre")
] ]
input_pres = [p for p in input_pres if isinstance(p, Tag)] input_pres = [p for p in input_pres if isinstance(p, Tag)]
@ -209,30 +206,31 @@ class CodeforcesScraper(BaseScraper):
return "codeforces" return "codeforces"
async def scrape_contest_metadata(self, contest_id: str) -> MetadataResult: async def scrape_contest_metadata(self, contest_id: str) -> MetadataResult:
async def impl(cid: str) -> MetadataResult: try:
problems = await asyncio.to_thread(_scrape_contest_problems_sync, cid) problems = await asyncio.to_thread(
_scrape_contest_problems_sync, contest_id
)
if not problems: if not problems:
return self._create_metadata_error( return self._metadata_error(
f"No problems found for contest {cid}", cid f"No problems found for contest {contest_id}"
) )
return MetadataResult( return MetadataResult(
success=True, success=True,
error="", error="",
contest_id=cid, contest_id=contest_id,
problems=problems, problems=problems,
url=f"https://codeforces.com/contest/{contest_id}/problem/%s", url=f"https://codeforces.com/contest/{contest_id}/problem/%s",
) )
except Exception as e:
return await self._safe_execute("metadata", impl, contest_id) return self._metadata_error(str(e))
async def scrape_contest_list(self) -> ContestListResult: async def scrape_contest_list(self) -> ContestListResult:
async def impl() -> ContestListResult:
try: try:
r = requests.get(API_CONTEST_LIST_URL, timeout=TIMEOUT_SECONDS) r = requests.get(API_CONTEST_LIST_URL, timeout=TIMEOUT_SECONDS)
r.raise_for_status() r.raise_for_status()
data = r.json() data = r.json()
if data.get("status") != "OK": if data.get("status") != "OK":
return self._create_contests_error("Invalid API response") return self._contests_error("Invalid API response")
contests: list[ContestSummary] = [] contests: list[ContestSummary] = []
for c in data["result"]: for c in data["result"]:
@ -240,18 +238,14 @@ class CodeforcesScraper(BaseScraper):
continue continue
cid = str(c["id"]) cid = str(c["id"])
name = c["name"] name = c["name"]
contests.append( contests.append(ContestSummary(id=cid, name=name, display_name=name))
ContestSummary(id=cid, name=name, display_name=name)
)
if not contests: if not contests:
return self._create_contests_error("No contests found") return self._contests_error("No contests found")
return ContestListResult(success=True, error="", contests=contests) return ContestListResult(success=True, error="", contests=contests)
except Exception as e: except Exception as e:
return self._create_contests_error(str(e)) return self._contests_error(str(e))
return await self._safe_execute("contests", impl)
async def stream_tests_for_category_async(self, category_id: str) -> None: async def stream_tests_for_category_async(self, category_id: str) -> None:
html = await asyncio.to_thread(_fetch_problems_html, category_id) html = await asyncio.to_thread(_fetch_problems_html, category_id)
@ -281,73 +275,5 @@ class CodeforcesScraper(BaseScraper):
) )
async def main_async() -> int:
if len(sys.argv) < 2:
result = MetadataResult(
success=False,
error="Usage: codeforces.py metadata <contest_id> OR codeforces.py tests <contest_id> OR codeforces.py contests",
url="",
)
print(result.model_dump_json())
return 1
mode: str = sys.argv[1]
scraper = CodeforcesScraper()
if mode == "metadata":
if len(sys.argv) != 3:
result = MetadataResult(
success=False,
error="Usage: codeforces.py metadata <contest_id>",
url="",
)
print(result.model_dump_json())
return 1
contest_id = sys.argv[2]
result = await scraper.scrape_contest_metadata(contest_id)
print(result.model_dump_json())
return 0 if result.success else 1
if mode == "tests":
if len(sys.argv) != 3:
tests_result = TestsResult(
success=False,
error="Usage: codeforces.py tests <contest_id>",
problem_id="",
combined=CombinedTest(input="", expected=""),
tests=[],
timeout_ms=0,
memory_mb=0,
)
print(tests_result.model_dump_json())
return 1
contest_id = sys.argv[2]
await scraper.stream_tests_for_category_async(contest_id)
return 0
if mode == "contests":
if len(sys.argv) != 2:
contest_result = ContestListResult(
success=False, error="Usage: codeforces.py contests"
)
print(contest_result.model_dump_json())
return 1
contest_result = await scraper.scrape_contest_list()
print(contest_result.model_dump_json())
return 0 if contest_result.success else 1
result = MetadataResult(
success=False,
error="Unknown mode. Use 'metadata <contest_id>', 'tests <contest_id>', or 'contests'",
url="",
)
print(result.model_dump_json())
return 1
def main() -> None:
sys.exit(asyncio.run(main_async()))
if __name__ == "__main__": if __name__ == "__main__":
main() CodeforcesScraper().run_cli()

View file

@ -3,20 +3,17 @@
import asyncio import asyncio
import json import json
import re import re
import sys
from typing import Any from typing import Any
import httpx import httpx
from .base import BaseScraper from .base import BaseScraper
from .models import ( from .models import (
CombinedTest,
ContestListResult, ContestListResult,
ContestSummary, ContestSummary,
MetadataResult, MetadataResult,
ProblemSummary, ProblemSummary,
TestCase, TestCase,
TestsResult,
) )
BASE_URL = "https://cses.fi" BASE_URL = "https://cses.fi"
@ -261,73 +258,5 @@ class CSESScraper(BaseScraper):
print(json.dumps(payload), flush=True) print(json.dumps(payload), flush=True)
async def main_async() -> int:
if len(sys.argv) < 2:
result = MetadataResult(
success=False,
error="Usage: cses.py metadata <category_id> OR cses.py tests <category> OR cses.py contests",
url="",
)
print(result.model_dump_json())
return 1
mode: str = sys.argv[1]
scraper = CSESScraper()
if mode == "metadata":
if len(sys.argv) != 3:
result = MetadataResult(
success=False,
error="Usage: cses.py metadata <category_id>",
url="",
)
print(result.model_dump_json())
return 1
category_id = sys.argv[2]
result = await scraper.scrape_contest_metadata(category_id)
print(result.model_dump_json())
return 0 if result.success else 1
if mode == "tests":
if len(sys.argv) != 3:
tests_result = TestsResult(
success=False,
error="Usage: cses.py tests <category>",
problem_id="",
combined=CombinedTest(input="", expected=""),
tests=[],
timeout_ms=0,
memory_mb=0,
)
print(tests_result.model_dump_json())
return 1
category = sys.argv[2]
await scraper.stream_tests_for_category_async(category)
return 0
if mode == "contests":
if len(sys.argv) != 2:
contest_result = ContestListResult(
success=False, error="Usage: cses.py contests"
)
print(contest_result.model_dump_json())
return 1
contest_result = await scraper.scrape_contest_list()
print(contest_result.model_dump_json())
return 0 if contest_result.success else 1
result = MetadataResult(
success=False,
error=f"Unknown mode: {mode}. Use 'metadata <category>', 'tests <category>', or 'contests'",
url="",
)
print(result.model_dump_json())
return 1
def main() -> None:
sys.exit(asyncio.run(main_async()))
if __name__ == "__main__": if __name__ == "__main__":
main() CSESScraper().run_cli()

View file

@ -232,33 +232,35 @@ def run_scraper_offline(fixture_text):
case _: case _:
raise AssertionError(f"Unknown scraper: {scraper_name}") raise AssertionError(f"Unknown scraper: {scraper_name}")
scraper_classes = {
"cses": "CSESScraper",
"atcoder": "AtcoderScraper",
"codeforces": "CodeforcesScraper",
"codechef": "CodeChefScraper",
}
def _run(scraper_name: str, mode: str, *args: str): def _run(scraper_name: str, mode: str, *args: str):
mod_path = ROOT / "scrapers" / f"{scraper_name}.py" mod_path = ROOT / "scrapers" / f"{scraper_name}.py"
ns = _load_scraper_module(mod_path, scraper_name) ns = _load_scraper_module(mod_path, scraper_name)
offline_fetches = _make_offline_fetches(scraper_name) offline_fetches = _make_offline_fetches(scraper_name)
if scraper_name == "codeforces": if scraper_name == "codeforces":
fetchers.Fetcher.get = offline_fetches["Fetcher.get"] # type: ignore[assignment] fetchers.Fetcher.get = offline_fetches["Fetcher.get"]
requests.get = offline_fetches["requests.get"] requests.get = offline_fetches["requests.get"]
elif scraper_name == "atcoder": elif scraper_name == "atcoder":
ns._fetch = offline_fetches["_fetch"] ns._fetch = offline_fetches["_fetch"]
ns._get_async = offline_fetches["_get_async"] ns._get_async = offline_fetches["_get_async"]
elif scraper_name == "cses": elif scraper_name == "cses":
httpx.AsyncClient.get = offline_fetches["__offline_fetch_text"] # type: ignore[assignment] httpx.AsyncClient.get = offline_fetches["__offline_fetch_text"]
elif scraper_name == "codechef": elif scraper_name == "codechef":
httpx.AsyncClient.get = offline_fetches["__offline_get_async"] # type: ignore[assignment] httpx.AsyncClient.get = offline_fetches["__offline_get_async"]
fetchers.Fetcher.get = offline_fetches["Fetcher.get"] # type: ignore[assignment] fetchers.Fetcher.get = offline_fetches["Fetcher.get"]
main_async = getattr(ns, "main_async") scraper_class = getattr(ns, scraper_classes[scraper_name])
assert callable(main_async), f"main_async not found in {scraper_name}" scraper = scraper_class()
argv = [str(mod_path), mode, *args] argv = [str(mod_path), mode, *args]
old_argv = sys.argv rc, out = _capture_stdout(scraper._run_cli_async(argv))
sys.argv = argv
try:
rc, out = _capture_stdout(main_async())
finally:
sys.argv = old_argv
json_lines: list[Any] = [] json_lines: list[Any] = []
for line in (_line for _line in out.splitlines() if _line.strip()): for line in (_line for _line in out.splitlines() if _line.strip()):