#!/usr/bin/env python3
import asyncio
import io
import json
import re
import zipfile
from datetime import datetime
from pathlib import Path
import httpx
from .base import BaseScraper
from .timeouts import HTTP_TIMEOUT
from .models import (
ContestListResult,
ContestSummary,
LoginResult,
MetadataResult,
ProblemSummary,
SubmitResult,
TestCase,
)
# Root URL of the public Kattis instance.
BASE_URL = "https://open.kattis.com"

# Request headers passed by the fetch helpers in this module; the only entry
# is a desktop-browser User-Agent string.
HEADERS = {
    "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
}

# NOTE(review): presumably the cap on concurrent HTTP connections — confirm
# at the use site.
CONNECTIONS = 8

# Cookie file under the user's cache directory (presumably the persisted
# login session — confirm against the code that reads/writes it).
_COOKIE_PATH = Path.home() / ".cache" / "cp-nvim" / "kattis-cookies.json"

# The limit patterns capture the integer that follows the label, skipping any
# number of HTML tags (closing tag of the label, opening tag of the value)
# and whitespace in between.
# NOTE(review): the previous patterns contained an orphaned "]*>" fragment
# (the tail of a "<tag[^>]*>" whose start was lost) and therefore required a
# literal ">" right after the label, which real markup never provides. The
# exact tag names are unknown, so the patterns are tag-agnostic — confirm
# against a live problem page.
TIME_RE = re.compile(
    r"CPU Time limit\s*(?:<[^>]*>\s*)*(\d+)\s*seconds?",
    re.DOTALL,
)
MEM_RE = re.compile(
    r"Memory limit\s*(?:<[^>]*>\s*)*(\d+)\s*MB",
    re.DOTALL,
)
async def _fetch_text(client: httpx.AsyncClient, url: str) -> str:
    """GET *url* through *client* and return the decoded response body.

    The request is sent with the module's ``HEADERS`` and ``HTTP_TIMEOUT``.
    ``raise_for_status()`` runs before the body is returned, so a response
    that httpx treats as an error status raises instead of yielding text.
    """
    response = await client.get(url, headers=HEADERS, timeout=HTTP_TIMEOUT)
    response.raise_for_status()
    return response.text
async def _fetch_bytes(client: httpx.AsyncClient, url: str) -> bytes:
    """GET *url* through *client* and return the raw response body.

    The request is sent with the module's ``HEADERS`` and ``HTTP_TIMEOUT``.
    ``raise_for_status()`` runs before the body is returned, so a response
    that httpx treats as an error status raises instead of yielding bytes.
    """
    response = await client.get(url, headers=HEADERS, timeout=HTTP_TIMEOUT)
    response.raise_for_status()
    return response.content
def _parse_limits(html: str) -> tuple[int, int]:
    """Read the time and memory limits out of a problem page.

    Args:
        html: Markup of the problem page.

    Returns:
        ``(timeout_ms, memory_mb)``. The first value is the integer captured
        by ``TIME_RE`` multiplied by 1000; the second is the integer captured
        by ``MEM_RE``. A limit whose pattern does not match falls back to
        1000 ms or 1024 MB respectively.
    """
    # Start from the fallbacks and overwrite whichever limit is found.
    timeout_ms, memory_mb = 1000, 1024

    time_match = TIME_RE.search(html)
    memory_match = MEM_RE.search(html)

    if time_match is not None:
        timeout_ms = 1000 * int(time_match.group(1))
    if memory_match is not None:
        memory_mb = int(memory_match.group(1))

    return timeout_ms, memory_mb
def _parse_samples_html(html: str) -> list[TestCase]:
    """Extract sample test cases from a problem statement's HTML.

    Each ``<table>`` element in *html* is scanned for ``<pre>`` blocks. A
    table holding at least two of them yields one test case: the first
    block is the input, the second the expected output. Tables with fewer
    than two ``<pre>`` blocks are ignored.

    The text of each block has its HTML entities decoded (so ``&lt;``
    becomes ``<``) and surrounding whitespace stripped.

    NOTE(review): the two pattern literals were truncated (their markup was
    missing and the string literals were unterminated). They are
    reconstructed here from the surrounding variable names as generic
    ``<table>`` / ``<pre>`` matchers — confirm against a live problem page,
    in particular whether the table match should be limited to a specific
    class.

    Args:
        html: Markup of the problem page.

    Returns:
        The sample test cases in document order; empty if none are found.
    """
    # Function-scope import: the ``html`` parameter shadows the stdlib
    # module of the same name, so only the needed function is bound.
    from html import unescape

    tests: list[TestCase] = []
    for table_match in re.finditer(r"<table\b[^>]*>.*?</table>", html, re.DOTALL):
        table_html = table_match.group(0)
        pres = re.findall(r"<pre\b[^>]*>(.*?)</pre>", table_html, re.DOTALL)
        if len(pres) < 2:
            continue
        inp = unescape(pres[0]).strip()
        out = unescape(pres[1]).strip()
        tests.append(TestCase(input=inp, expected=out))
    return tests
def _parse_samples_zip(data: bytes) -> list[TestCase]:
    """Build test cases from a zip archive of sample files.

    Members whose names end in ``.in`` are inputs and members ending in
    ``.ans`` are expected outputs. They are paired by member name with the
    suffix removed; only stems that have both files produce a test case.
    Cases are returned in sorted order of the stem.

    Members with any other suffix (directories, images, ...) are skipped
    without being read, so non-text content in the archive cannot cause a
    decoding error.

    Args:
        data: Raw bytes of the archive.

    Returns:
        The paired test cases, or an empty list when *data* is not a valid
        zip archive.

    Raises:
        UnicodeDecodeError: If a ``.in`` / ``.ans`` member is not valid UTF-8.
    """
    try:
        archive = zipfile.ZipFile(io.BytesIO(data))
    except zipfile.BadZipFile:
        return []

    inputs: dict[str, str] = {}
    outputs: dict[str, str] = {}
    # The context manager closes the archive even if a member fails to decode.
    with archive:
        for name in archive.namelist():
            if name.endswith(".in"):
                bucket, suffix = inputs, ".in"
            elif name.endswith(".ans"):
                bucket, suffix = outputs, ".ans"
            else:
                continue
            text = archive.read(name).decode("utf-8").strip()
            bucket[name[: -len(suffix)]] = text

    return [
        TestCase(input=inputs[key], expected=outputs[key])
        for key in sorted(inputs.keys() & outputs.keys())
    ]
def _is_interactive(html: str) -> bool:
    """Report whether *html* contains the interactive-problem notice.

    The check is a plain, case-sensitive substring search.
    """
    marker = "This is an interactive problem"
    return html.find(marker) != -1
def _parse_contests_page(html: str) -> list[ContestSummary]:
results: list[ContestSummary] = []
seen: set[str] = set()
for row_m in re.finditer(r"]*>(.*?)
", html, re.DOTALL):
row = row_m.group(1)
link_m = re.search(r'href="/contests/([a-z0-9]+)"[^>]*>([^<]+)', row)
if not link_m:
continue
cid = link_m.group(1)
name = link_m.group(2).strip()
if cid in seen:
continue
seen.add(cid)
start_time: int | None = None
ts_m = re.search(r'data-timestamp="(\d+)"', row)
if ts_m:
start_time = int(ts_m.group(1))
else:
time_m = re.search(r'