Problem: Wrong credentials during login produced two bugs: scrapers wrapped the `bad_credentials` error code in `"Login failed: ..."`, causing double-prefixed messages in the UI; and `credentials.lua` did not clear cached credentials before re-prompting or on failure, leaving stale bad creds in the cache. Solution: Standardize all scrapers to emit `"bad_credentials"` as the raw error code. Add `LOGIN_ERRORS` map in `constants.lua` to translate it to a human-readable string in both `credentials.lua` and `submit.lua`. Fix `credentials.lua` to clear credentials on failure in both the fresh-prompt and cached-creds-fail paths. For AtCoder and Codeforces, replace `wait_for_url` with `wait_for_function` to detect the login error element immediately rather than waiting the full 10s navigation timeout. Also add "Remember me" checkbox check on Codeforces login.
597 lines
20 KiB
Python
597 lines
20 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import asyncio
|
|
import json
|
|
import re
|
|
from typing import Any
|
|
|
|
import requests
|
|
from bs4 import BeautifulSoup, Tag
|
|
|
|
from .base import (
|
|
BaseScraper,
|
|
clear_platform_cookies,
|
|
extract_precision,
|
|
load_platform_cookies,
|
|
save_platform_cookies,
|
|
)
|
|
from .models import (
|
|
ContestListResult,
|
|
ContestSummary,
|
|
LoginResult,
|
|
MetadataResult,
|
|
ProblemSummary,
|
|
SubmitResult,
|
|
TestCase,
|
|
)
|
|
from .timeouts import (
|
|
BROWSER_NAV_TIMEOUT,
|
|
BROWSER_SESSION_TIMEOUT,
|
|
BROWSER_SUBMIT_NAV_TIMEOUT,
|
|
HTTP_TIMEOUT,
|
|
)
|
|
|
|
# Root of the Codeforces site; page and API URLs in this module are built on it.
BASE_URL = "https://codeforces.com"
# JSON API endpoint queried when listing contests.
API_CONTEST_LIST_URL = BASE_URL + "/api/contest.list"
# Browser-like User-Agent header set (desktop Chrome on Linux).
HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
    ),
}
|
|
|
|
|
|
def _text_from_pre(pre: Tag) -> str:
    """Return the text of a ``<pre>`` element with whitespace normalized.

    Child text nodes are joined with newlines, carriage returns are removed,
    non-breaking spaces become regular spaces, and the result is stripped of
    leading and trailing whitespace.
    """
    joined = pre.get_text(separator="\n", strip=False)
    without_cr = joined.replace("\r", "")
    plain_spaces = without_cr.replace("\xa0", " ")
    return plain_spaces.strip()
|
|
|
|
|
|
def _extract_limits(block: Tag) -> tuple[int, float]:
    """Read the time and memory limits from a problem statement block.

    Args:
        block: Element expected to contain ``div.time-limit`` and
            ``div.memory-limit`` children.

    Returns:
        ``(timeout_ms, memory_mb)``. A limit whose element is missing, or whose
        text does not match the expected wording, is left at ``0`` / ``0.0``.
    """
    timeout_ms = 0
    memory_mb = 0.0

    time_div = block.find("div", class_="time-limit")
    if time_div:
        time_text = time_div.get_text(" ", strip=True)
        # Accept fractional values such as "2.5 seconds". An integer-only
        # pattern would skip the "2." and match just "5 seconds", reporting
        # 5000 ms instead of 2500 ms.
        seconds = re.search(r"(\d+(?:\.\d+)?)\s*seconds?", time_text)
        if seconds:
            timeout_ms = round(float(seconds.group(1)) * 1000)

    memory_div = block.find("div", class_="memory-limit")
    if memory_div:
        memory_text = memory_div.get_text(" ", strip=True)
        # Same tolerance for a fractional megabyte value.
        megabytes = re.search(r"(\d+(?:\.\d+)?)\s*megabytes?", memory_text)
        if megabytes:
            memory_mb = float(megabytes.group(1))

    return timeout_ms, memory_mb
