feat(codeforces): implement submit; cache CSES token (#300)

## Problem

Codeforces submit was a stub. CSES submit re-ran the full login flow on
every invocation (~1.5s overhead).

## Solution

**Codeforces**: headless browser submit via StealthySession (same
pattern as AtCoder). Solves Cloudflare Turnstile on login, uploads
source via file input, caches cookies at
`~/.cache/cp-nvim/codeforces-cookies.json` so repeat submits skip login.

**CSES**: persist the API token in credentials via a `credentials`
ndjson event. Subsequent submits validate the cached token with a single
GET before falling back to full login.

Also includes a vimdoc table of contents.
This commit is contained in:
Barrett Ruth 2026-03-05 10:37:39 -05:00 committed by GitHub
parent e9f72dfbbc
commit 6fcb5d1bbc
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 307 additions and 96 deletions

View file

@ -9,6 +9,7 @@ from typing import Any
import httpx
from .base import BaseScraper, extract_precision
from .timeouts import HTTP_TIMEOUT, SUBMIT_POLL_TIMEOUT
from .models import (
ContestListResult,
ContestSummary,
@ -26,7 +27,6 @@ TASK_PATH = "/problemset/task/{id}"
HEADERS = {
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
}
TIMEOUT_S = 15.0
CONNECTIONS = 8
CSES_LANGUAGES: dict[str, dict[str, str]] = {
@ -78,7 +78,7 @@ def snake_to_title(name: str) -> str:
async def fetch_text(client: httpx.AsyncClient, path: str) -> str:
r = await client.get(BASE_URL + path, headers=HEADERS, timeout=TIMEOUT_S)
r = await client.get(BASE_URL + path, headers=HEADERS, timeout=HTTP_TIMEOUT)
r.raise_for_status()
return r.text
@ -290,7 +290,7 @@ class CSESScraper(BaseScraper):
password: str,
) -> str | None:
login_page = await client.get(
f"{BASE_URL}/login", headers=HEADERS, timeout=TIMEOUT_S
f"{BASE_URL}/login", headers=HEADERS, timeout=HTTP_TIMEOUT
)
csrf_match = re.search(r'name="csrf_token" value="([^"]+)"', login_page.text)
if not csrf_match:
@ -304,20 +304,20 @@ class CSESScraper(BaseScraper):
"pass": password,
},
headers=HEADERS,
timeout=TIMEOUT_S,
timeout=HTTP_TIMEOUT,
)
if "Invalid username or password" in login_resp.text:
return None
api_resp = await client.post(
f"{API_URL}/login", headers=HEADERS, timeout=TIMEOUT_S
f"{API_URL}/login", headers=HEADERS, timeout=HTTP_TIMEOUT
)
api_data = api_resp.json()
token: str = api_data["X-Auth-Token"]
auth_url: str = api_data["authentication_url"]
auth_page = await client.get(auth_url, headers=HEADERS, timeout=TIMEOUT_S)
auth_page = await client.get(auth_url, headers=HEADERS, timeout=HTTP_TIMEOUT)
auth_csrf = re.search(r'name="csrf_token" value="([^"]+)"', auth_page.text)
form_token = re.search(r'name="token" value="([^"]+)"', auth_page.text)
if not auth_csrf or not form_token:
@ -330,18 +330,29 @@ class CSESScraper(BaseScraper):
"token": form_token.group(1),
},
headers=HEADERS,
timeout=TIMEOUT_S,
timeout=HTTP_TIMEOUT,
)
check = await client.get(
f"{API_URL}/login",
headers={"X-Auth-Token": token, **HEADERS},
timeout=TIMEOUT_S,
timeout=HTTP_TIMEOUT,
)
if check.status_code != 200:
return None
return token
async def _check_token(self, client: httpx.AsyncClient, token: str) -> bool:
try:
r = await client.get(
f"{API_URL}/login",
headers={"X-Auth-Token": token, **HEADERS},
timeout=HTTP_TIMEOUT,
)
return r.status_code == 200
except Exception:
return False
async def submit(
self,
contest_id: str,
@ -356,11 +367,30 @@ class CSESScraper(BaseScraper):
return self._submit_error("Missing credentials. Use :CP login cses")
async with httpx.AsyncClient(follow_redirects=True) as client:
print(json.dumps({"status": "logging_in"}), flush=True)
token = credentials.get("token")
if token:
print(json.dumps({"status": "checking_login"}), flush=True)
if not await self._check_token(client, token):
token = None
token = await self._web_login(client, username, password)
if not token:
return self._submit_error("Login failed (bad credentials?)")
print(json.dumps({"status": "logging_in"}), flush=True)
token = await self._web_login(client, username, password)
if not token:
return self._submit_error("Login failed (bad credentials?)")
print(
json.dumps(
{
"credentials": {
"username": username,
"password": password,
"token": token,
}
}
),
flush=True,
)
print(json.dumps({"status": "submitting"}), flush=True)
@ -383,7 +413,7 @@ class CSESScraper(BaseScraper):
"Content-Type": "application/json",
**HEADERS,
},
timeout=TIMEOUT_S,
timeout=HTTP_TIMEOUT,
)
if r.status_code not in range(200, 300):
@ -406,7 +436,7 @@ class CSESScraper(BaseScraper):
"X-Auth-Token": token,
**HEADERS,
},
timeout=30.0,
timeout=SUBMIT_POLL_TIMEOUT,
)
if r.status_code == 200:
info = r.json()