Problem: Wrong credentials during login produced two bugs: scrapers wrapped the `bad_credentials` error code in `"Login failed: ..."`, causing double-prefixed messages in the UI; and `credentials.lua` did not clear cached credentials before re-prompting or on failure, leaving stale bad creds in the cache. Solution: Standardize all scrapers to emit `"bad_credentials"` as the raw error code. Add `LOGIN_ERRORS` map in `constants.lua` to translate it to a human-readable string in both `credentials.lua` and `submit.lua`. Fix `credentials.lua` to clear credentials on failure in both the fresh-prompt and cached-creds-fail paths. For AtCoder and Codeforces, replace `wait_for_url` with `wait_for_function` to detect the login error element immediately rather than waiting the full 10s navigation timeout. Also add "Remember me" checkbox check on Codeforces login.
434 lines
15 KiB
Python
434 lines
15 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import asyncio
|
|
import io
|
|
import json
|
|
import re
|
|
import zipfile
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
import httpx
|
|
|
|
from .base import (
|
|
BaseScraper,
|
|
clear_platform_cookies,
|
|
extract_precision,
|
|
load_platform_cookies,
|
|
save_platform_cookies,
|
|
)
|
|
from .timeouts import HTTP_TIMEOUT
|
|
from .models import (
|
|
ContestListResult,
|
|
ContestSummary,
|
|
LoginResult,
|
|
MetadataResult,
|
|
ProblemSummary,
|
|
SubmitResult,
|
|
TestCase,
|
|
)
|
|
|
|
BASE_URL = "https://open.kattis.com"
|
|
HEADERS = {
|
|
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
|
|
}
|
|
CONNECTIONS = 8
|
|
|
|
TIME_RE = re.compile(
|
|
r"CPU Time limit</span>\s*<span[^>]*>\s*(\d+)\s*seconds?\s*</span>",
|
|
re.DOTALL,
|
|
)
|
|
MEM_RE = re.compile(
|
|
r"Memory limit</span>\s*<span[^>]*>\s*(\d+)\s*MB\s*</span>",
|
|
re.DOTALL,
|
|
)
|
|
|
|
|
|
async def _fetch_text(client: httpx.AsyncClient, url: str) -> str:
|
|
r = await client.get(url, headers=HEADERS, timeout=HTTP_TIMEOUT)
|
|
r.raise_for_status()
|
|
return r.text
|
|
|
|
|
|
async def _fetch_bytes(client: httpx.AsyncClient, url: str) -> bytes:
|
|
r = await client.get(url, headers=HEADERS, timeout=HTTP_TIMEOUT)
|
|
r.raise_for_status()
|
|
return r.content
|
|
|
|
|
|
def _parse_limits(html: str) -> tuple[int, int]:
|
|
tm = TIME_RE.search(html)
|
|
mm = MEM_RE.search(html)
|
|
timeout_ms = int(tm.group(1)) * 1000 if tm else 1000
|
|
memory_mb = int(mm.group(1)) if mm else 1024
|
|
return timeout_ms, memory_mb
|
|
|
|
|
|
def _parse_samples_html(html: str) -> list[TestCase]:
|
|
tests: list[TestCase] = []
|
|
tables = re.finditer(r'<table\s+class="sample"[^>]*>.*?</table>', html, re.DOTALL)
|
|
for table_match in tables:
|
|
table_html = table_match.group(0)
|
|
pres = re.findall(r"<pre>(.*?)</pre>", table_html, re.DOTALL)
|
|
if len(pres) >= 2:
|
|
inp = pres[0].strip()
|
|
out = pres[1].strip()
|
|
tests.append(TestCase(input=inp, expected=out))
|
|
return tests
|
|
|
|
|
|
def _parse_samples_zip(data: bytes) -> list[TestCase]:
|
|
try:
|
|
zf = zipfile.ZipFile(io.BytesIO(data))
|
|
except zipfile.BadZipFile:
|
|
return []
|
|
inputs: dict[str, str] = {}
|
|
outputs: dict[str, str] = {}
|
|
for name in zf.namelist():
|
|
content = zf.read(name).decode("utf-8").strip()
|
|
if name.endswith(".in"):
|
|
key = name[: -len(".in")]
|
|
inputs[key] = content
|
|
elif name.endswith(".ans"):
|
|
key = name[: -len(".ans")]
|
|
outputs[key] = content
|
|
tests: list[TestCase] = []
|
|
for key in sorted(set(inputs) & set(outputs)):
|
|
tests.append(TestCase(input=inputs[key], expected=outputs[key]))
|
|
return tests
|
|
|
|
|
|
def _is_interactive(html: str) -> bool:
|
|
return "This is an interactive problem" in html
|
|
|
|
|
|
def _parse_contests_page(html: str) -> list[ContestSummary]:
|
|
results: list[ContestSummary] = []
|
|
seen: set[str] = set()
|
|
for row_m in re.finditer(r"<tr[^>]*>(.*?)</tr>", html, re.DOTALL):
|
|
row = row_m.group(1)
|
|
link_m = re.search(r'href="/contests/([a-z0-9]+)"[^>]*>([^<]+)</a>', row)
|
|
if not link_m:
|
|
continue
|
|
cid = link_m.group(1)
|
|
name = link_m.group(2).strip()
|
|
if cid in seen:
|
|
continue
|
|
seen.add(cid)
|
|
start_time: int | None = None
|
|
ts_m = re.search(r'data-timestamp="(\d+)"', row)
|
|
if ts_m:
|
|
start_time = int(ts_m.group(1))
|
|
else:
|
|
time_m = re.search(r'<time[^>]+datetime="([^"]+)"', row)
|
|
if time_m:
|
|
try:
|
|
dt = datetime.fromisoformat(time_m.group(1).replace("Z", "+00:00"))
|
|
start_time = int(dt.timestamp())
|
|
except Exception:
|
|
pass
|
|
results.append(
|
|
ContestSummary(id=cid, name=name, display_name=name, start_time=start_time)
|
|
)
|
|
return results
|
|
|
|
|
|
def _parse_contest_problem_list(html: str) -> list[tuple[str, str]]:
|
|
if "The problems will become available when the contest starts" in html:
|
|
return []
|
|
results: list[tuple[str, str]] = []
|
|
seen: set[str] = set()
|
|
for row_m in re.finditer(r"<tr[^>]*>(.*?)</tr>", html, re.DOTALL):
|
|
row = row_m.group(1)
|
|
link_m = re.search(
|
|
r'href="/contests/[^/]+/problems/([^"]+)"[^>]*>([^<]+)</a>', row
|
|
)
|
|
if not link_m:
|
|
continue
|
|
slug = link_m.group(1)
|
|
name = link_m.group(2).strip()
|
|
if slug in seen:
|
|
continue
|
|
seen.add(slug)
|
|
label_m = re.search(r"<td[^>]*>\s*([A-Z])\s*</td>", row)
|
|
label = label_m.group(1) if label_m else ""
|
|
display = f"{label} - {name}" if label else name
|
|
results.append((slug, display))
|
|
return results
|
|
|
|
|
|
async def _fetch_contest_slugs(
|
|
client: httpx.AsyncClient, contest_id: str
|
|
) -> list[tuple[str, str]]:
|
|
try:
|
|
html = await _fetch_text(client, f"{BASE_URL}/contests/{contest_id}/problems")
|
|
return _parse_contest_problem_list(html)
|
|
except httpx.HTTPStatusError:
|
|
return []
|
|
except Exception:
|
|
return []
|
|
|
|
|
|
async def _stream_single_problem(client: httpx.AsyncClient, slug: str) -> None:
|
|
try:
|
|
html = await _fetch_text(client, f"{BASE_URL}/problems/{slug}")
|
|
except Exception:
|
|
return
|
|
|
|
timeout_ms, memory_mb = _parse_limits(html)
|
|
interactive = _is_interactive(html)
|
|
precision = extract_precision(html)
|
|
|
|
tests: list[TestCase] = []
|
|
try:
|
|
zip_data = await _fetch_bytes(
|
|
client,
|
|
f"{BASE_URL}/problems/{slug}/file/statement/samples.zip",
|
|
)
|
|
tests = _parse_samples_zip(zip_data)
|
|
except Exception:
|
|
tests = _parse_samples_html(html)
|
|
|
|
combined_input = "\n".join(t.input for t in tests) if tests else ""
|
|
combined_expected = "\n".join(t.expected for t in tests) if tests else ""
|
|
|
|
print(
|
|
json.dumps(
|
|
{
|
|
"problem_id": slug,
|
|
"combined": {
|
|
"input": combined_input,
|
|
"expected": combined_expected,
|
|
},
|
|
"tests": [{"input": t.input, "expected": t.expected} for t in tests],
|
|
"timeout_ms": timeout_ms,
|
|
"memory_mb": memory_mb,
|
|
"interactive": interactive,
|
|
"multi_test": False,
|
|
"precision": precision,
|
|
}
|
|
),
|
|
flush=True,
|
|
)
|
|
|
|
|
|
async def _load_kattis_cookies(client: httpx.AsyncClient) -> None:
|
|
data = load_platform_cookies("kattis")
|
|
if isinstance(data, dict):
|
|
for k, v in data.items():
|
|
client.cookies.set(k, v)
|
|
|
|
|
|
async def _save_kattis_cookies(client: httpx.AsyncClient) -> None:
|
|
cookies = dict(client.cookies.items())
|
|
if cookies:
|
|
save_platform_cookies("kattis", cookies)
|
|
|
|
|
|
async def _check_kattis_login(client: httpx.AsyncClient) -> bool:
|
|
try:
|
|
r = await client.get(BASE_URL, headers=HEADERS, timeout=HTTP_TIMEOUT)
|
|
return bool(r.headers.get("x-username"))
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
async def _do_kattis_login(
|
|
client: httpx.AsyncClient, username: str, password: str
|
|
) -> bool:
|
|
client.cookies.clear()
|
|
r = await client.post(
|
|
f"{BASE_URL}/login",
|
|
data={"user": username, "password": password, "script": "true"},
|
|
headers=HEADERS,
|
|
timeout=HTTP_TIMEOUT,
|
|
)
|
|
return r.status_code == 200
|
|
|
|
|
|
class KattisScraper(BaseScraper):
|
|
@property
|
|
def platform_name(self) -> str:
|
|
return "kattis"
|
|
|
|
async def scrape_contest_metadata(self, contest_id: str) -> MetadataResult:
|
|
try:
|
|
async with httpx.AsyncClient() as client:
|
|
slugs = await _fetch_contest_slugs(client, contest_id)
|
|
if slugs:
|
|
return MetadataResult(
|
|
success=True,
|
|
error="",
|
|
contest_id=contest_id,
|
|
problems=[
|
|
ProblemSummary(id=slug, name=name) for slug, name in slugs
|
|
],
|
|
url=f"{BASE_URL}/problems/%s",
|
|
contest_url=f"{BASE_URL}/contests/{contest_id}",
|
|
standings_url=f"{BASE_URL}/contests/{contest_id}/standings",
|
|
)
|
|
try:
|
|
html = await _fetch_text(
|
|
client, f"{BASE_URL}/problems/{contest_id}"
|
|
)
|
|
except Exception as e:
|
|
return self._metadata_error(str(e))
|
|
title_m = re.search(r"<title>([^<]+)</title>", html)
|
|
name = (
|
|
title_m.group(1).split("\u2013")[0].strip()
|
|
if title_m
|
|
else contest_id
|
|
)
|
|
return MetadataResult(
|
|
success=True,
|
|
error="",
|
|
contest_id=contest_id,
|
|
problems=[ProblemSummary(id=contest_id, name=name)],
|
|
url=f"{BASE_URL}/problems/%s",
|
|
contest_url=f"{BASE_URL}/problems/{contest_id}",
|
|
standings_url="",
|
|
)
|
|
except Exception as e:
|
|
return self._metadata_error(str(e))
|
|
|
|
async def scrape_contest_list(self) -> ContestListResult:
|
|
try:
|
|
async with httpx.AsyncClient() as client:
|
|
html = await _fetch_text(
|
|
client,
|
|
f"{BASE_URL}/contests?kattis_original=on&kattis_recycled=off&user_created=off",
|
|
)
|
|
contests = _parse_contests_page(html)
|
|
if not contests:
|
|
return self._contests_error("No contests found")
|
|
return ContestListResult(success=True, error="", contests=contests)
|
|
except Exception as e:
|
|
return self._contests_error(str(e))
|
|
|
|
async def stream_tests_for_category_async(self, category_id: str) -> None:
|
|
async with httpx.AsyncClient(
|
|
limits=httpx.Limits(max_connections=CONNECTIONS)
|
|
) as client:
|
|
slugs = await _fetch_contest_slugs(client, category_id)
|
|
if slugs:
|
|
sem = asyncio.Semaphore(CONNECTIONS)
|
|
|
|
async def emit_one(slug: str, _name: str) -> None:
|
|
async with sem:
|
|
await _stream_single_problem(client, slug)
|
|
|
|
await asyncio.gather(*(emit_one(s, n) for s, n in slugs))
|
|
return
|
|
|
|
await _stream_single_problem(client, category_id)
|
|
|
|
async def submit(
|
|
self,
|
|
contest_id: str,
|
|
problem_id: str,
|
|
file_path: str,
|
|
language_id: str,
|
|
credentials: dict[str, str],
|
|
) -> SubmitResult:
|
|
source = Path(file_path).read_bytes()
|
|
username = credentials.get("username", "")
|
|
password = credentials.get("password", "")
|
|
if not username or not password:
|
|
return self._submit_error("Missing credentials. Use :CP kattis login")
|
|
|
|
async with httpx.AsyncClient(follow_redirects=True) as client:
|
|
await _load_kattis_cookies(client)
|
|
if client.cookies:
|
|
print(json.dumps({"status": "checking_login"}), flush=True)
|
|
else:
|
|
print(json.dumps({"status": "logging_in"}), flush=True)
|
|
ok = await _do_kattis_login(client, username, password)
|
|
if not ok:
|
|
return self._submit_error("bad_credentials")
|
|
await _save_kattis_cookies(client)
|
|
|
|
print(json.dumps({"status": "submitting"}), flush=True)
|
|
lang_lower = language_id.lower()
|
|
mainclass = Path(file_path).stem if "java" in lang_lower else ""
|
|
data: dict[str, str] = {
|
|
"submit": "true",
|
|
"script": "true",
|
|
"language": language_id,
|
|
"problem": problem_id,
|
|
"mainclass": mainclass,
|
|
"submit_ctr": "2",
|
|
}
|
|
if contest_id != problem_id:
|
|
data["contest"] = contest_id
|
|
|
|
async def _do_submit() -> httpx.Response:
|
|
return await client.post(
|
|
f"{BASE_URL}/submit",
|
|
data=data,
|
|
files={"sub_file[]": (Path(file_path).name, source, "text/plain")},
|
|
headers=HEADERS,
|
|
timeout=HTTP_TIMEOUT,
|
|
)
|
|
|
|
try:
|
|
r = await _do_submit()
|
|
r.raise_for_status()
|
|
except Exception as e:
|
|
return self._submit_error(f"Submit request failed: {e}")
|
|
|
|
if r.status_code in (400, 403) or r.text == "Request validation failed":
|
|
clear_platform_cookies("kattis")
|
|
print(json.dumps({"status": "logging_in"}), flush=True)
|
|
ok = await _do_kattis_login(client, username, password)
|
|
if not ok:
|
|
return self._submit_error("bad_credentials")
|
|
await _save_kattis_cookies(client)
|
|
try:
|
|
r = await _do_submit()
|
|
r.raise_for_status()
|
|
except Exception as e:
|
|
return self._submit_error(f"Submit request failed: {e}")
|
|
|
|
sid_m = re.search(r"Submission ID:\s*(\d+)", r.text, re.IGNORECASE)
|
|
if not sid_m:
|
|
return self._submit_error(
|
|
r.text.strip() or "Submit failed (no submission ID)"
|
|
)
|
|
return SubmitResult(
|
|
success=True,
|
|
error="",
|
|
submission_id=sid_m.group(1),
|
|
verdict="submitted",
|
|
)
|
|
|
|
async def login(self, credentials: dict[str, str]) -> LoginResult:
|
|
username = credentials.get("username", "")
|
|
password = credentials.get("password", "")
|
|
if not username or not password:
|
|
return self._login_error("Missing username or password")
|
|
|
|
async with httpx.AsyncClient(follow_redirects=True) as client:
|
|
await _load_kattis_cookies(client)
|
|
if client.cookies:
|
|
print(json.dumps({"status": "checking_login"}), flush=True)
|
|
if await _check_kattis_login(client):
|
|
return LoginResult(
|
|
success=True,
|
|
error="",
|
|
credentials={"username": username, "password": password},
|
|
)
|
|
|
|
print(json.dumps({"status": "logging_in"}), flush=True)
|
|
ok = await _do_kattis_login(client, username, password)
|
|
if not ok:
|
|
return self._login_error("bad_credentials")
|
|
await _save_kattis_cookies(client)
|
|
return LoginResult(
|
|
success=True,
|
|
error="",
|
|
credentials={"username": username, "password": password},
|
|
)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
KattisScraper().run_cli()
|