|
|
|
|
|
|
def _group_lines_by_id(pre: Tag) -> dict[int, list[str]]:
    """Bucket the ``div.test-example-line`` children of *pre* by group number.

    The group number is taken from a ``test-example-line-<N>`` class on each
    div; divs without such a numbered class are ignored. Lines keep their
    document order within each bucket.
    """
    numbered_class = re.compile(r"\btest-example-line-(\d+)\b")
    lines_by_group: dict[int, list[str]] = {}
    for line_div in pre.find_all("div", class_="test-example-line"):
        class_attr = " ".join(line_div.get("class", []))
        found = numbered_class.search(class_attr)
        if found is None:
            continue
        group_id = int(found.group(1))
        if group_id not in lines_by_group:
            lines_by_group[group_id] = []
        lines_by_group[group_id].append(line_div.get_text("", strip=False))
    return lines_by_group
|
|
|
|
|
|
def _extract_title(block: Tag) -> tuple[str, str]:
    """Split the ``div.title`` text of *block* into ``(letter, name)``.

    The text is split at its first ``.``: the part before it becomes the
    upper-cased letter and the part after it the name. Returns ``("", "")``
    when there is no title element, and ``("", <title>)`` when the title
    contains no dot.
    """
    title_div = block.find("div", class_="title")
    if not title_div:
        return "", ""

    full_title = title_div.get_text(" ", strip=True)
    letter, dot, name = full_title.partition(".")
    if not dot:
        return "", full_title.strip()
    return letter.strip().upper(), name.strip()
|
|
|
|
|
|
def _extract_samples(block: Tag) -> tuple[list[TestCase], bool]:
    """Extract the sample tests from a problem statement block.

    Returns:
        ``(samples, grouped)``. ``grouped`` is True when the samples were
        rebuilt from numbered ``test-example-line`` divs, i.e. one
        ``TestCase`` per group id present in both input and output. Otherwise
        the ``<pre>`` blocks are paired positionally and ``grouped`` is False.
        A block without a ``div.sample-test`` yields ``([], False)``.
    """
    sample_root = block.find("div", class_="sample-test")
    if not isinstance(sample_root, Tag):
        return [], False

    def collect_pres(css_class: str) -> list[Tag]:
        # One <pre> per div of the given class; holders without a <pre> are skipped.
        found: list[Tag] = []
        for holder in sample_root.find_all("div", class_=css_class):
            if not isinstance(holder, Tag):
                continue
            pre = holder.find("pre")
            if pre and isinstance(pre, Tag):
                found.append(pre)
        return found

    def merge_groups(pres: list[Tag]) -> dict[int, list[str]]:
        # Merge the per-<pre> line groups into one mapping keyed by group id.
        merged: dict[int, list[str]] = {}
        for pre in pres:
            for group_id, lines in _group_lines_by_id(pre).items():
                merged.setdefault(group_id, []).extend(lines)
        # Group 0 is never treated as a test case (presumably the shared
        # test-count header line -- confirm against real pages).
        merged.pop(0, None)
        return merged

    input_pres = collect_pres("input")
    output_pres = collect_pres("output")

    uses_line_groups = any(
        pre.find("div", class_="test-example-line")
        for pre in input_pres + output_pres
    )
    if uses_line_groups:
        inputs_by_gid = merge_groups(input_pres)
        outputs_by_gid = merge_groups(output_pres)
        shared_ids = sorted(inputs_by_gid.keys() & outputs_by_gid.keys())
        if shared_ids:
            grouped_samples = [
                TestCase(
                    input="\n".join(inputs_by_gid[gid]).strip(),
                    expected="\n".join(outputs_by_gid[gid]).strip(),
                )
                for gid in shared_ids
            ]
            return grouped_samples, True

    # No usable groups: pair inputs and outputs by position, dropping extras.
    plain_inputs = [_text_from_pre(pre) for pre in input_pres]
    plain_outputs = [_text_from_pre(pre) for pre in output_pres]
    paired = [
        TestCase(input=sample_in, expected=sample_out)
        for sample_in, sample_out in zip(plain_inputs, plain_outputs)
    ]
    return paired, False
