Problem: the submit path guarded `cookie_cache.write_text` on the presence of a `JSESSIONID` cookie, which Codeforces does not use. The file was therefore never written, breaking the fast-path on subsequent submits. Solution: replace the name-specific guard with a non-empty check so cookies are persisted whenever the browser session has any cookies, matching the unconditional save already used in the login path.
583 lines · 19 KiB · Python
#!/usr/bin/env python3
|
|
|
|
import asyncio
|
|
import json
|
|
import re
|
|
from typing import Any
|
|
|
|
import requests
|
|
from bs4 import BeautifulSoup, Tag
|
|
from curl_cffi import requests as curl_requests
|
|
|
|
from .base import BaseScraper, extract_precision
|
|
from .models import (
|
|
ContestListResult,
|
|
ContestSummary,
|
|
LoginResult,
|
|
MetadataResult,
|
|
ProblemSummary,
|
|
SubmitResult,
|
|
TestCase,
|
|
)
|
|
from .timeouts import (
|
|
BROWSER_NAV_TIMEOUT,
|
|
BROWSER_SESSION_TIMEOUT,
|
|
BROWSER_SUBMIT_NAV_TIMEOUT,
|
|
HTTP_TIMEOUT,
|
|
)
|
|
|
|
BASE_URL = "https://codeforces.com"
|
|
API_CONTEST_LIST_URL = f"{BASE_URL}/api/contest.list"
|
|
HEADERS = {
|
|
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
|
|
}
|
|
|
|
|
|
def _text_from_pre(pre: Tag) -> str:
|
|
return (
|
|
pre.get_text(separator="\n", strip=False)
|
|
.replace("\r", "")
|
|
.replace("\xa0", " ")
|
|
.strip()
|
|
)
|
|
|
|
|
|
def _extract_limits(block: Tag) -> tuple[int, float]:
|
|
tdiv = block.find("div", class_="time-limit")
|
|
mdiv = block.find("div", class_="memory-limit")
|
|
timeout_ms = 0
|
|
memory_mb = 0.0
|
|
if tdiv:
|
|
ttxt = tdiv.get_text(" ", strip=True)
|
|
ts = re.search(r"(\d+)\s*seconds?", ttxt)
|
|
if ts:
|
|
timeout_ms = int(ts.group(1)) * 1000
|
|
if mdiv:
|
|
mtxt = mdiv.get_text(" ", strip=True)
|
|
ms = re.search(r"(\d+)\s*megabytes?", mtxt)
|
|
if ms:
|
|
memory_mb = float(ms.group(1))
|
|
return timeout_ms, memory_mb
|
|
|
|
|
|
def _group_lines_by_id(pre: Tag) -> dict[int, list[str]]:
|
|
groups: dict[int, list[str]] = {}
|
|
for div in pre.find_all("div", class_="test-example-line"):
|
|
cls = " ".join(div.get("class", []))
|
|
m = re.search(r"\btest-example-line-(\d+)\b", cls)
|
|
if not m:
|
|
continue
|
|
gid = int(m.group(1))
|
|
groups.setdefault(gid, []).append(div.get_text("", strip=False))
|
|
return groups
|
|
|
|
|
|
def _extract_title(block: Tag) -> tuple[str, str]:
|
|
t = block.find("div", class_="title")
|
|
if not t:
|
|
return "", ""
|
|
s = t.get_text(" ", strip=True)
|
|
parts = s.split(".", 1)
|
|
if len(parts) != 2:
|
|
return "", s.strip()
|
|
return parts[0].strip().upper(), parts[1].strip()
|
|
|
|
|
|
def _extract_samples(block: Tag) -> tuple[list[TestCase], bool]:
    """Extract sample test cases from one problem statement block.

    Returns ``(samples, grouped)``: *grouped* is True when the statement
    marks individual cases with ``test-example-line-<N>`` classes, in which
    case each returned sample is a single case reassembled from its lines.
    """
    sample_div = block.find("div", class_="sample-test")
    if not isinstance(sample_div, Tag):
        return [], False

    def collect_pres(container_class: str) -> list[Tag]:
        # One <pre> per input/output box; boxes without a usable <pre> are skipped.
        pres: list[Tag] = []
        for holder in sample_div.find_all("div", class_=container_class):
            if not isinstance(holder, Tag):
                continue
            pre = holder.find("pre")
            if pre and isinstance(pre, Tag):
                pres.append(pre)
        return pres

    input_pres = collect_pres("input")
    output_pres = collect_pres("output")

    grouped_markup = any(
        pre.find("div", class_="test-example-line")
        for pre in input_pres + output_pres
    )
    if grouped_markup:
        inputs_by_gid: dict[int, list[str]] = {}
        outputs_by_gid: dict[int, list[str]] = {}
        for pre in input_pres:
            for gid, lines in _group_lines_by_id(pre).items():
                inputs_by_gid.setdefault(gid, []).extend(lines)
        for pre in output_pres:
            for gid, lines in _group_lines_by_id(pre).items():
                outputs_by_gid.setdefault(gid, []).extend(lines)
        # Group 0 is discarded — presumably the leading test-count line.
        inputs_by_gid.pop(0, None)
        outputs_by_gid.pop(0, None)
        common = sorted(inputs_by_gid.keys() & outputs_by_gid.keys())
        if common:
            cases = [
                TestCase(
                    input="\n".join(inputs_by_gid[gid]).strip(),
                    expected="\n".join(outputs_by_gid[gid]).strip(),
                )
                for gid in common
            ]
            return cases, True

    # Ungrouped fallback: pair input/output <pre> texts positionally
    # (zip truncates to the shorter side, matching min-length pairing).
    pairs = zip(
        [_text_from_pre(pre) for pre in input_pres],
        [_text_from_pre(pre) for pre in output_pres],
    )
    return [TestCase(input=i, expected=o) for i, o in pairs], False
