feat(scraper): add precision extraction, start_time, and submit support

Problem: problem pages contain floating-point precision requirements and
contest start timestamps that were not being extracted or stored. The
submit workflow also needed a foundation in the scraper layer.

Solution: add extract_precision() to base.py and propagate through all
scrapers into cache. Add start_time to ContestSummary and extract it
from AtCoder and Codeforces. Add SubmitResult model, abstract submit()
method, submit CLI case with get_language_id() resolution, stdin/env_extra
support in run_scraper, and a full AtCoder submit implementation; stub
the remaining platforms.
This commit is contained in:
Barrett Ruth 2026-03-03 14:51:42 -05:00 committed by Barrett Ruth
parent 865e3b5928
commit 90bd13580b
9 changed files with 245 additions and 20 deletions

View file

@ -27,7 +27,7 @@
---@field multi_test? boolean
---@field memory_mb? number
---@field timeout_ms? number
---@field epsilon? number
---@field precision? number
---@field combined_test? CombinedTest
---@field test_cases TestCase[]
@ -231,7 +231,8 @@ function M.set_test_cases(
timeout_ms,
memory_mb,
interactive,
multi_test
multi_test,
precision
)
vim.validate({
platform = { platform, 'string' },
@ -243,6 +244,7 @@ function M.set_test_cases(
memory_mb = { memory_mb, { 'number', 'nil' }, true },
interactive = { interactive, { 'boolean', 'nil' }, true },
multi_test = { multi_test, { 'boolean', 'nil' }, true },
precision = { precision, { 'number', 'nil' }, true },
})
local index = cache_data[platform][contest_id].index_map[problem_id]
@ -253,6 +255,7 @@ function M.set_test_cases(
cache_data[platform][contest_id].problems[index].memory_mb = memory_mb
cache_data[platform][contest_id].problems[index].interactive = interactive
cache_data[platform][contest_id].problems[index].multi_test = multi_test
cache_data[platform][contest_id].problems[index].precision = precision
M.save()
end
@ -278,7 +281,7 @@ end
---@param contest_id string
---@param problem_id? string
---@return number?
function M.get_epsilon(platform, contest_id, problem_id)
function M.get_precision(platform, contest_id, problem_id)
vim.validate({
platform = { platform, 'string' },
contest_id = { contest_id, 'string' },
@ -299,7 +302,7 @@ function M.get_epsilon(platform, contest_id, problem_id)
end
local problem_data = cache_data[platform][contest_id].problems[index]
return problem_data and problem_data.epsilon or nil
return problem_data and problem_data.precision or nil
end
---@param file_path string
@ -349,11 +352,24 @@ function M.set_contest_summaries(platform, contests)
cache_data[platform][contest.id] = cache_data[platform][contest.id] or {}
cache_data[platform][contest.id].display_name = contest.display_name
cache_data[platform][contest.id].name = contest.name
if contest.start_time then
cache_data[platform][contest.id].start_time = contest.start_time
end
end
M.save()
end
---@param platform string
---@param contest_id string
---@return integer?
function M.get_contest_start_time(platform, contest_id)
if not cache_data[platform] or not cache_data[platform][contest_id] then
return nil
end
return cache_data[platform][contest_id].start_time
end
function M.clear_all()
cache_data = {}
M.save()

View file

@ -56,6 +56,12 @@ local function run_scraper(platform, subcommand, args, opts)
env.PYTHONPATH = ''
env.CONDA_PREFIX = ''
if opts and opts.env_extra then
for k, v in pairs(opts.env_extra) do
env[k] = v
end
end
if opts and opts.ndjson then
local uv = vim.uv
local stdout = uv.new_pipe(false)
@ -126,6 +132,9 @@ local function run_scraper(platform, subcommand, args, opts)
end
local sysopts = { text = true, timeout = 30000, env = env, cwd = plugin_path }
if opts and opts.stdin then
sysopts.stdin = opts.stdin
end
if opts and opts.sync then
local result = vim.system(cmd, sysopts):wait()
return syshandle(result)
@ -228,6 +237,7 @@ function M.scrape_all_tests(platform, contest_id, callback)
memory_mb = ev.memory_mb or 0,
interactive = ev.interactive or false,
multi_test = ev.multi_test or false,
precision = ev.precision,
problem_id = ev.problem_id,
})
end
@ -236,4 +246,21 @@ function M.scrape_all_tests(platform, contest_id, callback)
})
end
function M.submit(platform, contest_id, problem_id, language, source_code, credentials, callback)
local creds_json = vim.json.encode(credentials)
run_scraper(platform, 'submit', { contest_id, problem_id, language }, {
stdin = source_code,
env_extra = { CP_CREDENTIALS = creds_json },
on_exit = function(result)
if type(callback) == 'function' then
if result and result.success then
callback(result.data or { success = true })
else
callback({ success = false, error = result and result.error or 'unknown' })
end
end
end,
})
end
return M

