## Problem Wrong credentials produced garbled error messages (`"login failed: Login failed: bad_credentials"`) and stale credentials remained cached after failure, causing silent re-use on the next invocation. ## Solution Standardize all scrapers to emit `"bad_credentials"` as a plain error code, mapped to a human-readable string via `LOGIN_ERRORS` in `constants.lua`. Fix `credentials.lua` to clear cached credentials on failure in both the fresh-prompt and re-prompt paths. For AtCoder and Codeforces, replace `wait_for_url` with `wait_for_function` to detect the login error element immediately rather than waiting out the full 10s navigation timeout. Add a "Remember me" checkbox on Codeforces login.
434 lines · 15 KiB · Python
#!/usr/bin/env python3
|
|
|
|
import asyncio
|
|
import io
|
|
import json
|
|
import re
|
|
import zipfile
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
import httpx
|
|
|
|
from .base import (
|
|
BaseScraper,
|
|
clear_platform_cookies,
|
|
extract_precision,
|
|
load_platform_cookies,
|
|
save_platform_cookies,
|
|
)
|
|
from .timeouts import HTTP_TIMEOUT
|
|
from .models import (
|
|
ContestListResult,
|
|
ContestSummary,
|
|
LoginResult,
|
|
MetadataResult,
|
|
ProblemSummary,
|
|
SubmitResult,
|
|
TestCase,
|
|
)
|
|
|
|
# Root of the public Kattis instance; all requests below are relative to it.
BASE_URL = "https://open.kattis.com"
# Browser-like User-Agent sent with every request
# (presumably to avoid bot filtering — TODO confirm it is still required).
HEADERS = {
    "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
}
# Cap on concurrent HTTP connections when streaming a whole contest.
CONNECTIONS = 8

# Matches the "CPU Time limit ... N seconds" block on a problem page.
TIME_RE = re.compile(
    r"CPU Time limit</span>\s*<span[^>]*>\s*(\d+)\s*seconds?\s*</span>",
    re.DOTALL,
)
# Matches the "Memory limit ... N MB" block on a problem page.
MEM_RE = re.compile(
    r"Memory limit</span>\s*<span[^>]*>\s*(\d+)\s*MB\s*</span>",
    re.DOTALL,
)
|
|
|
|
|
async def _fetch_text(client: httpx.AsyncClient, url: str) -> str:
    """GET *url* and return the decoded body; raises on HTTP error status."""
    response = await client.get(url, headers=HEADERS, timeout=HTTP_TIMEOUT)
    response.raise_for_status()
    return response.text
|
|
|
|
|
|
async def _fetch_bytes(client: httpx.AsyncClient, url: str) -> bytes:
    """GET *url* and return the raw body bytes; raises on HTTP error status."""
    response = await client.get(url, headers=HEADERS, timeout=HTTP_TIMEOUT)
    response.raise_for_status()
    return response.content
|
|
|
|
|
|
def _parse_limits(html: str) -> tuple[int, int]:
    """Extract (timeout_ms, memory_mb) from a problem statement page.

    Falls back to 1000 ms / 1024 MB when a limit is not advertised in the
    markup.
    """
    time_match = TIME_RE.search(html)
    mem_match = MEM_RE.search(html)
    timeout_ms = 1000 if time_match is None else int(time_match.group(1)) * 1000
    memory_mb = 1024 if mem_match is None else int(mem_match.group(1))
    return timeout_ms, memory_mb
|
|
|
|
|
|
def _parse_samples_html(html: str) -> list[TestCase]:
    """Pull sample tests out of the page's <table class="sample"> blocks.

    Each table is expected to carry at least two <pre> cells: input first,
    expected output second; tables with fewer are skipped.
    """
    cases: list[TestCase] = []
    for table in re.finditer(r'<table\s+class="sample"[^>]*>.*?</table>', html, re.DOTALL):
        blocks = re.findall(r"<pre>(.*?)</pre>", table.group(0), re.DOTALL)
        if len(blocks) < 2:
            continue
        cases.append(TestCase(input=blocks[0].strip(), expected=blocks[1].strip()))
    return cases