|
|
|
|
|
|
def _is_interactive(block: Tag) -> bool:
|
|
ps = block.find("div", class_="problem-statement")
|
|
txt = ps.get_text(" ", strip=True) if ps else block.get_text(" ", strip=True)
|
|
return "This is an interactive problem" in txt
|
|
|
|
|
|
def _fetch_problems_html(contest_id: str) -> str:
    """Download the combined ``/problems`` page HTML for a contest.

    Uses curl_cffi with Chrome impersonation; raises on HTTP errors.
    """
    response = curl_requests.get(
        f"{BASE_URL}/contest/{contest_id}/problems",
        impersonate="chrome",
        timeout=HTTP_TIMEOUT,
    )
    response.raise_for_status()
    return response.text
|
|
|
|
|
|
def _parse_all_blocks(html: str) -> list[dict[str, Any]]:
    """Parse every problem statement on a ``/problems`` page into plain dicts.

    Each dict carries the problem letter and name, combined and individual
    sample tests, time/memory limits, the interactive and multi-test flags,
    and the required output precision (if any).  Statements whose index
    letter cannot be determined are skipped.
    """
    soup = BeautifulSoup(html, "html.parser")
    blocks = soup.find_all("div", class_="problem-statement")
    out: list[dict[str, Any]] = []
    for b in blocks:
        holder = b.find_parent("div", class_="problemindexholder")
        # .get() returns None when the attribute is absent; coalesce to ""
        # so .strip() cannot raise AttributeError on malformed markup.
        letter = ((holder.get("problemindex") or "") if holder else "").strip().upper()
        name = _extract_title(b)[1]
        if not letter:
            continue
        raw_samples, is_grouped = _extract_samples(b)
        timeout_ms, memory_mb = _extract_limits(b)
        interactive = _is_interactive(b)
        precision = extract_precision(b.get_text(" ", strip=True))

        if is_grouped and raw_samples:
            # Multi-test statement: rebuild a combined input with a leading
            # test count, and wrap each case as its own one-test input.
            combined_input = f"{len(raw_samples)}\n" + "\n".join(
                tc.input for tc in raw_samples
            )
            combined_expected = "\n".join(tc.expected for tc in raw_samples)
            individual_tests = [
                TestCase(input=f"1\n{tc.input}", expected=tc.expected)
                for tc in raw_samples
            ]
        else:
            combined_input = "\n".join(tc.input for tc in raw_samples)
            combined_expected = "\n".join(tc.expected for tc in raw_samples)
            individual_tests = raw_samples

        out.append(
            {
                "letter": letter,
                "name": name,
                "combined_input": combined_input,
                "combined_expected": combined_expected,
                "tests": individual_tests,
                "timeout_ms": timeout_ms,
                "memory_mb": memory_mb,
                "interactive": interactive,
                "multi_test": is_grouped,
                "precision": precision,
            }
        )
    return out
