diff --git a/lua/cp/config.lua b/lua/cp/config.lua
index 2dd2b32..fcebc0e 100644
--- a/lua/cp/config.lua
+++ b/lua/cp/config.lua
@@ -25,7 +25,7 @@
---@class PanelConfig
---@field diff_modes string[]
---@field max_output_lines integer
----@field epsilon number?
+---@field precision number?
---@class DiffGitConfig
---@field args string[]
@@ -165,6 +165,14 @@ M.defaults = {
enabled_languages = { 'cpp', 'python' },
default_language = 'cpp',
},
+ kattis = {
+ enabled_languages = { 'cpp', 'python' },
+ default_language = 'cpp',
+ },
+ usaco = {
+ enabled_languages = { 'cpp', 'python' },
+ default_language = 'cpp',
+ },
},
hooks = {
setup = {
@@ -199,7 +207,11 @@ M.defaults = {
add_test_key = 'ga',
save_and_exit_key = 'q',
},
- panel = { diff_modes = { 'side-by-side', 'git', 'vim' }, max_output_lines = 50, epsilon = nil },
+ panel = {
+ diff_modes = { 'side-by-side', 'git', 'vim' },
+ max_output_lines = 50,
+ precision = nil,
+ },
diff = {
git = {
args = { 'diff', '--no-index', '--word-diff=plain', '--word-diff-regex=.', '--no-prefix' },
@@ -420,8 +432,8 @@ function M.setup(user_config)
end,
'positive integer',
},
- epsilon = {
- cfg.ui.panel.epsilon,
+ precision = {
+ cfg.ui.panel.precision,
function(v)
return v == nil or (type(v) == 'number' and v >= 0)
end,
diff --git a/lua/cp/constants.lua b/lua/cp/constants.lua
index 7bdaa16..c4bac3e 100644
--- a/lua/cp/constants.lua
+++ b/lua/cp/constants.lua
@@ -1,13 +1,15 @@
local M = {}
-M.PLATFORMS = { 'atcoder', 'codechef', 'codeforces', 'cses' }
-M.ACTIONS = { 'run', 'panel', 'next', 'prev', 'pick', 'cache', 'interact', 'edit' }
+M.PLATFORMS = { 'atcoder', 'codechef', 'codeforces', 'cses', 'kattis', 'usaco' }
+M.ACTIONS = { 'run', 'panel', 'next', 'prev', 'pick', 'cache', 'interact', 'edit', 'race', 'stress', 'submit' }
M.PLATFORM_DISPLAY_NAMES = {
atcoder = 'AtCoder',
codechef = 'CodeChef',
codeforces = 'CodeForces',
cses = 'CSES',
+ kattis = 'Kattis',
+ usaco = 'USACO',
}
M.CPP = 'cpp'
diff --git a/scrapers/kattis.py b/scrapers/kattis.py
new file mode 100644
index 0000000..c98dc35
--- /dev/null
+++ b/scrapers/kattis.py
@@ -0,0 +1,231 @@
+#!/usr/bin/env python3
+
+import asyncio
+import io
+import json
+import re
+import zipfile
+
+import httpx
+
+from .base import BaseScraper
+from .models import (
+ ContestListResult,
+ ContestSummary,
+ MetadataResult,
+ ProblemSummary,
+ SubmitResult,
+ TestCase,
+)
+
+BASE_URL = "https://open.kattis.com"
+HEADERS = {
+ "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
+}
+TIMEOUT_S = 15.0
+CONNECTIONS = 8
+
+TIME_RE = re.compile(
+    r"CPU Time limit\s*</td>\s*<td[^>]*>\s*(\d+)\s*seconds?\s*</td>",
+    re.DOTALL,
+)
+MEM_RE = re.compile(
+    r"Memory limit\s*</td>\s*<td[^>]*>\s*(\d+)\s*MB\s*</td>",
+    re.DOTALL,
+)
+LAST_PAGE_RE = re.compile(r"\bpage=(\d+)")
+
+
+async def _fetch_text(client: httpx.AsyncClient, url: str) -> str:
+ r = await client.get(url, headers=HEADERS, timeout=TIMEOUT_S)
+ r.raise_for_status()
+ return r.text
+
+
+async def _fetch_bytes(client: httpx.AsyncClient, url: str) -> bytes:
+ r = await client.get(url, headers=HEADERS, timeout=TIMEOUT_S)
+ r.raise_for_status()
+ return r.content
+
+
+def _parse_limits(html: str) -> tuple[int, int]:
+ tm = TIME_RE.search(html)
+ mm = MEM_RE.search(html)
+ timeout_ms = int(tm.group(1)) * 1000 if tm else 1000
+ memory_mb = int(mm.group(1)) if mm else 1024
+ return timeout_ms, memory_mb
+
+
+def _parse_samples_html(html: str) -> list[TestCase]:
+ tests: list[TestCase] = []
+    tables = re.finditer(r'<table class="sample"[^>]*>.*?</table>', html, re.DOTALL)
+ for table_match in tables:
+ table_html = table_match.group(0)
+        pres = re.findall(r"<pre>(.*?)</pre>", table_html, re.DOTALL)
+ if len(pres) >= 2:
+ inp = pres[0].strip()
+ out = pres[1].strip()
+ tests.append(TestCase(input=inp, expected=out))
+ return tests
+
+
+def _parse_samples_zip(data: bytes) -> list[TestCase]:
+ try:
+ zf = zipfile.ZipFile(io.BytesIO(data))
+ except zipfile.BadZipFile:
+ return []
+ inputs: dict[str, str] = {}
+ outputs: dict[str, str] = {}
+ for name in zf.namelist():
+ content = zf.read(name).decode("utf-8").strip()
+ if name.endswith(".in"):
+ key = name[: -len(".in")]
+ inputs[key] = content
+ elif name.endswith(".ans"):
+ key = name[: -len(".ans")]
+ outputs[key] = content
+ tests: list[TestCase] = []
+ for key in sorted(set(inputs) & set(outputs)):
+ tests.append(TestCase(input=inputs[key], expected=outputs[key]))
+ return tests
+
+
+def _is_interactive(html: str) -> bool:
+ return "This is an interactive problem" in html
+
+
+def _parse_problem_rows(html: str) -> list[tuple[str, str]]:
+ seen: set[str] = set()
+ out: list[tuple[str, str]] = []
+ for m in re.finditer(
+        r'<a href="/problems/([^/"]+)"[^>]*>\s*([^<]+)</a>',
+ html,
+ ):
+ pid = m.group(1)
+ name = m.group(2).strip()
+ if pid not in seen:
+ seen.add(pid)
+ out.append((pid, name))
+ return out
+
+
+def _parse_last_page(html: str) -> int:
+ nums = [int(m.group(1)) for m in LAST_PAGE_RE.finditer(html)]
+ return max(nums) if nums else 0
+
+
+class KattisScraper(BaseScraper):
+ @property
+ def platform_name(self) -> str:
+ return "kattis"
+
+ async def scrape_contest_metadata(self, contest_id: str) -> MetadataResult:
+ try:
+ async with httpx.AsyncClient() as client:
+ html = await _fetch_text(client, f"{BASE_URL}/problems/{contest_id}")
+ timeout_ms, memory_mb = _parse_limits(html)
+                title_m = re.search(r"<title>([^<]+)</title>", html)
+ name = (
+ title_m.group(1).split("\u2013")[0].strip() if title_m else contest_id
+ )
+ return MetadataResult(
+ success=True,
+ error="",
+ contest_id=contest_id,
+ problems=[ProblemSummary(id=contest_id, name=name)],
+ url=f"{BASE_URL}/problems/%s",
+ )
+ except Exception as e:
+ return self._metadata_error(str(e))
+
+ async def scrape_contest_list(self) -> ContestListResult:
+ try:
+ async with httpx.AsyncClient(
+ limits=httpx.Limits(max_connections=CONNECTIONS)
+ ) as client:
+ first_html = await _fetch_text(
+ client, f"{BASE_URL}/problems?page=0&order=problem_difficulty"
+ )
+ last = _parse_last_page(first_html)
+ rows = _parse_problem_rows(first_html)
+
+ sem = asyncio.Semaphore(CONNECTIONS)
+
+ async def fetch_page(page: int) -> list[tuple[str, str]]:
+ async with sem:
+ html = await _fetch_text(
+ client,
+ f"{BASE_URL}/problems?page={page}&order=problem_difficulty",
+ )
+ return _parse_problem_rows(html)
+
+ tasks = [fetch_page(p) for p in range(1, last + 1)]
+ for coro in asyncio.as_completed(tasks):
+ rows.extend(await coro)
+
+ seen: set[str] = set()
+ contests: list[ContestSummary] = []
+ for pid, name in rows:
+ if pid not in seen:
+ seen.add(pid)
+ contests.append(
+ ContestSummary(id=pid, name=name, display_name=name)
+ )
+ if not contests:
+ return self._contests_error("No problems found")
+ return ContestListResult(success=True, error="", contests=contests)
+ except Exception as e:
+ return self._contests_error(str(e))
+
+ async def stream_tests_for_category_async(self, category_id: str) -> None:
+ async with httpx.AsyncClient(
+ limits=httpx.Limits(max_connections=CONNECTIONS)
+ ) as client:
+ try:
+ html = await _fetch_text(client, f"{BASE_URL}/problems/{category_id}")
+ except Exception:
+ return
+
+ timeout_ms, memory_mb = _parse_limits(html)
+ interactive = _is_interactive(html)
+
+ tests: list[TestCase] = []
+ try:
+ zip_data = await _fetch_bytes(
+ client,
+ f"{BASE_URL}/problems/{category_id}/file/statement/samples.zip",
+ )
+ tests = _parse_samples_zip(zip_data)
+ except Exception:
+ tests = _parse_samples_html(html)
+
+ combined_input = "\n".join(t.input for t in tests) if tests else ""
+ combined_expected = "\n".join(t.expected for t in tests) if tests else ""
+
+ print(
+ json.dumps(
+ {
+ "problem_id": category_id,
+ "combined": {
+ "input": combined_input,
+ "expected": combined_expected,
+ },
+ "tests": [
+ {"input": t.input, "expected": t.expected} for t in tests
+ ],
+ "timeout_ms": timeout_ms,
+ "memory_mb": memory_mb,
+ "interactive": interactive,
+ "multi_test": False,
+ }
+ ),
+ flush=True,
+ )
+
+
+ async def submit(self, contest_id: str, problem_id: str, source_code: str, language_id: str, credentials: dict[str, str]) -> SubmitResult:
+ return SubmitResult(success=False, error="Kattis submit not yet implemented", submission_id="", verdict="")
+
+
+if __name__ == "__main__":
+ KattisScraper().run_cli()
diff --git a/scrapers/usaco.py b/scrapers/usaco.py
new file mode 100644
index 0000000..b8c6c9f
--- /dev/null
+++ b/scrapers/usaco.py
@@ -0,0 +1,288 @@
+#!/usr/bin/env python3
+
+import asyncio
+import json
+import re
+from typing import Any
+
+import httpx
+
+from .base import BaseScraper
+from .models import (
+ ContestListResult,
+ ContestSummary,
+ MetadataResult,
+ ProblemSummary,
+ SubmitResult,
+ TestCase,
+)
+
+BASE_URL = "http://www.usaco.org"
+HEADERS = {
+ "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
+}
+TIMEOUT_S = 15.0
+CONNECTIONS = 4
+
+MONTHS = [
+ "dec",
+ "jan",
+ "feb",
+ "mar",
+ "open",
+]
+
+DIVISION_HEADING_RE = re.compile(
+    r"<h2[^>]*>.*?USACO\s+(\d{4})\s+(\w+)\s+Contest,\s+(\w+)\s*</h2>",
+    re.IGNORECASE,
+)
+PROBLEM_BLOCK_RE = re.compile(
+    r"<b>([^<]+)</b>\s*<br\s*/?>.*?"
+    r"viewproblem2&cpid=(\d+)",
+    re.DOTALL,
+)
+SAMPLE_IN_RE = re.compile(r"<pre class=['\"]in['\"][^>]*>(.*?)</pre>", re.DOTALL)
+SAMPLE_OUT_RE = re.compile(r"<pre class=['\"]out['\"][^>]*>(.*?)</pre>", re.DOTALL)
+TIME_NOTE_RE = re.compile(
+ r"time\s+limit\s+(?:for\s+this\s+problem\s+is\s+)?(\d+)s",
+ re.IGNORECASE,
+)
+MEMORY_NOTE_RE = re.compile(
+ r"memory\s+limit\s+(?:for\s+this\s+problem\s+is\s+)?(\d+)\s*MB",
+ re.IGNORECASE,
+)
+RESULTS_PAGE_RE = re.compile(
+ r'href="index\.php\?page=([a-z]+\d{2,4}results)"',
+ re.IGNORECASE,
+)
+
+
+async def _fetch_text(client: httpx.AsyncClient, url: str) -> str:
+ r = await client.get(url, headers=HEADERS, timeout=TIMEOUT_S, follow_redirects=True)
+ r.raise_for_status()
+ return r.text
+
+
+def _parse_results_page(html: str) -> dict[str, list[tuple[str, str]]]:
+ sections: dict[str, list[tuple[str, str]]] = {}
+ current_div: str | None = None
+
+    parts = re.split(r"(<h2[^>]*>.*?</h2>)", html, flags=re.DOTALL)
+ for part in parts:
+ heading_m = DIVISION_HEADING_RE.search(part)
+ if heading_m:
+ current_div = heading_m.group(3).lower()
+ sections.setdefault(current_div, [])
+ continue
+ if current_div is not None:
+ for m in PROBLEM_BLOCK_RE.finditer(part):
+ name = m.group(1).strip()
+ cpid = m.group(2)
+ sections[current_div].append((cpid, name))
+
+ return sections
+
+
+def _parse_contest_id(contest_id: str) -> tuple[str, str]:
+ parts = contest_id.rsplit("_", 1)
+ if len(parts) != 2:
+ return contest_id, ""
+ return parts[0], parts[1].lower()
+
+
+def _results_page_slug(month_year: str) -> str:
+ return f"{month_year}results"
+
+
+def _parse_problem_page(html: str) -> dict[str, Any]:
+ inputs = SAMPLE_IN_RE.findall(html)
+ outputs = SAMPLE_OUT_RE.findall(html)
+ tests: list[TestCase] = []
+ for inp, out in zip(inputs, outputs):
+ tests.append(
+ TestCase(
+ input=inp.strip().replace("\r", ""),
+ expected=out.strip().replace("\r", ""),
+ )
+ )
+
+ tm = TIME_NOTE_RE.search(html)
+ mm = MEMORY_NOTE_RE.search(html)
+ timeout_ms = int(tm.group(1)) * 1000 if tm else 4000
+ memory_mb = int(mm.group(1)) if mm else 256
+
+ interactive = "interactive problem" in html.lower()
+
+ return {
+ "tests": tests,
+ "timeout_ms": timeout_ms,
+ "memory_mb": memory_mb,
+ "interactive": interactive,
+ }
+
+
+class USACOScraper(BaseScraper):
+ @property
+ def platform_name(self) -> str:
+ return "usaco"
+
+ async def scrape_contest_metadata(self, contest_id: str) -> MetadataResult:
+ try:
+ month_year, division = _parse_contest_id(contest_id)
+ if not division:
+ return self._metadata_error(
+ f"Invalid contest ID '{contest_id}'. "
+                    "Expected format: <month_year>_<division> (e.g. dec24_gold)"
+ )
+
+ slug = _results_page_slug(month_year)
+ async with httpx.AsyncClient() as client:
+ html = await _fetch_text(client, f"{BASE_URL}/index.php?page={slug}")
+ sections = _parse_results_page(html)
+ problems_raw = sections.get(division, [])
+ if not problems_raw:
+ return self._metadata_error(
+ f"No problems found for {contest_id} (division: {division})"
+ )
+ problems = [
+ ProblemSummary(id=cpid, name=name) for cpid, name in problems_raw
+ ]
+ return MetadataResult(
+ success=True,
+ error="",
+ contest_id=contest_id,
+ problems=problems,
+ url=f"{BASE_URL}/index.php?page=viewproblem2&cpid=%s",
+ )
+ except Exception as e:
+ return self._metadata_error(str(e))
+
+ async def scrape_contest_list(self) -> ContestListResult:
+ try:
+ async with httpx.AsyncClient(
+ limits=httpx.Limits(max_connections=CONNECTIONS)
+ ) as client:
+ html = await _fetch_text(client, f"{BASE_URL}/index.php?page=contests")
+
+ page_slugs: set[str] = set()
+ for m in RESULTS_PAGE_RE.finditer(html):
+ page_slugs.add(m.group(1))
+
+ recent_patterns = []
+ for year in range(15, 27):
+ for month in MONTHS:
+ recent_patterns.append(f"{month}{year:02d}results")
+ page_slugs.update(recent_patterns)
+
+ contests: list[ContestSummary] = []
+ sem = asyncio.Semaphore(CONNECTIONS)
+
+ async def check_page(slug: str) -> list[ContestSummary]:
+ async with sem:
+ try:
+ page_html = await _fetch_text(
+ client, f"{BASE_URL}/index.php?page={slug}"
+ )
+ except Exception:
+ return []
+ sections = _parse_results_page(page_html)
+ if not sections:
+ return []
+ month_year = slug.replace("results", "")
+ out: list[ContestSummary] = []
+ for div in sections:
+ cid = f"{month_year}_{div}"
+ year_m = re.search(r"\d{2,4}", month_year)
+ month_m = re.search(r"[a-z]+", month_year)
+ year_str = year_m.group() if year_m else ""
+ month_str = month_m.group().capitalize() if month_m else ""
+ if len(year_str) == 2:
+ year_str = f"20{year_str}"
+ display = (
+ f"USACO {year_str} {month_str} - {div.capitalize()}"
+ )
+ out.append(
+ ContestSummary(id=cid, name=cid, display_name=display)
+ )
+ return out
+
+ tasks = [check_page(slug) for slug in sorted(page_slugs)]
+ for coro in asyncio.as_completed(tasks):
+ contests.extend(await coro)
+
+ if not contests:
+ return self._contests_error("No contests found")
+ return ContestListResult(success=True, error="", contests=contests)
+ except Exception as e:
+ return self._contests_error(str(e))
+
+ async def stream_tests_for_category_async(self, category_id: str) -> None:
+ month_year, division = _parse_contest_id(category_id)
+ if not division:
+ return
+
+ slug = _results_page_slug(month_year)
+ async with httpx.AsyncClient(
+ limits=httpx.Limits(max_connections=CONNECTIONS)
+ ) as client:
+ try:
+ html = await _fetch_text(client, f"{BASE_URL}/index.php?page={slug}")
+ except Exception:
+ return
+
+ sections = _parse_results_page(html)
+ problems_raw = sections.get(division, [])
+ if not problems_raw:
+ return
+
+ sem = asyncio.Semaphore(CONNECTIONS)
+
+ async def run_one(cpid: str) -> dict[str, Any]:
+ async with sem:
+ try:
+ problem_html = await _fetch_text(
+ client,
+ f"{BASE_URL}/index.php?page=viewproblem2&cpid={cpid}",
+ )
+ info = _parse_problem_page(problem_html)
+ except Exception:
+ info = {
+ "tests": [],
+ "timeout_ms": 4000,
+ "memory_mb": 256,
+ "interactive": False,
+ }
+
+ tests: list[TestCase] = info["tests"]
+ combined_input = "\n".join(t.input for t in tests) if tests else ""
+ combined_expected = (
+ "\n".join(t.expected for t in tests) if tests else ""
+ )
+
+ return {
+ "problem_id": cpid,
+ "combined": {
+ "input": combined_input,
+ "expected": combined_expected,
+ },
+ "tests": [
+ {"input": t.input, "expected": t.expected} for t in tests
+ ],
+ "timeout_ms": info["timeout_ms"],
+ "memory_mb": info["memory_mb"],
+ "interactive": info["interactive"],
+ "multi_test": False,
+ }
+
+ tasks = [run_one(cpid) for cpid, _ in problems_raw]
+ for coro in asyncio.as_completed(tasks):
+ payload = await coro
+ print(json.dumps(payload), flush=True)
+
+
+ async def submit(self, contest_id: str, problem_id: str, source_code: str, language_id: str, credentials: dict[str, str]) -> SubmitResult:
+ return SubmitResult(success=False, error="USACO submit not yet implemented", submission_id="", verdict="")
+
+
+if __name__ == "__main__":
+ USACOScraper().run_cli()
|