|
|
|
|
|
|
def _parse_samples_zip(data: bytes) -> list[TestCase]:
    """Build test cases from a Kattis samples.zip archive.

    Pairs each ``<stem>.in`` entry with the ``<stem>.ans`` of the same stem,
    sorted by stem. Returns [] when *data* is not a valid zip.
    """
    try:
        archive = zipfile.ZipFile(io.BytesIO(data))
    except zipfile.BadZipFile:
        return []
    ins: dict[str, str] = {}
    ans: dict[str, str] = {}
    for entry in archive.namelist():
        text = archive.read(entry).decode("utf-8").strip()
        if entry.endswith(".in"):
            ins[entry[: -len(".in")]] = text
        elif entry.endswith(".ans"):
            ans[entry[: -len(".ans")]] = text
    # Only stems present with both an input and an answer become tests.
    return [
        TestCase(input=ins[stem], expected=ans[stem])
        for stem in sorted(ins.keys() & ans.keys())
    ]
|
|
|
|
|
|
def _is_interactive(html: str) -> bool:
|
|
return "This is an interactive problem" in html
|
|
|
|
|
|
def _parse_contests_page(html: str) -> list[ContestSummary]:
    """Parse the contest-list page into summaries, de-duplicated by id.

    A start time is taken from a ``data-timestamp`` attribute when present,
    otherwise from an ISO-8601 ``<time datetime="...">`` value; rows with
    neither get ``start_time=None``.
    """
    summaries: list[ContestSummary] = []
    known: set[str] = set()
    for row_match in re.finditer(r"<tr[^>]*>(.*?)</tr>", html, re.DOTALL):
        row_html = row_match.group(1)
        anchor = re.search(r'href="/contests/([a-z0-9]+)"[^>]*>([^<]+)</a>', row_html)
        if anchor is None:
            continue
        contest_id = anchor.group(1)
        title = anchor.group(2).strip()
        if contest_id in known:
            continue
        known.add(contest_id)

        start: int | None = None
        stamp = re.search(r'data-timestamp="(\d+)"', row_html)
        if stamp is not None:
            start = int(stamp.group(1))
        else:
            iso = re.search(r'<time[^>]+datetime="([^"]+)"', row_html)
            if iso is not None:
                try:
                    # "Z" suffix is not accepted by fromisoformat before 3.11.
                    parsed = datetime.fromisoformat(
                        iso.group(1).replace("Z", "+00:00")
                    )
                    start = int(parsed.timestamp())
                except Exception:
                    pass

        summaries.append(
            ContestSummary(
                id=contest_id, name=title, display_name=title, start_time=start
            )
        )
    return summaries
|
|
|
|
|
|
def _parse_contest_problem_list(html: str) -> list[tuple[str, str]]:
|
|
if "The problems will become available when the contest starts" in html:
|
|
return []
|
|
results: list[tuple[str, str]] = []
|
|
seen: set[str] = set()
|
|
for row_m in re.finditer(r"<tr[^>]*>(.*?)</tr>", html, re.DOTALL):
|
|
row = row_m.group(1)
|
|
link_m = re.search(
|
|
r'href="/contests/[^/]+/problems/([^"]+)"[^>]*>([^<]+)</a>', row
|
|
)
|
|
if not link_m:
|
|
continue
|
|
slug = link_m.group(1)
|
|
name = link_m.group(2).strip()
|
|
if slug in seen:
|
|
continue
|
|
seen.add(slug)
|
|
label_m = re.search(r"<td[^>]*>\s*([A-Z])\s*</td>", row)
|
|
label = label_m.group(1) if label_m else ""
|
|
display = f"{label} - {name}" if label else name
|
|
results.append((slug, display))
|
|
return results
|
|
|
|
|
|
async def _fetch_contest_slugs(
    client: httpx.AsyncClient, contest_id: str
) -> list[tuple[str, str]]:
    """Fetch and parse a contest's problem list; any failure yields [].

    Errors are deliberately swallowed so callers can fall back to treating
    *contest_id* as a single problem slug.
    """
    try:
        html = await _fetch_text(client, f"{BASE_URL}/contests/{contest_id}/problems")
        return _parse_contest_problem_list(html)
    except Exception:
        # One handler is enough: the previous separate httpx.HTTPStatusError
        # branch had an identical body and was fully subsumed by this one.
        return []
