Problem: Wrong credentials during login produced two bugs: scrapers wrapped the `bad_credentials` error code in `"Login failed: ..."`, causing double-prefixed messages in the UI; and `credentials.lua` did not clear cached credentials before re-prompting or on failure, leaving stale bad creds in the cache. Solution: Standardize all scrapers to emit `"bad_credentials"` as the raw error code. Add `LOGIN_ERRORS` map in `constants.lua` to translate it to a human-readable string in both `credentials.lua` and `submit.lua`. Fix `credentials.lua` to clear credentials on failure in both the fresh-prompt and cached-creds-fail paths. For AtCoder and Codeforces, replace `wait_for_url` with `wait_for_function` to detect the login error element immediately rather than waiting the full 10s navigation timeout. Also add "Remember me" checkbox check on Codeforces login.
557 lines · 19 KiB · Python
#!/usr/bin/env python3
|
|
|
|
import asyncio
|
|
import json
|
|
import re
|
|
from pathlib import Path
|
|
from typing import Any, cast
|
|
|
|
import httpx
|
|
|
|
from .base import (
|
|
BaseScraper,
|
|
extract_precision,
|
|
load_platform_cookies,
|
|
save_platform_cookies,
|
|
)
|
|
from .timeouts import HTTP_TIMEOUT
|
|
from .models import (
|
|
ContestListResult,
|
|
ContestSummary,
|
|
LoginResult,
|
|
MetadataResult,
|
|
ProblemSummary,
|
|
SubmitResult,
|
|
TestCase,
|
|
)
|
|
|
|
# Public pages are served over plain HTTP; authenticated endpoints
# (login / submit) live on the HTTPS host.
BASE_URL = "http://www.usaco.org"
_AUTH_BASE = "https://usaco.org"
# Browser-like User-Agent sent with every request.
HEADERS = {
    "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
}
# Cap on concurrent HTTP connections when crawling many pages.
CONNECTIONS = 4

# Authenticated endpoints, relative to _AUTH_BASE.
_LOGIN_PATH = "/current/tpcm/login-session.php"
_SUBMIT_PATH = "/current/tpcm/submit-solution.php"

# Keywords used to match our language IDs against the labels of the submit
# form's language <select> options; checked in order, first hit wins.
_LANG_KEYWORDS: dict[str, list[str]] = {
    "cpp": ["c++17", "c++ 17", "g++17", "c++", "cpp"],
    "python": ["python3", "python 3", "python"],
    "java": ["java"],
}

# Month slugs that appear in results-page names (e.g. "dec24results").
MONTHS = [
    "dec",
    "jan",
    "feb",
    "mar",
    "open",
]

# Matches headings like "<h2>USACO 2024 December Contest, Gold</h2>";
# groups: (year, month word, division word).
DIVISION_HEADING_RE = re.compile(
    r"<h2>.*?USACO\s+(\d{4})\s+(\w+)\s+Contest,\s+(\w+)\s*</h2>",
    re.IGNORECASE,
)
# A bolded problem name followed by its viewproblem2 link carrying the
# numeric cpid; groups: (name, cpid).
PROBLEM_BLOCK_RE = re.compile(
    r"<b>([^<]+)</b>\s*<br\s*/?>.*?"
    r"viewproblem2&cpid=(\d+)",
    re.DOTALL,
)
# Sample input/output <pre> blocks on a problem statement page.
SAMPLE_IN_RE = re.compile(r"<pre\s+class=['\"]in['\"]>(.*?)</pre>", re.DOTALL)
SAMPLE_OUT_RE = re.compile(r"<pre\s+class=['\"]out['\"]>(.*?)</pre>", re.DOTALL)
# Optional statement notes overriding the default time/memory limits.
TIME_NOTE_RE = re.compile(
    r"time\s+limit\s+(?:for\s+this\s+problem\s+is\s+)?(\d+)s",
    re.IGNORECASE,
)
MEMORY_NOTE_RE = re.compile(
    r"memory\s+limit\s+(?:for\s+this\s+problem\s+is\s+)?(\d+)\s*MB",
    re.IGNORECASE,
)
# Links like index.php?page=dec24results on the contests index page.
RESULTS_PAGE_RE = re.compile(
    r'href="index\.php\?page=([a-z]+\d{2,4}results)"',
    re.IGNORECASE,
)
|
|
|
|
|
|
async def _fetch_text(client: httpx.AsyncClient, url: str) -> str:
    """Fetch *url* with the standard headers and return the body text.

    Follows redirects; raises ``httpx.HTTPStatusError`` on non-2xx replies.
    """
    response = await client.get(
        url,
        headers=HEADERS,
        timeout=HTTP_TIMEOUT,
        follow_redirects=True,
    )
    response.raise_for_status()
    return response.text