View file

@ -130,7 +130,8 @@ local function start_tests(platform, contest_id, problems)
ev.timeout_ms or 0,
ev.memory_mb or 0,
ev.interactive,
ev.multi_test
ev.multi_test,
ev.precision
)
local io_state = state.get_io_view_state()

View file

@ -14,13 +14,14 @@ from bs4 import BeautifulSoup, Tag
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from .base import BaseScraper
from .base import BaseScraper, extract_precision
from .models import (
CombinedTest,
ContestListResult,
ContestSummary,
MetadataResult,
ProblemSummary,
SubmitResult,
TestCase,
TestsResult,
)
@ -121,6 +122,23 @@ def _parse_last_page(html: str) -> int:
return max(nums) if nums else 1
def _parse_start_time(tr: Tag) -> int | None:
tds = tr.select("td")
if not tds:
return None
time_el = tds[0].select_one("time.fixtime-full")
if not time_el:
return None
text = time_el.get_text(strip=True)
try:
from datetime import datetime
dt = datetime.strptime(text, "%Y-%m-%d %H:%M:%S%z")
return int(dt.timestamp())
except (ValueError, TypeError):
return None
def _parse_archive_contests(html: str) -> list[ContestSummary]:
soup = BeautifulSoup(html, "html.parser")
tbody = soup.select_one("table.table-default tbody") or soup.select_one("tbody")
@ -139,7 +157,10 @@ def _parse_archive_contests(html: str) -> list[ContestSummary]:
continue
cid = m.group(1)
name = a.get_text(strip=True)
out.append(ContestSummary(id=cid, name=name, display_name=name))
start_time = _parse_start_time(tr)
out.append(
ContestSummary(id=cid, name=name, display_name=name, start_time=start_time)
)
return out
@ -169,7 +190,7 @@ def _parse_tasks_list(html: str) -> list[dict[str, str]]:
return rows
def _extract_problem_info(html: str) -> tuple[int, float, bool]:
def _extract_problem_info(html: str) -> tuple[int, float, bool, float | None]:
soup = BeautifulSoup(html, "html.parser")
txt = soup.get_text(" ", strip=True)
timeout_ms = 0
@ -181,9 +202,10 @@ def _extract_problem_info(html: str) -> tuple[int, float, bool]:
if ms:
memory_mb = float(ms.group(1)) * MIB_TO_MB
div = soup.select_one("#problem-statement")
txt = div.get_text(" ", strip=True) if div else soup.get_text(" ", strip=True)
interactive = "This is an interactive" in txt
return timeout_ms, memory_mb, interactive
body = div.get_text(" ", strip=True) if div else soup.get_text(" ", strip=True)
interactive = "This is an interactive" in body
precision = extract_precision(body)
return timeout_ms, memory_mb, interactive, precision
def _extract_samples(html: str) -> list[TestCase]:
@ -220,12 +242,13 @@ def _scrape_problem_page_sync(contest_id: str, slug: str) -> dict[str, Any]:
tests = _extract_samples(html)
except Exception:
tests = []
timeout_ms, memory_mb, interactive = _extract_problem_info(html)
timeout_ms, memory_mb, interactive, precision = _extract_problem_info(html)
return {
"tests": tests,
"timeout_ms": timeout_ms,
"memory_mb": memory_mb,
"interactive": interactive,
"precision": precision,
}
@ -241,14 +264,29 @@ def _to_problem_summaries(rows: list[dict[str, str]]) -> list[ProblemSummary]:
return out
async def _fetch_upcoming_contests_async(
client: httpx.AsyncClient,
) -> list[ContestSummary]:
try:
html = await _get_async(client, f"{BASE_URL}/contests/")
return _parse_archive_contests(html)
except Exception:
return []
async def _fetch_all_contests_async() -> list[ContestSummary]:
async with httpx.AsyncClient(
limits=httpx.Limits(max_connections=100, max_keepalive_connections=100),
) as client:
upcoming = await _fetch_upcoming_contests_async(client)
first_html = await _get_async(client, ARCHIVE_URL)
last = _parse_last_page(first_html)
out = _parse_archive_contests(first_html)
if last <= 1:
seen = {c.id for c in out}
for c in upcoming:
if c.id not in seen:
out.append(c)
return out
tasks = [
asyncio.create_task(_get_async(client, f"{ARCHIVE_URL}?page={p}"))
@ -257,6 +295,10 @@ async def _fetch_all_contests_async() -> list[ContestSummary]:
for coro in asyncio.as_completed(tasks):
html = await coro
out.extend(_parse_archive_contests(html))
seen = {c.id for c in out}
for c in upcoming:
if c.id not in seen:
out.append(c)
return out
@ -319,6 +361,7 @@ class AtcoderScraper(BaseScraper):
"memory_mb": data.get("memory_mb", 0),
"interactive": bool(data.get("interactive")),
"multi_test": False,
"precision": data.get("precision"),
}
),
flush=True,
@ -326,6 +369,61 @@ class AtcoderScraper(BaseScraper):
await asyncio.gather(*(emit(r) for r in rows))
async def submit(self, contest_id: str, problem_id: str, source_code: str, language_id: str, credentials: dict[str, str]) -> SubmitResult:
def _submit_sync() -> SubmitResult:
try:
login_page = _session.get(f"{BASE_URL}/login", headers=HEADERS, timeout=TIMEOUT_SECONDS)
login_page.raise_for_status()
soup = BeautifulSoup(login_page.text, "html.parser")
csrf_input = soup.find("input", {"name": "csrf_token"})
if not csrf_input:
return SubmitResult(success=False, error="Could not find CSRF token on login page")
csrf_token = csrf_input.get("value", "")
login_resp = _session.post(
f"{BASE_URL}/login",
data={
"username": credentials.get("username", ""),
"password": credentials.get("password", ""),
"csrf_token": csrf_token,
},
headers=HEADERS,
timeout=TIMEOUT_SECONDS,
)
login_resp.raise_for_status()
submit_page = _session.get(
f"{BASE_URL}/contests/{contest_id}/submit",
headers=HEADERS,
timeout=TIMEOUT_SECONDS,
)
submit_page.raise_for_status()
soup = BeautifulSoup(submit_page.text, "html.parser")
csrf_input = soup.find("input", {"name": "csrf_token"})
if not csrf_input:
return SubmitResult(success=False, error="Could not find CSRF token on submit page")
csrf_token = csrf_input.get("value", "")
task_screen_name = f"{contest_id}_{problem_id}"
submit_resp = _session.post(
f"{BASE_URL}/contests/{contest_id}/submit",
data={
"data.TaskScreenName": task_screen_name,
"data.LanguageId": language_id,
"sourceCode": source_code,
"csrf_token": csrf_token,
},
headers=HEADERS,
timeout=TIMEOUT_SECONDS,
)
submit_resp.raise_for_status()
return SubmitResult(success=True, error="", submission_id="", verdict="submitted")
except Exception as e:
return SubmitResult(success=False, error=str(e))
return await asyncio.to_thread(_submit_sync)
async def main_async() -> int:
if len(sys.argv) < 2:

View file

@ -1,8 +1,31 @@
import asyncio
import json
import os
import re
import sys
from abc import ABC, abstractmethod
from .models import CombinedTest, ContestListResult, MetadataResult, TestsResult
from .language_ids import get_language_id
from .models import CombinedTest, ContestListResult, MetadataResult, SubmitResult, TestsResult
_PRECISION_ABS_REL_RE = re.compile(
r"(?:absolute|relative)\s+error[^.]*?10\s*[\^{]\s*\{?\s*[-\u2212]\s*(\d+)\s*\}?",
re.IGNORECASE,
)
_PRECISION_DECIMAL_RE = re.compile(
r"round(?:ed)?\s+to\s+(\d+)\s+decimal\s+place",
re.IGNORECASE,
)
def extract_precision(text: str) -> float | None:
m = _PRECISION_ABS_REL_RE.search(text)
if m:
return 10 ** -int(m.group(1))
m = _PRECISION_DECIMAL_RE.search(text)
if m:
return 10 ** -int(m.group(1))
return None
class BaseScraper(ABC):
@ -19,6 +42,9 @@ class BaseScraper(ABC):
@abstractmethod
async def stream_tests_for_category_async(self, category_id: str) -> None: ...
@abstractmethod
async def submit(self, contest_id: str, problem_id: str, source_code: str, language_id: str, credentials: dict[str, str]) -> SubmitResult: ...
def _usage(self) -> str:
name = self.platform_name
return f"Usage: {name}.py metadata <id> | tests <id> | contests"
@ -40,6 +66,9 @@ class BaseScraper(ABC):
def _contests_error(self, msg: str) -> ContestListResult:
return ContestListResult(success=False, error=msg)
def _submit_error(self, msg: str) -> SubmitResult:
return SubmitResult(success=False, error=msg)
async def _run_cli_async(self, args: list[str]) -> int:
if len(args) < 2:
print(self._metadata_error(self._usage()).model_dump_json())
@ -71,6 +100,21 @@ class BaseScraper(ABC):
print(result.model_dump_json())
return 0 if result.success else 1
case "submit":
if len(args) != 5:
print(self._submit_error("Usage: <platform> submit <contest_id> <problem_id> <language_id>").model_dump_json())
return 1
source_code = sys.stdin.read()
creds_raw = os.environ.get("CP_CREDENTIALS", "{}")
try:
credentials = json.loads(creds_raw)
except json.JSONDecodeError:
credentials = {}
language_id = get_language_id(self.platform_name, args[4]) or args[4]
result = await self.submit(args[2], args[3], source_code, language_id, credentials)
print(result.model_dump_json())
return 0 if result.success else 1
case _:
print(
self._metadata_error(

View file

@ -8,12 +8,13 @@ from typing import Any
import httpx
from curl_cffi import requests as curl_requests
from .base import BaseScraper
from .base import BaseScraper, extract_precision
from .models import (
ContestListResult,
ContestSummary,
MetadataResult,
ProblemSummary,
SubmitResult,
TestCase,
)
@ -219,11 +220,13 @@ class CodeChefScraper(BaseScraper):
)
memory_mb = _extract_memory_limit(html)
interactive = False
precision = extract_precision(html)
except Exception:
tests = []
timeout_ms = 1000
memory_mb = 256.0
interactive = False
precision = None
combined_input = "\n".join(t.input for t in tests) if tests else ""
combined_expected = (
"\n".join(t.expected for t in tests) if tests else ""
@ -241,6 +244,7 @@ class CodeChefScraper(BaseScraper):
"memory_mb": memory_mb,
"interactive": interactive,
"multi_test": False,
"precision": precision,
}
tasks = [run_one(problem_code) for problem_code in problems.keys()]
@ -248,6 +252,9 @@ class CodeChefScraper(BaseScraper):
payload = await coro
print(json.dumps(payload), flush=True)
async def submit(self, contest_id: str, problem_id: str, source_code: str, language_id: str, credentials: dict[str, str]) -> SubmitResult:
return SubmitResult(success=False, error="CodeChef submit not yet implemented", submission_id="", verdict="")
if __name__ == "__main__":
CodeChefScraper().run_cli()

View file

@ -9,12 +9,13 @@ import requests
from bs4 import BeautifulSoup, Tag
from curl_cffi import requests as curl_requests
from .base import BaseScraper
from .base import BaseScraper, extract_precision
from .models import (
ContestListResult,
ContestSummary,
MetadataResult,
ProblemSummary,
SubmitResult,
TestCase,
)
@ -153,6 +154,7 @@ def _parse_all_blocks(html: str) -> list[dict[str, Any]]:
raw_samples, is_grouped = _extract_samples(b)
timeout_ms, memory_mb = _extract_limits(b)
interactive = _is_interactive(b)
precision = extract_precision(b.get_text(" ", strip=True))
if is_grouped and raw_samples:
combined_input = f"{len(raw_samples)}\n" + "\n".join(
@ -179,6 +181,7 @@ def _parse_all_blocks(html: str) -> list[dict[str, Any]]:
"memory_mb": memory_mb,
"interactive": interactive,
"multi_test": is_grouped,
"precision": precision,
}
)
return out
@ -228,11 +231,20 @@ class CodeforcesScraper(BaseScraper):
contests: list[ContestSummary] = []
for c in data["result"]:
if c.get("phase") != "FINISHED":
phase = c.get("phase")
if phase not in ("FINISHED", "BEFORE", "CODING"):
continue
cid = str(c["id"])
name = c["name"]
contests.append(ContestSummary(id=cid, name=name, display_name=name))
start_time = c.get("startTimeSeconds") if phase != "FINISHED" else None
contests.append(
ContestSummary(
id=cid,
name=name,
display_name=name,
start_time=start_time,
)
)
if not contests:
return self._contests_error("No contests found")
@ -263,11 +275,15 @@ class CodeforcesScraper(BaseScraper):
"memory_mb": b.get("memory_mb", 0),
"interactive": bool(b.get("interactive")),
"multi_test": bool(b.get("multi_test", False)),
"precision": b.get("precision"),
}
),
flush=True,
)
async def submit(self, contest_id: str, problem_id: str, source_code: str, language_id: str, credentials: dict[str, str]) -> SubmitResult:
return SubmitResult(success=False, error="Codeforces submit not yet implemented", submission_id="", verdict="")
if __name__ == "__main__":
CodeforcesScraper().run_cli()