|
|
|
|
|
|
def _is_interactive(block: Tag) -> bool:
    """Return True when the statement text declares an interactive problem.

    The phrase is searched in the nested ``div.problem-statement`` if one
    exists, otherwise in the text of *block* itself.
    """
    statement = block.find("div", class_="problem-statement")
    scope = statement if statement else block
    statement_text = scope.get_text(" ", strip=True)
    return "This is an interactive problem" in statement_text
|
|
|
|
|
|
def _fetch_problems_html(contest_id: str) -> str:
    """Fetch the HTML of a contest's ``/problems`` page in a headless browser.

    Calls ``_ensure_browser()`` (imported from the AtCoder scraper module),
    then loads the page in a ``StealthySession`` with Cloudflare solving
    enabled and captures the page content.

    Returns:
        The captured page HTML, or ``""`` if the page action never ran.

    Raises:
        RuntimeError: If ``scrapling`` cannot be imported.
    """
    try:
        from scrapling.fetchers import StealthySession  # type: ignore[import-untyped,unresolved-import]
    except ImportError:
        raise RuntimeError("scrapling is required for Codeforces metadata")

    from .atcoder import _ensure_browser

    _ensure_browser()

    problems_url = f"{BASE_URL}/contest/{contest_id}/problems"
    captured: list[str] = []

    def page_action(page):
        # Runs inside the session once the page is loaded; keep its HTML.
        captured.append(page.content())

    session_options = {
        "headless": True,
        "timeout": BROWSER_SESSION_TIMEOUT,
        "google_search": False,
    }
    with StealthySession(**session_options) as session:
        session.fetch(problems_url, page_action=page_action, solve_cloudflare=True)

    return captured[-1] if captured else ""
|
|
|
|
|
|
def _parse_all_blocks(html: str) -> list[dict[str, Any]]:
    """Parse every problem statement on a contest ``/problems`` page.

    Args:
        html: Full HTML of the page.

    Returns:
        One dict per ``div.problem-statement`` that has a problem index, with
        keys ``letter``, ``name``, ``combined_input``, ``combined_expected``,
        ``tests``, ``timeout_ms``, ``memory_mb``, ``interactive``,
        ``multi_test`` and ``precision``. Statements without a usable
        ``problemindex`` attribute on their ``div.problemindexholder`` parent
        are skipped.
    """
    soup = BeautifulSoup(html, "html.parser")
    parsed: list[dict[str, Any]] = []

    for statement in soup.find_all("div", class_="problem-statement"):
        holder = statement.find_parent("div", class_="problemindexholder")
        # ``get`` returns None when the holder lacks the attribute; treat that
        # like a missing holder instead of crashing on ``None.strip()``.
        raw_index = holder.get("problemindex") if holder else None
        letter = (raw_index or "").strip().upper()
        if not letter:
            continue

        name = _extract_title(statement)[1]
        raw_samples, is_grouped = _extract_samples(statement)
        timeout_ms, memory_mb = _extract_limits(statement)
        interactive = _is_interactive(statement)
        precision = extract_precision(statement.get_text(" ", strip=True))

        combined_expected = "\n".join(tc.expected for tc in raw_samples)
        if is_grouped and raw_samples:
            # Grouped samples are single cases of a multi-test input: the
            # combined input is prefixed with the case count, and each
            # individual test is wrapped as a one-case input.
            combined_input = f"{len(raw_samples)}\n" + "\n".join(
                tc.input for tc in raw_samples
            )
            individual_tests = [
                TestCase(input=f"1\n{tc.input}", expected=tc.expected)
                for tc in raw_samples
            ]
        else:
            combined_input = "\n".join(tc.input for tc in raw_samples)
            individual_tests = raw_samples

        parsed.append(
            {
                "letter": letter,
                "name": name,
                "combined_input": combined_input,
                "combined_expected": combined_expected,
                "tests": individual_tests,
                "timeout_ms": timeout_ms,
                "memory_mb": memory_mb,
                "interactive": interactive,
                "multi_test": is_grouped,
                "precision": precision,
            }
        )
    return parsed