|
|
|
|
|
|
def _parse_results_page(html: str) -> dict[str, list[tuple[str, str]]]:
    """Parse a results page into ``{division: [(cpid, problem name), ...]}``.

    Divisions are keyed by their lowercased heading word; problems listed
    before the first recognized heading are ignored.
    """
    by_division: dict[str, list[tuple[str, str]]] = {}
    active: str | None = None

    # Split on <h2> headings so each chunk belongs to at most one division.
    for chunk in re.split(r"(<h2>.*?</h2>)", html, flags=re.DOTALL):
        heading = DIVISION_HEADING_RE.search(chunk)
        if heading:
            division = heading.group(3)
            if division:
                active = division.lower()
                by_division.setdefault(active, [])
            continue
        if active is None:
            continue
        for match in PROBLEM_BLOCK_RE.finditer(chunk):
            by_division[active].append((match.group(2), match.group(1).strip()))

    return by_division
|
|
|
|
|
|
def _parse_contest_id(contest_id: str) -> tuple[str, str]:
|
|
parts = contest_id.rsplit("_", 1)
|
|
if len(parts) != 2:
|
|
return contest_id, ""
|
|
return parts[0], parts[1].lower()
|
|
|
|
|
|
def _results_page_slug(month_year: str) -> str:
|
|
return f"{month_year}results"
|
|
|
|
|
|
def _parse_problem_page(html: str) -> dict[str, Any]:
    """Extract sample tests and resource limits from a problem statement page.

    Defaults are used when the statement carries no explicit note:
    4000 ms time limit, 256 MB memory limit.
    """
    sample_inputs = SAMPLE_IN_RE.findall(html)
    sample_outputs = SAMPLE_OUT_RE.findall(html)
    # Pair inputs with outputs positionally; extras on either side are dropped.
    tests: list[TestCase] = [
        TestCase(
            input=raw_in.strip().replace("\r", ""),
            expected=raw_out.strip().replace("\r", ""),
        )
        for raw_in, raw_out in zip(sample_inputs, sample_outputs)
    ]

    time_note = TIME_NOTE_RE.search(html)
    memory_note = MEMORY_NOTE_RE.search(html)

    return {
        "tests": tests,
        "timeout_ms": int(time_note.group(1)) * 1000 if time_note else 4000,
        "memory_mb": int(memory_note.group(1)) if memory_note else 256,
        "interactive": "interactive problem" in html.lower(),
        "precision": extract_precision(html),
    }
|
|
|
|
|
|
def _pick_lang_option(select_body: str, language_id: str) -> str | None:
    """Choose the <option> value whose label matches *language_id*.

    Keywords from ``_LANG_KEYWORDS`` are tried in priority order against the
    lowercased option labels; returns ``None`` when nothing matches.
    """
    keywords = _LANG_KEYWORDS.get(language_id.lower(), [language_id.lower()])
    candidates: list[tuple[str, str]] = []
    for option in re.finditer(
        r'<option\b[^>]*\bvalue=["\']([^"\']*)["\'][^>]*>([^<]+)',
        select_body,
        re.IGNORECASE,
    ):
        candidates.append((option.group(1), option.group(2).strip().lower()))

    for keyword in keywords:
        for value, label in candidates:
            if keyword in label:
                return value
    return None