|
|
|
|
|
|
async def _stream_single_problem(client: httpx.AsyncClient, slug: str) -> None:
    """Fetch one problem and print its test data as a single JSON line.

    Scrapes limits/flags from the statement HTML, then prefers the official
    samples.zip for test cases, falling back to the inline HTML sample
    tables when the zip cannot be fetched OR yields no tests (fix: a valid
    download of a corrupt/empty archive previously skipped the HTML
    fallback entirely). Fetch failures for the page itself emit nothing.
    """
    try:
        html = await _fetch_text(client, f"{BASE_URL}/problems/{slug}")
    except Exception:
        # No statement page, nothing useful to emit.
        return

    timeout_ms, memory_mb = _parse_limits(html)
    interactive = _is_interactive(html)
    precision = extract_precision(html)

    tests: list[TestCase] = []
    try:
        zip_data = await _fetch_bytes(
            client,
            f"{BASE_URL}/problems/{slug}/file/statement/samples.zip",
        )
        tests = _parse_samples_zip(zip_data)
    except Exception:
        tests = _parse_samples_html(html)
    if not tests:
        # Zip downloaded but contained no usable pairs — try the HTML tables.
        tests = _parse_samples_html(html)

    combined_input = "\n".join(t.input for t in tests) if tests else ""
    combined_expected = "\n".join(t.expected for t in tests) if tests else ""

    # One JSON object per problem, flushed immediately so the consumer can
    # process problems as they stream in.
    print(
        json.dumps(
            {
                "problem_id": slug,
                "combined": {
                    "input": combined_input,
                    "expected": combined_expected,
                },
                "tests": [{"input": t.input, "expected": t.expected} for t in tests],
                "timeout_ms": timeout_ms,
                "memory_mb": memory_mb,
                "interactive": interactive,
                "multi_test": False,
                "precision": precision,
            }
        ),
        flush=True,
    )
|
|
|
|
|
|
async def _load_kattis_cookies(client: httpx.AsyncClient) -> None:
    """Install any cached Kattis cookies onto *client*'s cookie jar."""
    cached = load_platform_cookies("kattis")
    if not isinstance(cached, dict):
        return
    for name, value in cached.items():
        client.cookies.set(name, value)
|
|
|
|
|
|
async def _save_kattis_cookies(client: httpx.AsyncClient) -> None:
    """Persist the client's current cookie jar; empty jars are not saved."""
    jar = dict(client.cookies.items())
    if jar:
        save_platform_cookies("kattis", jar)
|
|
|
|
|
|
async def _check_kattis_login(client: httpx.AsyncClient) -> bool:
    """Probe the front page; a logged-in session carries an x-username header."""
    try:
        resp = await client.get(BASE_URL, headers=HEADERS, timeout=HTTP_TIMEOUT)
    except Exception:
        # Network trouble counts as "not logged in".
        return False
    return bool(resp.headers.get("x-username"))
|
|
|
|
|
|
async def _do_kattis_login(
    client: httpx.AsyncClient, username: str, password: str
) -> bool:
    """POST the login form with a fresh cookie jar; True on HTTP 200.

    The jar is cleared first so stale session cookies cannot mask a failed
    login.
    """
    client.cookies.clear()
    resp = await client.post(
        f"{BASE_URL}/login",
        data={"user": username, "password": password, "script": "true"},
        headers=HEADERS,
        timeout=HTTP_TIMEOUT,
    )
    return resp.status_code == 200