|
|
|
|
|
|
def _scrape_contest_problems_sync(contest_id: str) -> list[ProblemSummary]:
    """Fetch a contest's problems page and summarize each problem found.

    Blocking helper: it fetches the page via ``_fetch_problems_html`` and
    parses it with ``_parse_all_blocks``. Each summary's id is the problem
    letter in lower case.
    """
    parsed_blocks = _parse_all_blocks(_fetch_problems_html(contest_id))
    summaries: list[ProblemSummary] = []
    for entry in parsed_blocks:
        upper_letter = entry["letter"].upper()
        summaries.append(ProblemSummary(id=upper_letter.lower(), name=entry["name"]))
    return summaries
|
|
|
|
|
|
class CodeforcesScraper(BaseScraper):
    """Scraper for Codeforces: contest metadata, contest list, tests, login, submit."""

    @property
    def platform_name(self) -> str:
        """Return the platform identifier, ``"codeforces"``."""
        return "codeforces"

    async def scrape_contest_metadata(self, contest_id: str) -> MetadataResult:
        """Return the problem list and related URLs for *contest_id*.

        The blocking browser scrape runs in a worker thread. Any exception,
        and an empty problem list, are reported through
        ``self._metadata_error`` rather than raised.
        """
        try:
            problems = await asyncio.to_thread(
                _scrape_contest_problems_sync, contest_id
            )
            if not problems:
                return self._metadata_error(
                    f"No problems found for contest {contest_id}"
                )
            contest_url = f"{BASE_URL}/contest/{contest_id}"
            return MetadataResult(
                success=True,
                error="",
                contest_id=contest_id,
                problems=problems,
                # "%s" is left in place as a placeholder in the returned URL.
                url=f"{contest_url}/problem/%s",
                contest_url=contest_url,
                standings_url=f"{contest_url}/standings",
            )
        except Exception as e:
            return self._metadata_error(str(e))

    async def scrape_contest_list(self) -> ContestListResult:
        """Return the contests listed by the Codeforces ``contest.list`` API.

        Only contests in phase ``FINISHED``, ``BEFORE`` or ``CODING`` are kept;
        ``start_time`` is filled in for the non-finished ones only. Failures
        are reported through ``self._contests_error`` rather than raised.
        """
        try:
            # ``requests`` is blocking; run it in a worker thread so the event
            # loop is not stalled, as the other scraping methods already do.
            response = await asyncio.to_thread(
                requests.get, API_CONTEST_LIST_URL, timeout=HTTP_TIMEOUT
            )
            response.raise_for_status()
            data = response.json()
            if data.get("status") != "OK":
                return self._contests_error("Invalid API response")

            contests: list[ContestSummary] = []
            for c in data["result"]:
                phase = c.get("phase")
                if phase not in ("FINISHED", "BEFORE", "CODING"):
                    continue
                name = c["name"]
                start_time = c.get("startTimeSeconds") if phase != "FINISHED" else None
                contests.append(
                    ContestSummary(
                        id=str(c["id"]),
                        name=name,
                        display_name=name,
                        start_time=start_time,
                    )
                )

            if not contests:
                return self._contests_error("No contests found")

            return ContestListResult(success=True, error="", contests=contests)
        except Exception as e:
            return self._contests_error(str(e))

    async def stream_tests_for_category_async(self, category_id: str) -> None:
        """Print one JSON line per problem of contest *category_id* to stdout.

        Each line carries the problem id, the combined sample, the individual
        tests, the limits, and the interactive / multi-test / precision fields
        produced by ``_parse_all_blocks``. Output is flushed after every line.
        """
        html = await asyncio.to_thread(_fetch_problems_html, category_id)
        blocks = await asyncio.to_thread(_parse_all_blocks, html)

        for b in blocks:
            tests: list[TestCase] = b.get("tests", [])
            payload = {
                "problem_id": b["letter"].lower(),
                "combined": {
                    "input": b.get("combined_input", ""),
                    "expected": b.get("combined_expected", ""),
                },
                "tests": [{"input": t.input, "expected": t.expected} for t in tests],
                "timeout_ms": b.get("timeout_ms", 0),
                "memory_mb": b.get("memory_mb", 0),
                "interactive": bool(b.get("interactive")),
                "multi_test": bool(b.get("multi_test", False)),
                "precision": b.get("precision"),
            }
            print(json.dumps(payload), flush=True)

    async def submit(
        self,
        contest_id: str,
        problem_id: str,
        file_path: str,
        language_id: str,
        credentials: dict[str, str],
    ) -> SubmitResult:
        """Submit *file_path* to a problem by running ``_submit_headless`` in a thread."""
        return await asyncio.to_thread(
            _submit_headless,
            contest_id,
            problem_id,
            file_path,
            language_id,
            credentials,
        )

    async def login(self, credentials: dict[str, str]) -> LoginResult:
        """Log in with *credentials* by running ``_login_headless_cf`` in a thread.

        Returns ``self._login_error(...)`` without starting a browser when the
        username or password is missing or empty.
        """
        if not credentials.get("username") or not credentials.get("password"):
            return self._login_error("Missing username or password")
        return await asyncio.to_thread(_login_headless_cf, credentials)