|
|
|
|
|
|
def _parse_submit_form(
    html: str, language_id: str
) -> tuple[str, dict[str, str], str | None]:
    """Locate the solution-upload form on a problem page.

    Returns ``(action URL, hidden form fields, language option value)``.
    When no suitable form is found, falls back to the known submit endpoint
    with no hidden fields and no language value.
    """
    # Defaults used when parsing finds nothing.
    action_url = _AUTH_BASE + _SUBMIT_PATH
    hidden_fields: dict[str, str] = {}
    language_value: str | None = None

    for form in re.finditer(
        r'<form\b[^>]*action=["\']([^"\']+)["\'][^>]*>(.*?)</form>',
        html,
        re.DOTALL | re.IGNORECASE,
    ):
        action, body = form.group(1), form.group(2)
        # Only the form carrying the source-file upload is the submit form.
        if "sourcefile" not in body.lower():
            continue

        # Resolve the action to an absolute URL on the auth host.
        if action.startswith("http"):
            action_url = action
        elif action.startswith("/"):
            action_url = _AUTH_BASE + action
        else:
            action_url = _AUTH_BASE + "/" + action

        # Collect hidden inputs that must be echoed back on submit.
        for hidden_input in re.finditer(
            r'<input\b[^>]*\btype=["\']hidden["\'][^>]*/?>',
            body,
            re.IGNORECASE,
        ):
            tag = hidden_input.group(0)
            name_attr = re.search(r'\bname=["\']([^"\']+)["\']', tag, re.IGNORECASE)
            value_attr = re.search(r'\bvalue=["\']([^"\']*)["\']', tag, re.IGNORECASE)
            if name_attr and value_attr:
                hidden_fields[name_attr.group(1)] = value_attr.group(1)

        # Find the language <select> and pick the matching option.
        for select in re.finditer(
            r'<select\b[^>]*\bname=["\']([^"\']+)["\'][^>]*>(.*?)</select>',
            body,
            re.DOTALL | re.IGNORECASE,
        ):
            if "lang" in select.group(1).lower():
                language_value = _pick_lang_option(select.group(2), language_id)
                break
        break

    return action_url, hidden_fields, language_value
|
|
|
|
|
|
async def _load_usaco_cookies(client: httpx.AsyncClient) -> None:
    """Restore previously cached USACO cookies into *client*'s jar, if any."""
    cached = load_platform_cookies("usaco")
    if not isinstance(cached, dict):
        return
    for name, value in cached.items():
        client.cookies.set(name, value)
|
|
|
|
|
|
async def _save_usaco_cookies(client: httpx.AsyncClient) -> None:
    """Persist *client*'s current cookie jar for later sessions (no-op if empty)."""
    jar = dict(client.cookies.items())
    if jar:
        save_platform_cookies("usaco", jar)
|
|
|
|
|
|
async def _check_usaco_login(client: httpx.AsyncClient, username: str) -> bool:
    """Best-effort check that the session cookies are still logged in.

    Looks for the username or a "logout" link on the front page;
    any request failure is treated as "not logged in".
    """
    try:
        response = await client.get(
            f"{_AUTH_BASE}/index.php",
            headers=HEADERS,
            timeout=HTTP_TIMEOUT,
        )
        body = response.text.lower()
        return username.lower() in body or "logout" in body
    except Exception:
        return False
|
|
|
|
|
|
async def _do_usaco_login(
    client: httpx.AsyncClient, username: str, password: str
) -> bool:
    """POST the login form; True iff the JSON reply reports ``code == 1``.

    Raises on transport/HTTP errors; a malformed reply body yields False.
    """
    response = await client.post(
        f"{_AUTH_BASE}{_LOGIN_PATH}",
        data={"uname": username, "password": password},
        headers=HEADERS,
        timeout=HTTP_TIMEOUT,
    )
    response.raise_for_status()
    try:
        return response.json().get("code") == 1
    except Exception:
        # Non-JSON (or non-object) reply: treat as a failed login.
        return False
