feat(scrapers): refactor

This commit is contained in:
Barrett Ruth 2026-01-27 14:44:08 -05:00
parent 7dafb7ea43
commit 5293515aca
3 changed files with 83 additions and 221 deletions

View file

@ -1,9 +1,8 @@
import asyncio
import sys
from abc import ABC, abstractmethod
from typing import Any, Awaitable, Callable, ParamSpec, cast
from .models import ContestListResult, MetadataResult, TestsResult
P = ParamSpec("P")
from .models import CombinedTest, ContestListResult, MetadataResult, TestsResult
class BaseScraper(ABC):
@ -20,57 +19,65 @@ class BaseScraper(ABC):
@abstractmethod
async def stream_tests_for_category_async(self, category_id: str) -> None: ...
def _create_metadata_error(
self, error_msg: str, contest_id: str = ""
) -> MetadataResult:
return MetadataResult(
success=False,
error=f"{self.platform_name}: {error_msg}",
contest_id=contest_id,
problems=[],
url="",
)
def _usage(self) -> str:
name = self.platform_name
return f"Usage: {name}.py metadata <id> | tests <id> | contests"
def _create_tests_error(
self, error_msg: str, problem_id: str = "", url: str = ""
) -> TestsResult:
from .models import CombinedTest
def _metadata_error(self, msg: str) -> MetadataResult:
return MetadataResult(success=False, error=msg, url="")
def _tests_error(self, msg: str) -> TestsResult:
return TestsResult(
success=False,
error=f"{self.platform_name}: {error_msg}",
problem_id=problem_id,
error=msg,
problem_id="",
combined=CombinedTest(input="", expected=""),
tests=[],
timeout_ms=0,
memory_mb=0,
interactive=False,
)
def _create_contests_error(self, error_msg: str) -> ContestListResult:
return ContestListResult(
success=False,
error=f"{self.platform_name}: {error_msg}",
contests=[],
)
def _contests_error(self, msg: str) -> ContestListResult:
return ContestListResult(success=False, error=msg)
async def _safe_execute(
self,
operation: str,
func: Callable[P, Awaitable[Any]],
*args: P.args,
**kwargs: P.kwargs,
):
try:
return await func(*args, **kwargs)
except Exception as e:
if operation == "metadata":
contest_id = cast(str, args[0]) if args else ""
return self._create_metadata_error(str(e), contest_id)
elif operation == "tests":
problem_id = cast(str, args[1]) if len(args) > 1 else ""
return self._create_tests_error(str(e), problem_id)
elif operation == "contests":
return self._create_contests_error(str(e))
else:
raise
async def _run_cli_async(self, args: list[str]) -> int:
if len(args) < 2:
print(self._metadata_error(self._usage()).model_dump_json())
return 1
mode = args[1]
match mode:
case "metadata":
if len(args) != 3:
print(self._metadata_error(self._usage()).model_dump_json())
return 1
result = await self.scrape_contest_metadata(args[2])
print(result.model_dump_json())
return 0 if result.success else 1
case "tests":
if len(args) != 3:
print(self._tests_error(self._usage()).model_dump_json())
return 1
await self.stream_tests_for_category_async(args[2])
return 0
case "contests":
if len(args) != 2:
print(self._contests_error(self._usage()).model_dump_json())
return 1
result = await self.scrape_contest_list()
print(result.model_dump_json())
return 0 if result.success else 1
case _:
print(
self._metadata_error(
f"Unknown mode: {mode}. {self._usage()}"
).model_dump_json()
)
return 1
def run_cli(self) -> None:
sys.exit(asyncio.run(self._run_cli_async(sys.argv)))