|
|
|
|
|
|
def _cf_check_logged_in(page) -> bool:
    """Return whether any ``<a>`` element on *page* contains the text ``Logout``.

    The check is evaluated as JavaScript inside the page.
    """
    has_logout_link_js = (
        "() => Array.from(document.querySelectorAll('a'))"
        ".some(a => a.textContent.includes('Logout'))"
    )
    return page.evaluate(has_logout_link_js)
|
|
|
|
|
|
def _cf_login_action(credentials: dict[str, str]):
    """Build a page action that fills in and submits the Codeforces login form.

    Returns:
        ``(login_action, get_error)``. ``login_action(page)`` drives the form;
        ``get_error()`` afterwards returns ``None`` on success,
        ``"bad_credentials"`` if the page is still on ``/enter``, or the text
        of any exception raised while driving the page.
    """
    outcome: dict[str, str | None] = {"error": None}

    def login_action(page):
        try:
            page.wait_for_selector('input[name="handleOrEmail"]', timeout=60000)
            page.fill('input[name="handleOrEmail"]', credentials.get("username", ""))
            page.fill('input[name="password"]', credentials.get("password", ""))
            page.locator('#enterForm input[name="remember"]').check()
            page.locator('#enterForm input[type="submit"]').click()
            # Resolve as soon as the page either leaves /enter or shows the
            # form's error element, instead of waiting only on navigation.
            page.wait_for_function(
                "() => !window.location.href.includes('/enter') || !!document.querySelector('#enterForm span.error')",
                timeout=BROWSER_NAV_TIMEOUT,
            )
            still_on_login_page = "/enter" in page.url
            if still_on_login_page:
                outcome["error"] = "bad_credentials"
        except Exception as exc:
            outcome["error"] = str(exc)

    def get_error():
        return outcome["error"]

    return login_action, get_error