|
|
|
|
|
|
class USACOScraper(BaseScraper):
    """Scraper and submitter for usaco.org.

    Contest IDs follow ``<monthYY>_<division>`` (e.g. ``dec24_gold``);
    individual problems are addressed by USACO's numeric ``cpid``.
    Progress updates are streamed to stdout as one JSON object per line.
    """

    @property
    def platform_name(self) -> str:
        return "usaco"

    async def scrape_contest_metadata(self, contest_id: str) -> MetadataResult:
        """Resolve *contest_id* to its problem list via the public results page."""
        try:
            month_year, division = _parse_contest_id(contest_id)
            if not division:
                return self._metadata_error(
                    f"Invalid contest ID '{contest_id}'. "
                    "Expected format: <monthYY>_<division> (e.g. dec24_gold)"
                )

            slug = _results_page_slug(month_year)
            async with httpx.AsyncClient() as client:
                html = await _fetch_text(client, f"{BASE_URL}/index.php?page={slug}")
                sections = _parse_results_page(html)
                problems_raw = sections.get(division, [])
                if not problems_raw:
                    return self._metadata_error(
                        f"No problems found for {contest_id} (division: {division})"
                    )
                problems = [
                    ProblemSummary(id=cpid, name=name) for cpid, name in problems_raw
                ]
                return MetadataResult(
                    success=True,
                    error="",
                    contest_id=contest_id,
                    problems=problems,
                    # Callers substitute the problem's cpid for the %s placeholder.
                    url=f"{BASE_URL}/index.php?page=viewproblem2&cpid=%s",
                )
        except Exception as e:
            return self._metadata_error(str(e))

    async def scrape_contest_list(self) -> ContestListResult:
        """Enumerate known contests by probing candidate results pages concurrently."""
        try:
            async with httpx.AsyncClient(
                limits=httpx.Limits(max_connections=CONNECTIONS)
            ) as client:
                html = await _fetch_text(client, f"{BASE_URL}/index.php?page=contests")

                # Slugs actually linked from the contests index page...
                page_slugs: set[str] = set()
                for m in RESULTS_PAGE_RE.finditer(html):
                    page_slugs.add(m.group(1))

                # ...plus generated candidates for recent seasons (2015-2026),
                # since the index may not link every results page.
                recent_patterns = []
                for year in range(15, 27):
                    for month in MONTHS:
                        recent_patterns.append(f"{month}{year:02d}results")
                page_slugs.update(recent_patterns)

                contests: list[ContestSummary] = []
                sem = asyncio.Semaphore(CONNECTIONS)

                async def check_page(slug: str) -> list[ContestSummary]:
                    # Probe one candidate results page; failures and empty
                    # pages simply contribute no contests.
                    async with sem:
                        try:
                            page_html = await _fetch_text(
                                client, f"{BASE_URL}/index.php?page={slug}"
                            )
                        except Exception:
                            return []
                        sections = _parse_results_page(page_html)
                        if not sections:
                            return []
                        month_year = slug.replace("results", "")
                        out: list[ContestSummary] = []
                        for div in sections:
                            cid = f"{month_year}_{div}"
                            # Derive a display name like "USACO 2024 Dec - Gold"
                            # from the slug's letters and digits.
                            year_m = re.search(r"\d{2,4}", month_year)
                            month_m = re.search(r"[a-z]+", month_year)
                            year_str = year_m.group() if year_m else ""
                            month_str = month_m.group().capitalize() if month_m else ""
                            if len(year_str) == 2:
                                year_str = f"20{year_str}"
                            display = (
                                f"USACO {year_str} {month_str} - {div.capitalize()}"
                            )
                            out.append(
                                ContestSummary(id=cid, name=cid, display_name=display)
                            )
                        return out

                tasks = [check_page(slug) for slug in sorted(page_slugs)]
                for coro in asyncio.as_completed(tasks):
                    contests.extend(await coro)

                if not contests:
                    return ContestListResult(
                        success=False, error="No contests found", supports_countdown=False
                    )
                return ContestListResult(
                    success=True, error="", contests=contests, supports_countdown=False
                )
        except Exception as e:
            return ContestListResult(
                success=False, error=str(e), supports_countdown=False
            )

    async def stream_tests_for_category_async(self, category_id: str) -> None:
        """Print one JSON payload per problem in *category_id* as each completes.

        Errors are silent by design: an unparsable category ID or an
        unreachable results page produces no output, and a failed problem
        fetch falls back to an empty-tests payload with default limits.
        """
        month_year, division = _parse_contest_id(category_id)
        if not division:
            return

        slug = _results_page_slug(month_year)
        async with httpx.AsyncClient(
            limits=httpx.Limits(max_connections=CONNECTIONS)
        ) as client:
            try:
                html = await _fetch_text(client, f"{BASE_URL}/index.php?page={slug}")
            except Exception:
                return

            sections = _parse_results_page(html)
            problems_raw = sections.get(division, [])
            if not problems_raw:
                return

            sem = asyncio.Semaphore(CONNECTIONS)

            async def run_one(cpid: str) -> dict[str, Any]:
                # Fetch and parse one problem; on failure use safe defaults.
                async with sem:
                    try:
                        problem_html = await _fetch_text(
                            client,
                            f"{BASE_URL}/index.php?page=viewproblem2&cpid={cpid}",
                        )
                        info = _parse_problem_page(problem_html)
                    except Exception:
                        info = {
                            "tests": [],
                            "timeout_ms": 4000,
                            "memory_mb": 256,
                            "interactive": False,
                            "precision": None,
                        }

                tests = cast(list[TestCase], info["tests"])
                combined_input = "\n".join(t.input for t in tests) if tests else ""
                combined_expected = (
                    "\n".join(t.expected for t in tests) if tests else ""
                )

                return {
                    "problem_id": cpid,
                    "combined": {
                        "input": combined_input,
                        "expected": combined_expected,
                    },
                    "tests": [
                        {"input": t.input, "expected": t.expected} for t in tests
                    ],
                    "timeout_ms": info["timeout_ms"],
                    "memory_mb": info["memory_mb"],
                    "interactive": info["interactive"],
                    "multi_test": False,
                    "precision": info["precision"],
                }

            tasks = [run_one(cpid) for cpid, _ in problems_raw]
            for coro in asyncio.as_completed(tasks):
                payload = await coro
                print(json.dumps(payload), flush=True)

    async def _login_for_submit(
        self, client: httpx.AsyncClient, username: str, password: str
    ) -> SubmitResult | None:
        """Log in and persist cookies; return an error SubmitResult, or None on success.

        Shared by the three login points in submit() (fresh session, stale
        cookies, and the post-auth-failure retry), which previously duplicated
        this sequence verbatim.
        """
        print(json.dumps({"status": "logging_in"}), flush=True)
        try:
            ok = await _do_usaco_login(client, username, password)
        except Exception as e:
            return self._submit_error(f"Login failed: {e}")
        if not ok:
            # Emit the raw "bad_credentials" code; the UI layer translates it
            # to a human-readable message.
            return self._submit_error("bad_credentials")
        await _save_usaco_cookies(client)
        return None

    async def submit(
        self,
        contest_id: str,
        problem_id: str,
        file_path: str,
        language_id: str,
        credentials: dict[str, str],
    ) -> SubmitResult:
        """Submit *file_path* for *problem_id*, logging in (or re-logging-in) as needed.

        *contest_id* is unused by USACO (problems are globally addressed by
        cpid) but kept for the shared scraper interface. If the first submit
        attempt reports an auth failure, the session is rebuilt once and the
        submit retried.
        """
        source = Path(file_path).read_bytes()
        username = credentials.get("username", "")
        password = credentials.get("password", "")
        if not username or not password:
            return self._submit_error("Missing credentials. Use :CP usaco login")

        async with httpx.AsyncClient(follow_redirects=True) as client:
            await _load_usaco_cookies(client)
            if client.cookies:
                print(json.dumps({"status": "checking_login"}), flush=True)
                if not await _check_usaco_login(client, username):
                    # Cached cookies are stale: drop them and log in afresh.
                    client.cookies.clear()
                    error = await self._login_for_submit(client, username, password)
                    if error is not None:
                        return error
            else:
                error = await self._login_for_submit(client, username, password)
                if error is not None:
                    return error

            result = await self._do_submit(client, problem_id, language_id, source)

            if result.success or result.error != "auth_failure":
                return result

            # Session died between the check and the submit: retry once
            # with a completely fresh login.
            client.cookies.clear()
            error = await self._login_for_submit(client, username, password)
            if error is not None:
                return error
            return await self._do_submit(client, problem_id, language_id, source)

    async def _do_submit(
        self,
        client: httpx.AsyncClient,
        problem_id: str,
        language_id: str,
        source: bytes,
    ) -> SubmitResult:
        """Upload *source* for *problem_id* on an already-authenticated client.

        Returns an "auth_failure" error result when the session appears
        logged out, so the caller can re-login and retry.
        """
        print(json.dumps({"status": "submitting"}), flush=True)
        try:
            page_r = await client.get(
                f"{_AUTH_BASE}/index.php?page=viewproblem2&cpid={problem_id}",
                headers=HEADERS,
                timeout=HTTP_TIMEOUT,
            )
            page_url = str(page_r.url)
            # Being bounced to the login page means the session is stale.
            if "/login" in page_url or "Login" in page_r.text[:2000]:
                return self._submit_error("auth_failure")
            form_url, hidden_fields, lang_val = _parse_submit_form(
                page_r.text, language_id
            )
        except Exception:
            # Page fetch/parse failed: fall back to the known endpoint.
            form_url = _AUTH_BASE + _SUBMIT_PATH
            hidden_fields = {}
            lang_val = None

        data: dict[str, str] = {"cpid": problem_id, **hidden_fields}
        data["language"] = lang_val if lang_val is not None else language_id
        ext = "py" if "python" in language_id.lower() else "cpp"
        try:
            r = await client.post(
                form_url,
                data=data,
                files={"sourcefile": (f"solution.{ext}", source, "text/plain")},
                headers=HEADERS,
                timeout=HTTP_TIMEOUT,
            )
            r.raise_for_status()
        except Exception as e:
            return self._submit_error(f"Submit request failed: {e}")

        try:
            resp = r.json()
            if resp.get("code") == 0 and "login" in resp.get("message", "").lower():
                return self._submit_error("auth_failure")
            sid = str(resp.get("submission_id", resp.get("id", "")))
        except Exception:
            # Non-JSON reply: treat as accepted with no submission ID.
            sid = ""
        return SubmitResult(
            success=True, error="", submission_id=sid, verdict="submitted"
        )

    async def login(self, credentials: dict[str, str]) -> LoginResult:
        """Validate credentials, reusing cached cookies when still valid."""
        username = credentials.get("username", "")
        password = credentials.get("password", "")
        if not username or not password:
            return self._login_error("Missing username or password")

        async with httpx.AsyncClient(follow_redirects=True) as client:
            await _load_usaco_cookies(client)
            if client.cookies:
                print(json.dumps({"status": "checking_login"}), flush=True)
                if await _check_usaco_login(client, username):
                    # Cached session still works: no fresh login needed.
                    return LoginResult(
                        success=True,
                        error="",
                        credentials={"username": username, "password": password},
                    )

            print(json.dumps({"status": "logging_in"}), flush=True)
            try:
                ok = await _do_usaco_login(client, username, password)
            except Exception as e:
                return self._login_error(f"Login request failed: {e}")

            if not ok:
                # Raw error code; the UI maps it to a readable message.
                return self._login_error("bad_credentials")

            await _save_usaco_cookies(client)
            return LoginResult(
                success=True,
                error="",
                credentials={"username": username, "password": password},
            )
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the scraper's command-line interface when executed directly.
    USACOScraper().run_cli()
|