feat(codechef): finalize codechef impl

This commit is contained in:
Barrett Ruth 2025-10-25 01:41:55 -04:00
parent f78e43bdd4
commit e89c2e1cf5
5 changed files with 106 additions and 56 deletions

View file

@ -31,7 +31,9 @@ HEADERS = {
TIMEOUT_S = 15.0
CONNECTIONS = 8
MEMORY_LIMIT_RE = re.compile(r"Memory\s+[Ll]imit[:\s]+([0-9.]+)\s*MB", re.IGNORECASE)
MEMORY_LIMIT_RE = re.compile(
r"Memory\s+[Ll]imit.*?([0-9.]+)\s*(MB|GB)", re.IGNORECASE | re.DOTALL
)
async def fetch_json(client: httpx.AsyncClient, path: str) -> dict:
@ -42,7 +44,13 @@ async def fetch_json(client: httpx.AsyncClient, path: str) -> dict:
def _extract_memory_limit(html: str) -> float:
m = MEMORY_LIMIT_RE.search(html)
return float(m.group(1)) if m else 256.0
if not m:
return 256.0
value = float(m.group(1))
unit = m.group(2).upper()
if unit == "GB":
return value * 1024.0
return value
def _fetch_html_sync(url: str) -> str:
@ -50,20 +58,17 @@ def _fetch_html_sync(url: str) -> str:
return str(response.body)
def get_div4_contest_id(contest_id: str) -> str:
return f"{contest_id}D"
class CodeChefScraper(BaseScraper):
@property
def platform_name(self) -> str:
return "codechef"
async def scrape_contest_metadata(self, contest_id: str) -> MetadataResult:
div4_id = get_div4_contest_id(contest_id)
async with httpx.AsyncClient() as client:
try:
data = await fetch_json(client, API_CONTEST.format(contest_id=div4_id))
data = await fetch_json(
client, API_CONTEST.format(contest_id=contest_id)
)
except httpx.HTTPStatusError as e:
return self._create_metadata_error(
f"Failed to fetch contest {contest_id}: {e}", contest_id
@ -76,12 +81,13 @@ class CodeChefScraper(BaseScraper):
problems = []
for problem_code, problem_data in data["problems"].items():
problems.append(
ProblemSummary(
id=problem_code,
name=problem_data.get("name", problem_code),
if problem_data.get("category_name") == "main":
problems.append(
ProblemSummary(
id=problem_code,
name=problem_data.get("name", problem_code),
)
)
)
return MetadataResult(
success=True,
@ -98,56 +104,87 @@ class CodeChefScraper(BaseScraper):
except httpx.HTTPStatusError as e:
return self._create_contests_error(f"Failed to fetch contests: {e}")
all_contests = data.get("future_contests", []) + data.get("past_contests", [])
max_num = 0
contest_names = {}
for contest in all_contests:
contest_code = contest.get("contest_code", "")
if contest_code.startswith("START"):
match = re.match(r"START(\d+)", contest_code)
if match:
num = int(match.group(1))
max_num = max(max_num, num)
contest_names[contest_code] = contest.get(
"contest_name", contest_code
)
if max_num == 0:
return self._create_contests_error("No Starters contests found")
contests = []
for i in range(1, max_num + 1):
contest_id = f"START{i}"
name = contest_names.get(contest_id, f"Starters {i}")
contests.append(
ContestSummary(
id=contest_id,
name=name,
display_name=name,
)
all_contests = data.get("future_contests", []) + data.get(
"past_contests", []
)
max_num = 0
for contest in all_contests:
contest_code = contest.get("contest_code", "")
if contest_code.startswith("START"):
match = re.match(r"START(\d+)", contest_code)
if match:
num = int(match.group(1))
max_num = max(max_num, num)
if max_num == 0:
return self._create_contests_error("No Starters contests found")
contests = []
sem = asyncio.Semaphore(CONNECTIONS)
async def fetch_divisions(i: int) -> list[ContestSummary]:
parent_id = f"START{i}"
async with sem:
try:
parent_data = await fetch_json(
client, API_CONTEST.format(contest_id=parent_id)
)
except Exception as e:
import sys
print(f"Error fetching {parent_id}: {e}", file=sys.stderr)
return []
child_contests = parent_data.get("child_contests", {})
if not child_contests:
return []
base_name = f"Starters {i}"
divisions = []
for div_key, div_data in child_contests.items():
div_code = div_data.get("contest_code", "")
div_num = div_data.get("div", {}).get("div_number", "")
if div_code and div_num:
divisions.append(
ContestSummary(
id=div_code,
name=base_name,
display_name=f"{base_name} (Div. {div_num})",
)
)
return divisions
tasks = [fetch_divisions(i) for i in range(1, max_num + 1)]
for coro in asyncio.as_completed(tasks):
divisions = await coro
contests.extend(divisions)
return ContestListResult(success=True, error="", contests=contests)
async def stream_tests_for_category_async(self, contest_id: str) -> None:
div4_id = get_div4_contest_id(contest_id)
async with httpx.AsyncClient(
limits=httpx.Limits(max_connections=CONNECTIONS)
) as client:
try:
contest_data = await fetch_json(
client, API_CONTEST.format(contest_id=div4_id)
client, API_CONTEST.format(contest_id=contest_id)
)
except Exception:
return
problems = contest_data.get("problems", {})
if not problems:
all_problems = contest_data.get("problems", {})
if not all_problems:
return
problems = {
code: data
for code, data in all_problems.items()
if data.get("category_name") == "main"
}
sem = asyncio.Semaphore(CONNECTIONS)
async def run_one(problem_code: str) -> dict[str, Any]:
@ -156,7 +193,7 @@ class CodeChefScraper(BaseScraper):
problem_data = await fetch_json(
client,
API_PROBLEM.format(
contest_id=div4_id, problem_id=problem_code
contest_id=contest_id, problem_id=problem_code
),
)