|
|
|
|
|
|
def _login_headless_cf(credentials: dict[str, str]) -> LoginResult:
    """Log in to Codeforces in a headless browser, reusing saved cookies if valid.

    Steps, each announced as a JSON status line on stdout:

    1. If cookies are saved, open the home page with them
       (``checking_login``) and return success when a logout link is present.
       Errors in this step are ignored and fall through to a fresh login.
    2. Otherwise submit the login form (``logging_in``), verify on the home
       page that the session is logged in, and save the browser cookies when
       they include ``X-User-Sha1``.

    Returns:
        A ``LoginResult``; failures carry ``"bad_credentials"``, a
        ``"Login failed: ..."`` message, or the text of an unexpected
        exception.
    """
    try:
        from scrapling.fetchers import StealthySession  # type: ignore[import-untyped,unresolved-import]
    except ImportError:
        return LoginResult(
            success=False,
            error="scrapling is required for Codeforces login",
        )

    from .atcoder import _ensure_browser

    _ensure_browser()

    state = {"logged_in": False}

    def record_login_state(page):
        state["logged_in"] = _cf_check_logged_in(page)

    home_url = f"{BASE_URL}/"
    saved_cookies = load_platform_cookies("codeforces") or []

    if saved_cookies:
        print(json.dumps({"status": "checking_login"}), flush=True)
        try:
            with StealthySession(
                headless=True,
                timeout=BROWSER_SESSION_TIMEOUT,
                google_search=False,
                cookies=saved_cookies,
            ) as session:
                session.fetch(
                    home_url, page_action=record_login_state, solve_cloudflare=True
                )
            if state["logged_in"]:
                return LoginResult(success=True, error="")
        except Exception:
            # Best effort only: a failed cookie check falls back to a fresh login.
            pass

    login_action, get_error = _cf_login_action(credentials)

    try:
        with StealthySession(
            headless=True,
            timeout=BROWSER_SESSION_TIMEOUT,
            google_search=False,
        ) as session:
            print(json.dumps({"status": "logging_in"}), flush=True)
            session.fetch(
                f"{BASE_URL}/enter",
                page_action=login_action,
                solve_cloudflare=True,
            )

            failure = get_error()
            if failure == "bad_credentials":
                return LoginResult(success=False, error="bad_credentials")
            if failure:
                return LoginResult(success=False, error=f"Login failed: {failure}")

            # Start from "not logged in" so a stale result of the cookie
            # check cannot leak into the verification below.
            state["logged_in"] = False
            session.fetch(home_url, page_action=record_login_state, network_idle=True)
            if not state["logged_in"]:
                return LoginResult(success=False, error="bad_credentials")

            try:
                browser_cookies = session.context.cookies()
                has_user_cookie = any(
                    cookie.get("name") == "X-User-Sha1" for cookie in browser_cookies
                )
                if has_user_cookie:
                    save_platform_cookies("codeforces", browser_cookies)
            except Exception:
                # Persisting cookies is optional; the login itself succeeded.
                pass

            return LoginResult(success=True, error="")
    except Exception as exc:
        return LoginResult(success=False, error=str(exc))