|
|
|
|
|
|
def _scrape_contest_problems_sync(contest_id: str) -> list[ProblemSummary]:
    """Fetch and parse a contest's problem list (blocking).

    Problem ids are normalised to lowercase, matching the ids emitted by
    the test-streaming path.  (The original code round-tripped through
    ``.upper().lower()``; the ``.upper()`` was a no-op and is removed.)
    """
    html = _fetch_problems_html(contest_id)
    return [
        ProblemSummary(id=b["letter"].lower(), name=b["name"])
        for b in _parse_all_blocks(html)
    ]
|
|
|
|
|
|
class CodeforcesScraper(BaseScraper):
    """Scraper implementation for codeforces.com.

    Problem metadata and sample tests are parsed from a contest's
    ``/problems`` page; the contest list comes from the official JSON API;
    login and submit run blocking headless-browser flows on a worker thread.
    """

    @property
    def platform_name(self) -> str:
        # Platform identifier; presumably used for dispatch by the caller.
        return "codeforces"

    async def scrape_contest_metadata(self, contest_id: str) -> MetadataResult:
        """Return the problem list for *contest_id*, or a metadata error.

        The blocking scrape runs in a thread; any exception is converted
        into an error result rather than propagated.
        """
        try:
            problems = await asyncio.to_thread(
                _scrape_contest_problems_sync, contest_id
            )
            if not problems:
                return self._metadata_error(
                    f"No problems found for contest {contest_id}"
                )
            return MetadataResult(
                success=True,
                error="",
                contest_id=contest_id,
                problems=problems,
                # %s is a problem-id placeholder — presumably substituted by
                # the consumer of this result.
                url=f"https://codeforces.com/contest/{contest_id}/problem/%s",
            )
        except Exception as e:
            return self._metadata_error(str(e))

    async def scrape_contest_list(self) -> ContestListResult:
        """Fetch contests from the ``contest.list`` API.

        Keeps finished, upcoming, and running contests; other phases are
        skipped.  Returns an error result on HTTP failure, a non-OK API
        status, or an empty list.
        """
        try:
            r = requests.get(API_CONTEST_LIST_URL, timeout=HTTP_TIMEOUT)
            r.raise_for_status()
            data = r.json()
            if data.get("status") != "OK":
                return self._contests_error("Invalid API response")

            contests: list[ContestSummary] = []
            for c in data["result"]:
                phase = c.get("phase")
                if phase not in ("FINISHED", "BEFORE", "CODING"):
                    continue
                cid = str(c["id"])
                name = c["name"]
                # Start time is only surfaced for non-finished contests.
                start_time = c.get("startTimeSeconds") if phase != "FINISHED" else None
                contests.append(
                    ContestSummary(
                        id=cid,
                        name=name,
                        display_name=name,
                        start_time=start_time,
                    )
                )

            if not contests:
                return self._contests_error("No contests found")

            return ContestListResult(success=True, error="", contests=contests)
        except Exception as e:
            return self._contests_error(str(e))

    async def stream_tests_for_category_async(self, category_id: str) -> None:
        """Print one JSON line per problem with its sample tests and limits.

        Output is a line-delimited JSON stream on stdout, flushed per
        problem so consumers can read results incrementally.
        """
        html = await asyncio.to_thread(_fetch_problems_html, category_id)
        blocks = await asyncio.to_thread(_parse_all_blocks, html)

        for b in blocks:
            pid = b["letter"].lower()
            tests: list[TestCase] = b.get("tests", [])
            print(
                json.dumps(
                    {
                        "problem_id": pid,
                        "combined": {
                            "input": b.get("combined_input", ""),
                            "expected": b.get("combined_expected", ""),
                        },
                        "tests": [
                            {"input": t.input, "expected": t.expected} for t in tests
                        ],
                        "timeout_ms": b.get("timeout_ms", 0),
                        "memory_mb": b.get("memory_mb", 0),
                        "interactive": bool(b.get("interactive")),
                        "multi_test": bool(b.get("multi_test", False)),
                        "precision": b.get("precision"),
                    }
                ),
                flush=True,
            )

    async def submit(
        self,
        contest_id: str,
        problem_id: str,
        file_path: str,
        language_id: str,
        credentials: dict[str, str],
    ) -> SubmitResult:
        """Submit *file_path* via the blocking headless flow on a worker thread."""
        return await asyncio.to_thread(
            _submit_headless,
            contest_id,
            problem_id,
            file_path,
            language_id,
            credentials,
        )

    async def login(self, credentials: dict[str, str]) -> LoginResult:
        """Validate that credentials are present, then run the headless login."""
        if not credentials.get("username") or not credentials.get("password"):
            return self._login_error("Missing username or password")
        return await asyncio.to_thread(_login_headless_cf, credentials)