View file

@ -7,12 +7,13 @@ from typing import Any
import httpx
from .base import BaseScraper
from .base import BaseScraper, extract_precision
from .models import (
ContestListResult,
ContestSummary,
MetadataResult,
ProblemSummary,
SubmitResult,
TestCase,
)
@ -129,17 +130,21 @@ def parse_category_problems(category_id: str, html: str) -> list[ProblemSummary]
return []
def _extract_problem_info(html: str) -> tuple[int, int, bool]:
def _extract_problem_info(html: str) -> tuple[int, int, bool, float | None]:
tm = TIME_RE.search(html)
mm = MEM_RE.search(html)
t = int(round(float(tm.group(1)) * 1000)) if tm else 0
m = int(mm.group(1)) if mm else 0
md = MD_BLOCK_RE.search(html)
interactive = False
precision = None
if md:
body = md.group(1)
interactive = "This is an interactive problem." in body
return t, m, interactive
from bs4 import BeautifulSoup
precision = extract_precision(BeautifulSoup(body, "html.parser").get_text(" "))
return t, m, interactive, precision
def parse_title(html: str) -> str:
@ -257,6 +262,9 @@ class CSESScraper(BaseScraper):
payload = await coro
print(json.dumps(payload), flush=True)
async def submit(self, contest_id: str, problem_id: str, source_code: str, language_id: str, credentials: dict[str, str]) -> SubmitResult:
return SubmitResult(success=False, error="CSES submit not yet implemented", submission_id="", verdict="")
if __name__ == "__main__":
CSESScraper().run_cli()

View file

@ -26,6 +26,7 @@ class ContestSummary(BaseModel):
id: str
name: str
display_name: str | None = None
start_time: int | None = None
model_config = ConfigDict(extra="forbid")
@ -63,6 +64,13 @@ class TestsResult(ScrapingResult):
model_config = ConfigDict(extra="forbid")
class SubmitResult(ScrapingResult):
submission_id: str = ""
verdict: str = ""
model_config = ConfigDict(extra="forbid")
class ScraperConfig(BaseModel):
timeout_seconds: int = 30
max_retries: int = 3