diff --git a/scrapers/kattis.py b/scrapers/kattis.py
index 24bfb45..d1675bf 100644
--- a/scrapers/kattis.py
+++ b/scrapers/kattis.py
@@ -5,6 +5,7 @@ import io
import json
import re
import zipfile
+from datetime import datetime
import httpx
@@ -33,7 +34,6 @@ MEM_RE = re.compile(
r"Memory limit\s*</th>\s*<td[^>]*>\s*(\d+)\s*MB\s*</td>",
re.DOTALL,
)
-LAST_PAGE_RE = re.compile(r"\bpage=(\d+)")
async def _fetch_text(client: httpx.AsyncClient, url: str) -> str:
@@ -94,24 +94,110 @@ def _is_interactive(html: str) -> bool:
return "This is an interactive problem" in html
-def _parse_problem_rows(html: str) -> list[tuple[str, str]]:
+def _parse_contests_page(html: str) -> list[ContestSummary]:
+ results: list[ContestSummary] = []
seen: set[str] = set()
- out: list[tuple[str, str]] = []
- for m in re.finditer(
-        r'<a href="/problems/([^"/]+)"[^>]*>\s*([^<]+)</a>',
- html,
- ):
- pid = m.group(1)
- name = m.group(2).strip()
- if pid not in seen:
- seen.add(pid)
- out.append((pid, name))
- return out
+    for row_m in re.finditer(r"<tr[^>]*>(.*?)</tr>", html, re.DOTALL):
+ row = row_m.group(1)
+ link_m = re.search(r'href="/contests/([a-z0-9]+)"[^>]*>([^<]+)', row)
+ if not link_m:
+ continue
+ cid = link_m.group(1)
+ name = link_m.group(2).strip()
+ if cid in seen:
+ continue
+ seen.add(cid)
+ start_time: int | None = None
+ ts_m = re.search(r'data-timestamp="(\d+)"', row)
+ if ts_m:
+ start_time = int(ts_m.group(1))
+ else:
+            time_m = re.search(r'<time[^>]+datetime="([^"]+)"', row)
+ if time_m:
+ try:
+ dt = datetime.fromisoformat(time_m.group(1).replace("Z", "+00:00"))
+ start_time = int(dt.timestamp())
+ except Exception:
+ pass
+ results.append(ContestSummary(id=cid, name=name, start_time=start_time))
+ return results
-def _parse_last_page(html: str) -> int:
- nums = [int(m.group(1)) for m in LAST_PAGE_RE.finditer(html)]
- return max(nums) if nums else 0
+def _parse_contest_problem_list(html: str) -> list[tuple[str, str]]:
+ if "The problems will become available when the contest starts" in html:
+ return []
+ results: list[tuple[str, str]] = []
+ seen: set[str] = set()
+    for row_m in re.finditer(r"<tr[^>]*>(.*?)</tr>", html, re.DOTALL):
+ row = row_m.group(1)
+ link_m = re.search(
+ r'href="/contests/[^/]+/problems/([^"]+)"[^>]*>([^<]+)', row
+ )
+ if not link_m:
+ continue
+ slug = link_m.group(1)
+ name = link_m.group(2).strip()
+ if slug in seen:
+ continue
+ seen.add(slug)
+        label_m = re.search(r"<th[^>]*>\s*([A-Z])\s*</th>", row)
+ label = label_m.group(1) if label_m else ""
+ display = f"{label} - {name}" if label else name
+ results.append((slug, display))
+ return results
+
+
+async def _fetch_contest_slugs(
+ client: httpx.AsyncClient, contest_id: str
+) -> list[tuple[str, str]]:
+ try:
+ html = await _fetch_text(client, f"{BASE_URL}/contests/{contest_id}/problems")
+ return _parse_contest_problem_list(html)
+ except httpx.HTTPStatusError:
+ return []
+ except Exception:
+ return []
+
+
+async def _stream_single_problem(client: httpx.AsyncClient, slug: str) -> None:
+ try:
+ html = await _fetch_text(client, f"{BASE_URL}/problems/{slug}")
+ except Exception:
+ return
+
+ timeout_ms, memory_mb = _parse_limits(html)
+ interactive = _is_interactive(html)
+
+ tests: list[TestCase] = []
+ try:
+ zip_data = await _fetch_bytes(
+ client,
+ f"{BASE_URL}/problems/{slug}/file/statement/samples.zip",
+ )
+ tests = _parse_samples_zip(zip_data)
+ except Exception:
+ tests = _parse_samples_html(html)
+
+ combined_input = "\n".join(t.input for t in tests) if tests else ""
+ combined_expected = "\n".join(t.expected for t in tests) if tests else ""
+
+ print(
+ json.dumps(
+ {
+ "problem_id": slug,
+ "combined": {
+ "input": combined_input,
+ "expected": combined_expected,
+ },
+ "tests": [{"input": t.input, "expected": t.expected} for t in tests],
+ "timeout_ms": timeout_ms,
+ "memory_mb": memory_mb,
+ "interactive": interactive,
+ "multi_test": False,
+ }
+ ),
+ flush=True,
+ )
class KattisScraper(BaseScraper):
@@ -122,57 +208,46 @@ class KattisScraper(BaseScraper):
async def scrape_contest_metadata(self, contest_id: str) -> MetadataResult:
try:
async with httpx.AsyncClient() as client:
- html = await _fetch_text(client, f"{BASE_URL}/problems/{contest_id}")
- timeout_ms, memory_mb = _parse_limits(html)
-                title_m = re.search(r"<title>([^<]+)</title>", html)
- name = (
- title_m.group(1).split("\u2013")[0].strip() if title_m else contest_id
- )
- return MetadataResult(
- success=True,
- error="",
- contest_id=contest_id,
- problems=[ProblemSummary(id=contest_id, name=name)],
- url=f"{BASE_URL}/problems/%s",
- )
+ slugs = await _fetch_contest_slugs(client, contest_id)
+ if slugs:
+ return MetadataResult(
+ success=True,
+ error="",
+ contest_id=contest_id,
+ problems=[
+ ProblemSummary(id=slug, name=name) for slug, name in slugs
+ ],
+ url=f"{BASE_URL}/problems/%s",
+ )
+ try:
+ html = await _fetch_text(
+ client, f"{BASE_URL}/problems/{contest_id}"
+ )
+ except Exception as e:
+ return self._metadata_error(str(e))
+                title_m = re.search(r"<title>([^<]+)</title>", html)
+ name = (
+ title_m.group(1).split("\u2013")[0].strip()
+ if title_m
+ else contest_id
+ )
+ return MetadataResult(
+ success=True,
+ error="",
+ contest_id=contest_id,
+ problems=[ProblemSummary(id=contest_id, name=name)],
+ url=f"{BASE_URL}/problems/%s",
+ )
except Exception as e:
return self._metadata_error(str(e))
async def scrape_contest_list(self) -> ContestListResult:
try:
- async with httpx.AsyncClient(
- limits=httpx.Limits(max_connections=CONNECTIONS)
- ) as client:
- first_html = await _fetch_text(
- client, f"{BASE_URL}/problems?page=0&order=problem_difficulty"
- )
- last = _parse_last_page(first_html)
- rows = _parse_problem_rows(first_html)
-
- sem = asyncio.Semaphore(CONNECTIONS)
-
- async def fetch_page(page: int) -> list[tuple[str, str]]:
- async with sem:
- html = await _fetch_text(
- client,
- f"{BASE_URL}/problems?page={page}&order=problem_difficulty",
- )
- return _parse_problem_rows(html)
-
- tasks = [fetch_page(p) for p in range(1, last + 1)]
- for coro in asyncio.as_completed(tasks):
- rows.extend(await coro)
-
- seen: set[str] = set()
- contests: list[ContestSummary] = []
- for pid, name in rows:
- if pid not in seen:
- seen.add(pid)
- contests.append(
- ContestSummary(id=pid, name=name, display_name=name)
- )
+ async with httpx.AsyncClient() as client:
+ html = await _fetch_text(client, f"{BASE_URL}/contests")
+ contests = _parse_contests_page(html)
if not contests:
- return self._contests_error("No problems found")
+ return self._contests_error("No contests found")
return ContestListResult(success=True, error="", contests=contests)
except Exception as e:
return self._contests_error(str(e))
@@ -181,46 +256,18 @@ class KattisScraper(BaseScraper):
async with httpx.AsyncClient(
limits=httpx.Limits(max_connections=CONNECTIONS)
) as client:
- try:
- html = await _fetch_text(client, f"{BASE_URL}/problems/{category_id}")
- except Exception:
+ slugs = await _fetch_contest_slugs(client, category_id)
+ if slugs:
+ sem = asyncio.Semaphore(CONNECTIONS)
+
+ async def emit_one(slug: str, _name: str) -> None:
+ async with sem:
+ await _stream_single_problem(client, slug)
+
+ await asyncio.gather(*(emit_one(s, n) for s, n in slugs))
return
- timeout_ms, memory_mb = _parse_limits(html)
- interactive = _is_interactive(html)
-
- tests: list[TestCase] = []
- try:
- zip_data = await _fetch_bytes(
- client,
- f"{BASE_URL}/problems/{category_id}/file/statement/samples.zip",
- )
- tests = _parse_samples_zip(zip_data)
- except Exception:
- tests = _parse_samples_html(html)
-
- combined_input = "\n".join(t.input for t in tests) if tests else ""
- combined_expected = "\n".join(t.expected for t in tests) if tests else ""
-
- print(
- json.dumps(
- {
- "problem_id": category_id,
- "combined": {
- "input": combined_input,
- "expected": combined_expected,
- },
- "tests": [
- {"input": t.input, "expected": t.expected} for t in tests
- ],
- "timeout_ms": timeout_ms,
- "memory_mb": memory_mb,
- "interactive": interactive,
- "multi_test": False,
- }
- ),
- flush=True,
- )
+ await _stream_single_problem(client, category_id)
async def submit(
self,