|
|
|
|
|
|
def _submit_headless(
    contest_id: str,
    problem_id: str,
    file_path: str,
    language_id: str,
    credentials: dict[str, str],
    _retried: bool = False,
) -> SubmitResult:
    """Submit a source file to a Codeforces problem through a headless browser.

    Progress is announced as JSON status lines on stdout (``checking_login``,
    ``logging_in``, ``submitting``). Saved cookies are tried first; if the
    submit page still redirects to the login page, the cookies are cleared
    and the whole flow is retried once with a fresh login.

    Args:
        contest_id: Contest whose submit page is opened.
        problem_id: Problem index; upper-cased before selecting it in the form.
        file_path: Path of the source file to submit.
        language_id: Value selected in the ``programTypeId`` dropdown.
        credentials: Mapping with ``username`` and ``password``.
        _retried: Internal flag set on the single retry; skips saved cookies.

    Returns:
        A ``SubmitResult``. Failures carry ``"bad_credentials"``, a
        ``"Login failed: ..."`` message, the site's error text, or the text
        of an unexpected exception.

    Raises:
        OSError: If *file_path* cannot be read (raised before any browser work).
    """
    from pathlib import Path

    source_code = Path(file_path).read_text()

    try:
        from scrapling.fetchers import StealthySession  # type: ignore[import-untyped,unresolved-import]
    except ImportError:
        return SubmitResult(
            success=False,
            error="scrapling is required for Codeforces submit",
        )

    from .atcoder import _ensure_browser, _solve_turnstile

    _ensure_browser()

    saved_cookies: list[dict[str, Any]] = []
    if not _retried:
        saved_cookies = load_platform_cookies("codeforces") or []

    # Optimistically assume saved cookies are valid until the check says otherwise.
    logged_in = bool(saved_cookies)
    submit_error: str | None = None
    needs_relogin = False

    def check_login(page):
        nonlocal logged_in
        logged_in = _cf_check_logged_in(page)

    _login_action, _get_login_error = _cf_login_action(credentials)

    def submit_action(page):
        nonlocal submit_error, needs_relogin
        # Landing on a login URL means the session is not authenticated.
        if "/enter" in page.url or "/login" in page.url:
            needs_relogin = True
            return
        _solve_turnstile(page)
        try:
            page.select_option(
                'select[name="submittedProblemIndex"]',
                problem_id.upper(),
            )
            page.select_option('select[name="programTypeId"]', language_id)
            # Write the code into both the CodeMirror editor (if present) and
            # the underlying textarea.
            page.evaluate(
                """(code) => {
                    const cm = document.querySelector('.CodeMirror');
                    if (cm && cm.CodeMirror) {
                        cm.CodeMirror.setValue(code);
                    }
                    const ta = document.querySelector('textarea[name="source"]');
                    if (ta) ta.value = code;
                }""",
                source_code,
            )
            page.locator("form.submit-form input.submit").click(no_wait_after=True)
            try:
                page.wait_for_url(
                    lambda url: "/my" in url or "/status" in url,
                    timeout=BROWSER_SUBMIT_NAV_TIMEOUT["codeforces"],
                )
            except Exception:
                # No navigation: surface the form's error text when there is one.
                err_el = page.query_selector("span.error")
                if err_el:
                    submit_error = err_el.inner_text().strip()
                else:
                    submit_error = "Submit failed: page did not navigate"
        except Exception as e:
            submit_error = str(e)

    try:
        with StealthySession(
            headless=True,
            timeout=BROWSER_SESSION_TIMEOUT,
            google_search=False,
            cookies=saved_cookies if saved_cookies else [],
        ) as session:
            if not _retried and saved_cookies:
                print(json.dumps({"status": "checking_login"}), flush=True)
                session.fetch(
                    f"{BASE_URL}/", page_action=check_login, solve_cloudflare=True
                )

            if not logged_in:
                print(json.dumps({"status": "logging_in"}), flush=True)
                session.fetch(
                    f"{BASE_URL}/enter",
                    page_action=_login_action,
                    solve_cloudflare=True,
                )
                login_error = _get_login_error()
                if login_error == "bad_credentials":
                    return SubmitResult(success=False, error="bad_credentials")
                elif login_error:
                    return SubmitResult(
                        success=False, error=f"Login failed: {login_error}"
                    )
                logged_in = True

            print(json.dumps({"status": "submitting"}), flush=True)
            session.fetch(
                f"{BASE_URL}/contest/{contest_id}/submit",
                page_action=submit_action,
                solve_cloudflare=False,
            )

            try:
                browser_cookies = session.context.cookies()
                if any(c.get("name") == "X-User-Sha1" for c in browser_cookies):
                    save_platform_cookies("codeforces", browser_cookies)
            except Exception:
                # Persisting cookies is best effort and must not fail the submit.
                pass

        if needs_relogin:
            if _retried:
                # Still on the login page after a fresh login: nothing was
                # submitted, so this must not fall through to the success path.
                return SubmitResult(
                    success=False,
                    error="Submit failed: redirected to login page",
                )
            clear_platform_cookies("codeforces")
            return _submit_headless(
                contest_id,
                problem_id,
                file_path,
                language_id,
                credentials,
                _retried=True,
            )

        if submit_error:
            return SubmitResult(success=False, error=submit_error)

        return SubmitResult(
            success=True,
            error="",
            submission_id="",
            verdict="submitted",
        )
    except Exception as e:
        return SubmitResult(success=False, error=str(e))
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: hand control to the scraper's command-line runner.
    scraper = CodeforcesScraper()
    scraper.run_cli()
|