|
|
|
|
|
|
class KattisScraper(BaseScraper):
    """Scraper/submitter for open.kattis.com.

    Login and submit emit incremental JSON status lines on stdout
    (``{"status": ...}``) so the caller can surface progress. Login
    failures are reported with the plain error code ``"bad_credentials"``
    (mapped to a human-readable message by the frontend).
    """

    @property
    def platform_name(self) -> str:
        return "kattis"

    async def scrape_contest_metadata(self, contest_id: str) -> MetadataResult:
        """Resolve *contest_id* as a contest, falling back to a lone problem.

        If the contest problem list is empty (or the id is not a contest),
        the id is treated as a problem slug and the statement page's title
        becomes the problem name.
        """
        try:
            async with httpx.AsyncClient() as client:
                slugs = await _fetch_contest_slugs(client, contest_id)
                if slugs:
                    return MetadataResult(
                        success=True,
                        error="",
                        contest_id=contest_id,
                        problems=[
                            ProblemSummary(id=slug, name=name) for slug, name in slugs
                        ],
                        url=f"{BASE_URL}/problems/%s",
                        contest_url=f"{BASE_URL}/contests/{contest_id}",
                        standings_url=f"{BASE_URL}/contests/{contest_id}/standings",
                    )
                # Not a contest: try the id as a single problem slug.
                try:
                    html = await _fetch_text(
                        client, f"{BASE_URL}/problems/{contest_id}"
                    )
                except Exception as e:
                    return self._metadata_error(str(e))
                title_m = re.search(r"<title>([^<]+)</title>", html)
                # Page titles look like "Name – Kattis"; keep the part
                # before the en-dash (U+2013).
                name = (
                    title_m.group(1).split("\u2013")[0].strip()
                    if title_m
                    else contest_id
                )
                return MetadataResult(
                    success=True,
                    error="",
                    contest_id=contest_id,
                    problems=[ProblemSummary(id=contest_id, name=name)],
                    url=f"{BASE_URL}/problems/%s",
                    contest_url=f"{BASE_URL}/problems/{contest_id}",
                    standings_url="",
                )
        except Exception as e:
            return self._metadata_error(str(e))

    async def scrape_contest_list(self) -> ContestListResult:
        """List official Kattis contests (recycled/user-created filtered out)."""
        try:
            async with httpx.AsyncClient() as client:
                html = await _fetch_text(
                    client,
                    f"{BASE_URL}/contests?kattis_original=on&kattis_recycled=off&user_created=off",
                )
                contests = _parse_contests_page(html)
                if not contests:
                    return self._contests_error("No contests found")
                return ContestListResult(success=True, error="", contests=contests)
        except Exception as e:
            return self._contests_error(str(e))

    async def stream_tests_for_category_async(self, category_id: str) -> None:
        """Stream test data for every contest problem, or for one problem.

        Contest problems are fetched concurrently, capped at CONNECTIONS;
        each problem is emitted as one JSON line by _stream_single_problem.
        """
        async with httpx.AsyncClient(
            limits=httpx.Limits(max_connections=CONNECTIONS)
        ) as client:
            slugs = await _fetch_contest_slugs(client, category_id)
            if slugs:
                sem = asyncio.Semaphore(CONNECTIONS)

                async def emit_one(slug: str, _name: str) -> None:
                    async with sem:
                        await _stream_single_problem(client, slug)

                await asyncio.gather(*(emit_one(s, n) for s, n in slugs))
                return

            # Not a contest id: treat it as a single problem slug.
            await _stream_single_problem(client, category_id)

    async def submit(
        self,
        contest_id: str,
        problem_id: str,
        file_path: str,
        language_id: str,
        credentials: dict[str, str],
    ) -> SubmitResult:
        """Submit *file_path* to a problem, re-logging-in on a stale session.

        Reuses cached cookies when available; a 400/403 response (or the
        literal "Request validation failed" body) is treated as a stale
        session, triggering a cookie clear, fresh login, and one retry.
        """
        source = Path(file_path).read_bytes()
        username = credentials.get("username", "")
        password = credentials.get("password", "")
        if not username or not password:
            return self._submit_error("Missing credentials. Use :CP kattis login")

        async with httpx.AsyncClient(follow_redirects=True) as client:
            await _load_kattis_cookies(client)
            if client.cookies:
                print(json.dumps({"status": "checking_login"}), flush=True)
            else:
                print(json.dumps({"status": "logging_in"}), flush=True)
                ok = await _do_kattis_login(client, username, password)
                if not ok:
                    # Plain error code; the frontend maps it to a message.
                    return self._submit_error("bad_credentials")
                await _save_kattis_cookies(client)

            print(json.dumps({"status": "submitting"}), flush=True)
            lang_lower = language_id.lower()
            # Java-family languages require a mainclass; derive it from the
            # file name stem.
            mainclass = Path(file_path).stem if "java" in lang_lower else ""
            data: dict[str, str] = {
                "submit": "true",
                "script": "true",
                "language": language_id,
                "problem": problem_id,
                "mainclass": mainclass,
                "submit_ctr": "2",
            }
            if contest_id != problem_id:
                # Standalone problems use contest_id == problem_id; only real
                # contests carry a contest field.
                data["contest"] = contest_id

            async def _do_submit() -> httpx.Response:
                # One submission POST; shared by the first attempt and the
                # post-relogin retry.
                return await client.post(
                    f"{BASE_URL}/submit",
                    data=data,
                    files={"sub_file[]": (Path(file_path).name, source, "text/plain")},
                    headers=HEADERS,
                    timeout=HTTP_TIMEOUT,
                )

            try:
                r = await _do_submit()
            except Exception as e:
                return self._submit_error(f"Submit request failed: {e}")

            # BUGFIX: the status must be inspected BEFORE raise_for_status();
            # previously raise_for_status() converted 400/403 into an
            # exception, making this re-login retry branch unreachable.
            if r.status_code in (400, 403) or r.text == "Request validation failed":
                clear_platform_cookies("kattis")
                print(json.dumps({"status": "logging_in"}), flush=True)
                ok = await _do_kattis_login(client, username, password)
                if not ok:
                    return self._submit_error("bad_credentials")
                await _save_kattis_cookies(client)
                try:
                    r = await _do_submit()
                except Exception as e:
                    return self._submit_error(f"Submit request failed: {e}")

            try:
                r.raise_for_status()
            except Exception as e:
                return self._submit_error(f"Submit request failed: {e}")

            sid_m = re.search(r"Submission ID:\s*(\d+)", r.text, re.IGNORECASE)
            if not sid_m:
                return self._submit_error(
                    r.text.strip() or "Submit failed (no submission ID)"
                )
            return SubmitResult(
                success=True,
                error="",
                submission_id=sid_m.group(1),
                verdict="submitted",
            )

    async def login(self, credentials: dict[str, str]) -> LoginResult:
        """Validate credentials, reusing a cached session when still valid.

        On success the credentials are echoed back so the caller can cache
        them; on failure the plain error code "bad_credentials" is returned.
        """
        username = credentials.get("username", "")
        password = credentials.get("password", "")
        if not username or not password:
            return self._login_error("Missing username or password")

        async with httpx.AsyncClient(follow_redirects=True) as client:
            await _load_kattis_cookies(client)
            if client.cookies:
                print(json.dumps({"status": "checking_login"}), flush=True)
                if await _check_kattis_login(client):
                    # Cached session still valid — no fresh login needed.
                    return LoginResult(
                        success=True,
                        error="",
                        credentials={"username": username, "password": password},
                    )

            print(json.dumps({"status": "logging_in"}), flush=True)
            ok = await _do_kattis_login(client, username, password)
            if not ok:
                return self._login_error("bad_credentials")
            await _save_kattis_cookies(client)
            return LoginResult(
                success=True,
                error="",
                credentials={"username": username, "password": password},
            )
|
|
|
|
|
|
if __name__ == "__main__":
    # CLI entry point; argument dispatch lives in BaseScraper.run_cli().
    KattisScraper().run_cli()
|