|
|
|
|
|
|
def _login_headless_cf(credentials: dict[str, str]) -> LoginResult:
    """Log in to Codeforces with a headless stealth browser (blocking).

    Reuses cookies cached under ``~/.cache/cp-nvim`` when present; falls
    back to filling the ``/enter`` form otherwise.  On success the browser
    session's cookies are written back to the cache unconditionally so
    later submits can take the fast path.
    """
    from pathlib import Path

    try:
        from scrapling.fetchers import StealthySession  # type: ignore[import-untyped,unresolved-import]
    except ImportError:
        return LoginResult(
            success=False,
            error="scrapling is required for Codeforces login",
        )

    from .atcoder import _ensure_browser

    _ensure_browser()

    # Cookie cache shared with the submit path (same file, same format).
    cookie_cache = Path.home() / ".cache" / "cp-nvim" / "codeforces-cookies.json"
    cookie_cache.parent.mkdir(parents=True, exist_ok=True)
    saved_cookies: list[dict[str, Any]] = []
    if cookie_cache.exists():
        try:
            saved_cookies = json.loads(cookie_cache.read_text())
        except Exception:
            # A corrupt cache is treated as "no cookies": log in fresh.
            pass

    logged_in = False
    login_error: str | None = None

    def check_login(page):
        # A "Logout" link anywhere on the page means the session is authenticated.
        nonlocal logged_in
        logged_in = page.evaluate(
            "() => Array.from(document.querySelectorAll('a'))"
            ".some(a => a.textContent.includes('Logout'))"
        )

    def login_action(page):
        # Fill and submit the /enter form; errors are captured, not raised.
        nonlocal login_error
        try:
            page.fill(
                'input[name="handleOrEmail"]',
                credentials.get("username", ""),
            )
            page.fill(
                'input[name="password"]',
                credentials.get("password", ""),
            )
            page.locator('#enterForm input[type="submit"]').click()
            # A successful login navigates away from /enter.
            page.wait_for_url(
                lambda url: "/enter" not in url, timeout=BROWSER_NAV_TIMEOUT
            )
        except Exception as e:
            login_error = str(e)

    try:
        with StealthySession(
            headless=True,
            timeout=BROWSER_SESSION_TIMEOUT,
            google_search=False,
            cookies=saved_cookies if saved_cookies else [],
        ) as session:
            if saved_cookies:
                # Fast path: cached cookies may already carry a valid session.
                print(json.dumps({"status": "checking_login"}), flush=True)
                session.fetch(
                    f"{BASE_URL}/",
                    page_action=check_login,
                    network_idle=True,
                )

            if not logged_in:
                print(json.dumps({"status": "logging_in"}), flush=True)
                session.fetch(
                    f"{BASE_URL}/enter",
                    page_action=login_action,
                    solve_cloudflare=True,
                )
                if login_error:
                    return LoginResult(
                        success=False, error=f"Login failed: {login_error}"
                    )

                # Re-check from the landing page to confirm the login took.
                session.fetch(
                    f"{BASE_URL}/",
                    page_action=check_login,
                    network_idle=True,
                )
                if not logged_in:
                    return LoginResult(
                        success=False, error="Login failed (bad credentials?)"
                    )

            # Persist session cookies unconditionally; saving is best-effort
            # and must never fail the login itself.
            try:
                browser_cookies = session.context.cookies()
                cookie_cache.write_text(json.dumps(browser_cookies))
            except Exception:
                pass

            return LoginResult(success=True, error="")
    except Exception as e:
        return LoginResult(success=False, error=str(e))
|
|
|
|
|
|
def _submit_headless(
    contest_id: str,
    problem_id: str,
    file_path: str,
    language_id: str,
    credentials: dict[str, str],
    _retried: bool = False,
) -> SubmitResult:
    """Submit *file_path* to a Codeforces contest via a headless browser.

    Tries a fast path first: when a cookie cache exists it is assumed valid
    and the submit page is opened directly.  If that page redirects to the
    login form, the cache is deleted and the whole flow is retried once
    with a fresh login (``_retried`` prevents further recursion).
    """
    from pathlib import Path

    source_code = Path(file_path).read_text()

    try:
        from scrapling.fetchers import StealthySession  # type: ignore[import-untyped,unresolved-import]
    except ImportError:
        return SubmitResult(
            success=False,
            error="scrapling is required for Codeforces submit",
        )

    from .atcoder import _ensure_browser, _solve_turnstile

    _ensure_browser()

    # Cookie cache shared with the login path (same file, same format).
    cookie_cache = Path.home() / ".cache" / "cp-nvim" / "codeforces-cookies.json"
    cookie_cache.parent.mkdir(parents=True, exist_ok=True)
    saved_cookies: list[dict[str, Any]] = []
    if cookie_cache.exists():
        try:
            saved_cookies = json.loads(cookie_cache.read_text())
        except Exception:
            # A corrupt cache is treated as "no cookies": log in fresh.
            pass

    # Fast-path assumption: cached cookies mean "already logged in".  The
    # submit page itself proves otherwise by redirecting to /enter.
    logged_in = cookie_cache.exists() and not _retried
    login_error: str | None = None
    submit_error: str | None = None
    needs_relogin = False

    def check_login(page):
        # A "Logout" link anywhere on the page means the session is authenticated.
        nonlocal logged_in
        logged_in = page.evaluate(
            "() => Array.from(document.querySelectorAll('a'))"
            ".some(a => a.textContent.includes('Logout'))"
        )

    def login_action(page):
        # Fill and submit the /enter form; errors are captured, not raised.
        nonlocal login_error
        try:
            page.fill(
                'input[name="handleOrEmail"]',
                credentials.get("username", ""),
            )
            page.fill(
                'input[name="password"]',
                credentials.get("password", ""),
            )
            page.locator('#enterForm input[type="submit"]').click()
            page.wait_for_url(
                lambda url: "/enter" not in url, timeout=BROWSER_NAV_TIMEOUT
            )
        except Exception as e:
            login_error = str(e)

    def submit_action(page):
        # Fill the submit form on the already-open page and send it.
        nonlocal submit_error, needs_relogin
        if "/enter" in page.url or "/login" in page.url:
            # Cached session was stale: flag for the retry-with-login path.
            needs_relogin = True
            return
        try:
            # Best-effort Turnstile solve; failures fall through to the form.
            _solve_turnstile(page)
        except Exception:
            pass
        try:
            page.select_option(
                'select[name="submittedProblemIndex"]',
                problem_id.upper(),
            )
            page.select_option('select[name="programTypeId"]', language_id)
            # Set the source in both the CodeMirror editor and the raw
            # textarea, whichever one the page actually submits.
            page.evaluate(
                """(code) => {
                const cm = document.querySelector('.CodeMirror');
                if (cm && cm.CodeMirror) {
                    cm.CodeMirror.setValue(code);
                }
                const ta = document.querySelector('textarea[name="source"]');
                if (ta) ta.value = code;
            }""",
                source_code,
            )
            page.locator("form.submit-form input.submit").click(no_wait_after=True)
            try:
                # Success navigates to the submissions/status page.
                page.wait_for_url(
                    lambda url: "/my" in url or "/status" in url,
                    timeout=BROWSER_SUBMIT_NAV_TIMEOUT["codeforces"],
                )
            except Exception:
                # No navigation: surface the inline form error if present.
                err_el = page.query_selector("span.error")
                if err_el:
                    submit_error = err_el.inner_text().strip()
                else:
                    submit_error = "Submit failed: page did not navigate"
        except Exception as e:
            submit_error = str(e)

    try:
        with StealthySession(
            headless=True,
            timeout=BROWSER_SESSION_TIMEOUT,
            google_search=False,
            cookies=saved_cookies if (cookie_cache.exists() and not _retried) else [],
        ) as session:
            if not (cookie_cache.exists() and not _retried):
                # No usable cache: check the landing page for a live session.
                print(json.dumps({"status": "checking_login"}), flush=True)
                session.fetch(
                    f"{BASE_URL}/",
                    page_action=check_login,
                    network_idle=True,
                )

            if not logged_in:
                print(json.dumps({"status": "logging_in"}), flush=True)
                session.fetch(
                    f"{BASE_URL}/enter",
                    page_action=login_action,
                    solve_cloudflare=True,
                )
                if login_error:
                    return SubmitResult(
                        success=False, error=f"Login failed: {login_error}"
                    )

            print(json.dumps({"status": "submitting"}), flush=True)
            session.fetch(
                f"{BASE_URL}/contest/{contest_id}/submit",
                page_action=submit_action,
                solve_cloudflare=False,
            )

            # Persist cookies whenever the session holds any.  Codeforces
            # does not set a JSESSIONID cookie, so guarding on a specific
            # cookie name would never write the file and would break the
            # fast path on later submits; the plain non-empty check here
            # matches the unconditional save in the login path.
            try:
                browser_cookies = session.context.cookies()
                if browser_cookies:
                    cookie_cache.write_text(json.dumps(browser_cookies))
            except Exception:
                pass

            if needs_relogin and not _retried:
                # Stale cookies: drop the cache and retry once with a login.
                cookie_cache.unlink(missing_ok=True)
                return _submit_headless(
                    contest_id,
                    problem_id,
                    file_path,
                    language_id,
                    credentials,
                    _retried=True,
                )

            if submit_error:
                return SubmitResult(success=False, error=submit_error)

            return SubmitResult(
                success=True,
                error="",
                submission_id="",
                verdict="submitted",
            )
    except Exception as e:
        return SubmitResult(success=False, error=str(e))
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: hand control to the scraper's CLI runner.
    CodeforcesScraper().run_cli()
|