Compare commits

..

97 commits

Author SHA1 Message Date
90eedeb55f
ci: format 2026-03-06 13:21:49 -05:00
297c71e7c7
fix: replace curl_cffi with scrapling in codeforces metadata
Problem: `codeforces.py` used `curl_cffi` to bypass Cloudflare when
fetching contest problem HTML, making it unavailable in the nix python
env and requiring an extra dependency.

Solution: rewrite `_fetch_problems_html` to use scrapling
`StealthySession` with `solve_cloudflare=True`, matching the existing
CF submit pattern. Extend `needs_browser` in `scraper.lua` to route CF
`metadata` and `tests` through the FHS env on NixOS. Remove `curl-cffi`
from `pyproject.toml`, `flake.nix`, and test mocks.
2026-03-06 13:18:06 -05:00
Barrett Ruth
543480a4fe
feat: implement login and submit for USACO, Kattis, and CodeChef (#325)
## Problem

Login and submit were stub-only for USACO, Kattis, and CodeChef, leaving
three platforms without a working solve loop.

## Solution

Three commits, one per platform:

**USACO** — httpx-based login via `login-session.php` with cookie cache.
Submit fetches the problem page and parses the form dynamically (action
URL, hidden fields, language select) before POSTing multipart with
`sub_file[]`.

**Kattis** — httpx-based login via `/login/email` (official Kattis CLI
API). Submit is a multipart POST to `/submit`; the `contest` field is
included only when `contest_id != problem_id`. `scrape_contest_list`
URL updated to filter
`kattis_original=on&kattis_recycled=off&user_created=off`.

**CodeChef** — StealthySession browser-based login and submit following
the AtCoder/CF pattern. Login checks `__NEXT_DATA__` for current user
and fills the email/password form. Submit navigates to
`/{contest_id}/submit/{problem_id}`, selects language (standard
`<select>` first, custom dropdown fallback), sets code via
Monaco/CodeMirror/textarea JS, and clicks submit. Retry-on-relogin
pattern matches existing CF behaviour.

Language IDs added to `language_ids.py` for all three platforms.
2026-03-06 00:09:16 -05:00
Barrett Ruth
2d50f0a52a
refactor: remove open_url config option in favour of :CP open (#322)
## Problem

`open_url` automatically opened the browser on contest load and problem
change, which is now redundant with `:CP open`.

## Solution

Remove the `open_url` field from `cp.Config`, its default, its
validation, and the call site in `setup_contest`. Remove documentation
from `cp.nvim.txt`.
2026-03-05 22:55:43 -05:00
Barrett Ruth
401afb205e
feat(commands): document :CP open [problem|contest|standings] (#321)
forgot to document `:CP open`
2026-03-05 19:51:06 -05:00
Barrett Ruth
7bf4cf2538
feat(commands): implement :CP open [problem|contest|standings] (#319)
## Problem

There was no way to open a problem, contest, or standings page in the
browser from within the plugin.

## Solution

Add `contest_url` and `standings_url` to `MetadataResult` and persist
them in the cache. Add `cache.get_open_urls` to resolve all three URLs.
Wire up `:CP open [problem|contest|standings]` in `commands/init.lua`
to call `vim.ui.open`, warning when a URL is unavailable (e.g. CSES
has no standings). Closes #315.
2026-03-05 19:16:05 -05:00
Barrett Ruth
b959f29bd4
fix(scrapers): harden CSES and CF submit edge cases (#295) (#318)
## Problem

CSES `_web_login` used bare dict indexing on the API response, raising
an opaque `KeyError` on missing fields. `_check_token` swallowed all
exceptions as `False`, conflating transient network errors with invalid
tokens. CF persisted cookies unconditionally and silently swallowed
`_solve_turnstile` failures in `submit_action`.

## Solution

CSES API fields now use `.get()` with a descriptive `RuntimeError` on
absence. `_check_token` re-raises `httpx` network/timeout exceptions
so callers see real failures. CF cookie writes are guarded by an
`X-User-Handle` check (the CF auth cookie). `_solve_turnstile` errors
propagate to the outer error handler instead of being silenced.
2026-03-05 19:00:35 -05:00
Barrett Ruth
6c036a7b2e
fix: systematic context guards and full panel lifecycle (#317)
## Problem

The `CONTEST_ACTIONS` list in `handle_command` was a manually maintained
parallel of `ACTIONS` that had already drifted (`pick` was incorrectly
included, breaking `:CP pick` cold). `navigate_problem` only cancelled
the `'run'` panel on `next`/`prev`, leaving interactive and stress
terminals alive and in-flight `run_io_view` callbacks unguarded.
`pick` and `cache` also appeared twice in tab-completion when a contest
was active.

## Solution

Replace `CONTEST_ACTIONS` with a `requires_context` field on
`ParsedCommand`,
set at each parse-site in `parse_command` so the semantics live with the
action definition. Expand `navigate_problem` to call `cancel_io_view()`
and
dispatch `cancel_interactive`/`stress.cancel` for all panel types,
mirroring
the contest-switch logic. Filter already-present `pick` and `cache` from
the
context-conditional completion candidates.
2026-03-05 18:29:41 -05:00
Barrett Ruth
7e7e135681
fix: cancel active process on contest switch (#316)
## Problem

Switching contests while a run, interactive session, or stress test was
active left orphaned callbacks and terminal jobs alive. Running `:CP
run`
twice also let the first run's stale output overwrite the buffer.

## Solution

Replace the `io_view_running` bool in `views.lua` with a generation
counter so concurrent `run_io_view` calls self-cancel via stale-gen
checks in async callbacks. Add `cancel_io_view`, `cancel_interactive`,
and `stress.cancel` for forceful teardown, and call them in
`setup_contest` whenever `is_new_contest` is true.
2026-03-05 17:42:50 -05:00
Barrett Ruth
916c9174a6
fix(commands): canonicalize CF contest IDs and auto-restore on action (#314)
## Problem

Two gaps in `commands/init.lua`. Codeforces contest IDs were passed
through raw, so full URLs (e.g.
`https://codeforces.com/contest/1933/problem/A`) or problem IDs with
trailing letters (e.g. `1933A`) caused scraper URL construction to fail.
Separately, action commands like `:CP run` silently failed when invoked
with no active contest instead of attempting to recover from the current
file's cached state.

## Solution

Add `canonicalize_cf_contest` to normalize URLs and strip trailing
problem letters in `parse_command`. Add a guard in `handle_command` that
calls `restore_from_current_file()` before dispatching any
contest-requiring action when no platform is active, returning early
only if no cached state is found.

Closes #306. Closes #308.
2026-03-05 15:37:33 -05:00
Barrett Ruth
b3014e9c86
fix(setup): clear output buffer when switching contests (#313)
## Problem

`setup_problem` only cleared the output buffer when `old_problem_id ~=
problem_id`. If two different contests share a problem with the same ID
(e.g. both have `a`), the condition is false and stale output from the
previous contest remains visible.

## Solution

Clear the output buffer at the top of `proceed()` in `setup_contest`
whenever `is_new_contest` is true, before any problem setup runs.

Closes #303.
2026-03-05 15:31:47 -05:00
Barrett Ruth
924601ce99
fix(codeforces): persist cookies after submit regardless of name (#312)
## Problem

The submit path in `codeforces.py` guarded `cookie_cache.write_text` on
the presence of a `JSESSIONID` cookie. Codeforces does not use
`JSESSIONID`, so the cookie file was never written after submit,
breaking the fast-path on subsequent submits.

## Solution

Replace the name-specific guard with a non-empty check (`if
browser_cookies:`), matching the unconditional save already used in the
login path.

Closes #301.
2026-03-05 15:31:29 -05:00
Barrett Ruth
2c119774df
feat: validate credentials on :CP <platform> login (#310)
## Problem

`:CP <platform> login` blindly caches username/password without
server-side
validation. Bad credentials are only discovered at submit time, which is
confusing and wastes a browser session.

## Solution

Wire `:CP <platform> login` through the scraper pipeline so each
platform
actually authenticates before persisting credentials. On failure, the
user
sees an error and nothing is cached.

- CSES: reuses `_check_token` (fast path) and `_web_login`; returns API
token
  in `LoginResult.credentials` so subsequent submits skip re-auth.
- AtCoder/Codeforces: new `_login_headless` functions open a
StealthySession,
solve Turnstile/Cloudflare, fill the login form, and validate success by
  checking for the logout link. Cookies only persist on confirmed login.
- CodeChef/Kattis/USACO: return "not yet implemented" errors.
- `scraper.lua`: generalizes submit-only guards (`needs_browser` flag)
to
  cover both `submit` and `login` subcommands.
- `credentials.lua`: prompts for username/password, passes cached token
for
CSES fast path, shows ndjson status notifications, only caches on
success.
2026-03-05 15:12:09 -05:00
Barrett Ruth
a202725cc5
fix(submit): use file path over stdin; fix CF CodeMirror textarea (#305)
## Problem

After the initial submit hardening, two issues remained: source code was
read in Lua and piped as stdin to the scraper (unnecessary roundtrip
since
the file exists on disk), and CF's `page.fill()` timed out on the hidden
`textarea[name="source"]` because CodeMirror owns the editor state.

## Solution

Pass the source file path as a CLI arg instead — AtCoder calls
`page.set_input_files(file_path)` directly, CF reads it with
`Path(file_path).read_text()`. Fix CF source injection via
`page.evaluate()`
into the CodeMirror instance. Extract `BROWSER_SUBMIT_NAV_TIMEOUT` as a
per-platform `defaultdict` (CF defaults to 2× nav timeout). Save the
buffer
with `vim.cmd.update()` before submitting.
2026-03-05 14:34:14 -05:00
Barrett Ruth
127089c57f
fix(submit): harden atcoder and codeforces submit flow (#304)
## Problem

AtCoder file upload always wrote a `.cpp` temp file regardless of
language. CF submit used `solve_cloudflare=True` on the submit page,
causing a spurious "No Cloudflare challenge found" error;
`_wait_for_gate_reload` in `login_action` was dead code. Stale cookies
caused silent auth failures with no recovery path. The `uv.spawn` ndjson
path for submit had no overall timeout.

## Solution

Replace AtCoder's temp file with `page.set_input_files` using an
in-memory buffer and correct extension via `_LANGUAGE_ID_EXTENSION`.
Replace CF's temp-file/fallback dance with a direct
`textarea[name="source"]` fill and set `solve_cloudflare=False` on the
submit fetch. Add a login fast-path that skips the homepage check when
cookies exist, with automatic stale-cookie recovery via `_retried` flag
on redirect-to-login detection. Remove `_wait_for_gate_reload`. Fix
`_ensure_browser` to propagate install errors. Add a 120s kill timer to
the ndjson `uv.spawn` submit path in `scraper.lua`.
2026-03-05 11:18:34 -05:00
Barrett Ruth
6fcb5d1bbc
feat(codeforces): implement submit; cache CSES token (#300)
## Problem

Codeforces submit was a stub. CSES submit re-ran the full login flow on
every invocation (~1.5s overhead).

## Solution

**Codeforces**: headless browser submit via StealthySession (same
pattern as AtCoder). Solves Cloudflare Turnstile on login, uploads
source via file input, caches cookies at
`~/.cache/cp-nvim/codeforces-cookies.json` so repeat submits skip login.

**CSES**: persist the API token in credentials via a `credentials`
ndjson event. Subsequent submits validate the cached token with a single
GET before falling back to full login.

Also includes a vimdoc table of contents.
2026-03-05 10:37:39 -05:00
Barrett Ruth
e9f72dfbbc
feat(cses): implement submit via REST API (#299)
## Problem

CSES submit was a stub returning "not yet implemented".

## Solution

Authenticate via web login + API token bridge (POST `/login` form, then
POST `/api/login` and confirm the auth page), submit source to
`/api/courses/problemset/submissions` with base64-encoded content, and
poll for verdict. Uses the same username/password credential model as
AtCoder — no browser dependencies needed. Tested end-to-end with a real
CSES account (verdict: `ACCEPTED`).

Also updates `scraper.lua` to pass the full ndjson event object to
`on_status` and handle `credentials` events for future platform use.
2026-03-05 01:07:57 -05:00
Barrett Ruth
e674265527
fix(setup): prevent spurious swap file warnings on :CP (#297)
## Problem

`setup_problem` explicitly set `swapfile = true` on provisional buffers,
overriding the user's global `noswapfile` setting. The resulting `.swp`
files triggered E325 warnings on subsequent `:e` calls — especially
during the restore path, which redundantly re-opened the current buffer.

## Solution

Remove the `swapfile` override so the user's setting is respected, and
skip the `:e` call in `setup_problem` when the current buffer already
matches the target source file.
2026-03-05 00:37:29 -05:00
Barrett Ruth
c194f12eee
feat(atcoder): extract submit helpers; add live status notifications (#294)
## Problem

`_submit_sync` was a 170-line nested closure with `_solve_turnstile` and
the browser-install block further nested inside it. Status events went
to
stderr, which `run_scraper()` silently discards, leaving the user with a
10–30s silent hang after credential entry. The NDJSON spawn path also
lacked stdin support, so submit had no streaming path at all.

## Solution

Extract `_TURNSTILE_JS`, `_solve_turnstile`, `_ensure_browser`, and
`_submit_headless` to module level in `atcoder.py`; status events
(`installing_browser`, `checking_login`, `logging_in`, `submitting`) now
print to stdout as NDJSON. Add stdin pipe support to the NDJSON spawn
path in `scraper.lua` and switch `M.submit` to streaming with an
`on_status` callback. Wire `on_status` in `submit.lua` to fire
`vim.notify` for each phase transition.
2026-03-04 19:27:29 -05:00
Barrett Ruth
1bc0aa41b6
refactor(cache): nest credentials under platform namespace (#293)
## Problem

Credentials lived in a top-level `_credentials` namespace, requiring
special
preservation logic in `clear_all()` and a separate key hierarchy from
the
platform data they belong to.

## Solution

Move credentials from `_credentials.<platform>` to
`<platform>._credentials`.
Migrate v1 caches on load, skip underscore-prefixed keys when
enumerating
contest IDs and summaries, and simplify `clear_all()` now that no
special
preservation is needed.

Stacked on #292.
2026-03-04 13:37:22 -05:00
Barrett Ruth
49e0ae3885
refactor(credentials): promote login/logout to top-level actions (#292)
## Problem

`:CP credentials login/logout/clear` is verbose and inconsistent with
other
actions that are all top-level (`:CP run`, `:CP submit`, etc.). The
clear-all
subcommand is also unnecessary since re-logging in overwrites existing
credentials.

## Solution

Replace `:CP credentials {login,logout,clear}` with `:CP login
[platform]`
and `:CP logout [platform]`. Remove the clear-all command and the
credentials
subcommand dispatch — login/logout are now regular actions routed
through the
standard action dispatcher.
2026-03-04 13:09:32 -05:00
Barrett Ruth
98ac0aa7a7
refactor(credentials): rename set/clear to login/logout/clear (#291)
## Problem

The `set` and `clear` subcommands don't clearly convey their intent —
`set`
reads like a generic setter rather than an auth action, and `clear`
overloads
single-platform and all-platform semantics in one subcommand.

## Solution

Rename `set` to `login`, split `clear` into `logout` (per-platform,
defaults
to active) and `clear` (all platforms).

New API:
- `:CP credentials login [platform]` — prompt and save credentials
- `:CP credentials logout [platform]` — remove credentials for one
platform
- `:CP credentials clear` — remove all stored credentials
2026-03-04 12:53:37 -05:00
Barrett Ruth
18a60da2d8
misc (#290)
fix atcoder :CP logins
propagate scraper error codes
2026-03-04 12:47:48 -05:00
baaaa95b27 ci: format 2026-03-04 00:50:21 -05:00
900fd70935 fix(edit): clean up buffers on close and support :w to save
Problem: closing the test editor left cp://test-N-* buffers alive,
causing E95 on reopen. The nofile buftype also rejected :w, which
was counterintuitive in an editable grid.

Solution: delete all test buffers in toggle_edit teardown. Switch
buftype to acwrite with a BufWriteCmd autocmd that persists test
cases and clears the modified flag. Hoist save_all_tests above
setup_keybindings so the autocmd closure can reference it.
2026-03-04 00:50:21 -05:00
f17eb32e8c fix: pass in index to :CP panel <n> 2026-03-04 00:30:39 -05:00
4f88b19a82 refactor(run): remove I/O view test navigation keymaps
Problem: <c-n>/<c-p> in the I/O view buffers required the cursor
to leave the source file to work, re-ran the solution on each
press, and gave no indication of which test was active. The
workflow is better served by :CP run <n> for a specific test or
:CP panel for full inspection.

Solution: remove navigate_test, next_test_key/prev_test_key config
options, and the associated current_test_index state field.
2026-03-04 00:26:22 -05:00
217476f5f3 fix(scraper): coerce vim.NIL precision to nil before cache write
Problem: vim.json.decode maps JSON null to vim.NIL (userdata), but
cache.set_test_cases validates precision as number|nil, causing a
type error on every scrape where precision is absent.

Solution: guard the precision field when building the callback
table, converting vim.NIL to nil.
2026-03-04 00:26:22 -05:00
488260f769 ci: format 2026-03-03 16:46:07 -05:00
a04702d87c refactor: replace :CP login with :CP credentials subcommand
Problem: :CP login was a poor API — no way to clear credentials without
raw Lua, and the single command didn't scale to multiple operations.

Solution: replace login with a :CP credentials subcommand following the
same pattern as :CP cache. :CP credentials set [platform] prompts and
saves; :CP credentials clear [platform] removes one or all platforms.
Add cache.clear_credentials(), rename login.lua to credentials.lua,
update parse/dispatch/tab-complete, and rewrite vimdoc accordingly.
2026-03-03 16:46:07 -05:00
3e0b7beabf feat: add :CP login command for explicit credential management
Problem: credentials were only set implicitly on first :CP submit.
There was no way to update wrong credentials, log out, or set
credentials ahead of time without editing the cache JSON manually.

Solution: add :CP login [platform] which always prompts for username
and password and overwrites any saved credentials for that platform.
Omitting the platform falls back to the active platform. Wire the
command through constants, parse_command, handle_command, and add
tab-completion (suggests platform names). Document in vimdoc under
the SUBMIT section and in the commands reference.
2026-03-03 16:28:54 -05:00
a08d1f0c5e refactor(submit): consolidate credentials into main cache file
Problem: credentials were stored in a separate file,
cp-nvim-credentials.json, alongside the main cp-nvim.json cache.
Two files for one plugin's persistent state was unnecessary.

Solution: add get_credentials/set_credentials to cache.lua, storing
credentials under _credentials[platform] in the shared cache. Update
clear_all() to preserve _credentials across cache wipes. Remove the
separate file, load_credentials, and save_credentials from submit.lua.
2026-03-03 16:24:12 -05:00
52cf54d05c docs: document new platforms, commands, and features
Problem: vimdoc only covered AtCoder, Codeforces, and CSES, and had no
entries for race, stress, or submit — all of which shipped in this
branch. The platform list was also stale and the workflow example
pointed users to the AtCoder website to submit manually.

Solution: add CodeChef, USACO, and Kattis to the supported platforms
list and platform-specific usage section (including Kattis's
dual single-problem/full-contest behavior). Document :CP stress,
:CP race, and :CP submit in the commands section, add their <Plug>
mappings, and add dedicated STRESS TESTING, RACE, and SUBMIT sections.
Update get_active_panel() to list its return values, add the
cp.race.status() API under the statusline section, and update the
workflow example step 8 to use :CP submit.
2026-03-03 16:02:09 -05:00
7e48ba05cf feat(kattis): rewrite scraper to support real contests
Problem: scrape_contest_list paginated the entire Kattis problem database
(3000+ problems) treating each as a "contest". scrape_contest_metadata
only handled single-problem access. stream_tests_for_category_async could
not fetch tests for multiple problems in a real contest.

Solution: replace the paginated problem loop with a single GET to
/contests that returns ~150 real timed contests. Add contest-aware path
to scrape_contest_metadata that fetches /contests/{id}/problems and
returns all problem slugs; fall back to single-problem path when the ID
is not a contest. Add _stream_single_problem helper and update
stream_tests_for_category_async to fan out concurrently over all contest
problem slugs before falling back to the single-problem path.
2026-03-03 16:02:09 -05:00
e79f992e0b fix: resolve lua typecheck warnings in race and scraper
Problem: luals flagged undefined-field on uv timer methods because
race_state.timer was untyped, and undefined-field on env_extra/stdin
because they were missing from the run_scraper opts annotation.

Solution: hoist race_state.timer into a typed local before the nil
check so luals can narrow through it; add env_extra and stdin to the
opts inline type in run_scraper.
2026-03-03 15:09:41 -05:00
de5a20c567 fix: resolve typecheck errors in cache, atcoder, cses, and usaco
Problem: lua typecheck flagged missing start_time field on ContestSummary;
ty flagged BeautifulSoup Tag/NavigableString union on csrf_input.get(),
a 3-tuple unpack where _extract_problem_info now returns 4 values in
cses.py, and an untyped list assignment in usaco.py.

Solution: add start_time? to ContestSummary LuaDoc, guard csrf_input
with hasattr check and type: ignore, unpack precision from
_extract_problem_info in cses.py callers, and use cast() in usaco.py.
2026-03-03 15:09:41 -05:00
bad219e578 ci: format 2026-03-03 15:09:41 -05:00
ad90d564ca fix(views): fix interactive guard logic and add stress panel support
Problem: toggle_interactive() had its condition inverted — it blocked
:CP interact on non-interactive problems while showing the message "This
problem is interactive", and passed through on interactive ones. The
panel guard in toggle_panel() was also missing a nil-check on
contest_data.index_map, which could crash if the index map was absent.

Solution: invert the toggle_interactive() guard to match the symmetrical
pattern in toggle_view(), fix the error message to say "not interactive",
and add the missing index_map guard. Also handle the stress panel type
in M.disable() so :CP stress can be toggled off.
2026-03-03 15:09:41 -05:00
bfa2cf893c feat: wire race, stress, and submit commands and keymaps
Add command parsing and dispatch for :CP race, :CP race stop, :CP stress,
and :CP submit. Add tab-completion for race (platform/contest/--lang),
stress (cwd executables at arg 2 and 3), and race stop. Add
<Plug>(cp-stress), <Plug>(cp-submit), and <Plug>(cp-race-stop) keymaps.
2026-03-03 15:09:41 -05:00
a75694e9e0 feat(submit): add solution submission UI
Add submit.lua that reads credentials from a local JSON store (prompting
via vim.ui.input/inputsecret on first use), reads the source file, and
delegates to scraper.submit(). Add language_ids.py with platform-to-
language-ID mappings for atcoder, codeforces, and cses.
2026-03-03 15:09:41 -05:00
39b7b3d83f feat(stress): add stress test loop
Add stress.lua that auto-detects or accepts generator and brute solution
files, compiles C++ if needed, and launches scripts/stress.py in a
terminal buffer with session save/restore and cleanup autocmds.

Add scripts/stress.py as a standalone loop that runs generator → brute →
candidate, comparing outputs and exiting on the first mismatch.
2026-03-03 15:09:41 -05:00
f5c1b978a2 feat(race): add contest countdown timer
Add race.lua with a 1-second vim.uv timer that counts down to a contest
start time and auto-calls setup.setup_contest() at T=0. Exposes
M.start(), M.stop(), and M.status() for command dispatch and statusline
integration.
2026-03-03 15:09:41 -05:00
4e8da84882 feat(platforms): add kattis and usaco scrapers
Add KattisScraper and USACOScraper with contest list, metadata, and
test case fetching. Register kattis and usaco in PLATFORMS,
PLATFORM_DISPLAY_NAMES, and default platform configs.
2026-03-03 15:09:41 -05:00
90bd13580b feat(scraper): add precision extraction, start_time, and submit support
Problem: problem pages contain floating-point precision requirements and
contest start timestamps that were not being extracted or stored. The
submit workflow also needed a foundation in the scraper layer.

Solution: add extract_precision() to base.py and propagate through all
scrapers into cache. Add start_time to ContestSummary and extract it
from AtCoder and Codeforces. Add SubmitResult model, abstract submit()
method, submit CLI case with get_language_id() resolution, stdin/env_extra
support in run_scraper, and a full AtCoder submit implementation; stub
the remaining platforms.
2026-03-03 15:09:41 -05:00
865e3b5928 refactor: rename epsilon to precision in runner
Problem: the tolerance field for floating-point comparison was named
`epsilon`, which is an implementation detail, not the user-visible concept.

Solution: rename to `precision` in run.lua type annotations, internal
variables, and comparison logic.
2026-03-03 15:09:41 -05:00
dc9cb10f3a doc: update 2026-03-03 00:54:13 -05:00
72ea6249f4 ci: format 2026-03-03 00:46:59 -05:00
0e88e0f182 fix(utils): skip uv python setup on NixOS
Problem: uv downloads glibc-linked Python binaries that NixOS cannot
run, causing setup_python_env to fail with exit status 127.

Solution: detect NixOS via /etc/NIXOS and bypass the uv sync path,
falling through directly to nix-based Python discovery.
2026-03-03 00:46:59 -05:00
add022af8c refactor(hooks): replace flat hooks API with setup/on namespaces
Problem: the hooks API conflated distinct lifecycle scopes under a flat
table with inconsistent naming (setup_code, before_run, setup_io_input),
making it hard to reason about when each hook fires.

Solution: introduce two namespaces — hooks.setup.{contest,code,io} for
one-time initialization and hooks.on.{enter,run,debug} for recurring
events. hooks.setup.contest fires once when a contest dir is newly
created; hooks.on.enter is registered as a buffer-scoped BufEnter
autocmd and fires immediately after setup.code. The provisional buffer
setup_code callsite is removed as it ran on an unresolved temp buffer.
2026-03-03 00:46:59 -05:00
6a395af98f feat(config): add templates.cursor_marker for post-template cursor placement
Problem: after apply_template writes a file's content to the buffer,
cursor positioning was left entirely to the user's setup_code hook,
forcing everyone to reimplement the same placeholder-stripping logic.

Solution: add an optional templates.cursor_marker config key. When set,
apply_template scans the written lines for the marker, strips it, and
positions the cursor there via bufwinid so it works in both the
provisional and existing-file paths.
2026-03-03 00:46:59 -05:00
d3324aafa3 fix(config): propagate template through platform overrides
Problem: CpPlatformOverrides lacked a template field and merge_lang()
never copied ov.template into the effective language config, so
per-platform template overrides were silently dropped.

Solution: add template? to CpPlatformOverrides and forward it in
merge_lang(), matching how extension is handled.
2026-03-03 00:46:59 -05:00
24b088e8e9 style(runner): add luacats to compare_outputs 2026-02-26 23:02:40 -05:00
e685a8089f feat: add epsilon tolerance for floating-point output comparison
Problem: output comparison used exact string equality after whitespace
normalisation, causing correct solutions to fail on problems where
floating-point answers are accepted within a tolerance (e.g. 1e-6).

Solution: add an optional ui.panel.epsilon config value. When set,
actual and expected output are compared token-by-token: numeric tokens
are compared with math.abs(a - b) <= epsilon, non-numeric tokens fall
back to exact string equality. Per-problem epsilon can also be stored
in the cache and takes precedence over the global default.
2026-02-26 23:00:35 -05:00
84d12758c2 style(setup): apply stylua formatting 2026-02-26 22:57:39 -05:00
2c25ec616a feat: add per-language template file support
Problem: new solution files were always created empty, requiring users
to manually paste boilerplate or rely on editor snippets that fire
outside cp.nvim's control.

Solution: add an optional template field to the language config. When
set to a file path, its contents are written into every newly created
solution buffer before the setup_code hook runs. Existing files are
never overwritten.
2026-02-26 22:57:39 -05:00
ce5648f9cf docs: add statusline integration recipes
Problem: cp.nvim exposed no documentation showing how to integrate its
runtime state into a statusline. Users had to discover the state module
API by reading source.

Solution: add a STATUSLINE INTEGRATION section to the vimdoc with a
state API reference and recipes for vanilla statusline, lualine, and
heirline. Also anchors the *cp.State* help tag referenced in prose
elsewhere in the doc.
2026-02-26 22:56:36 -05:00
585cf2a077 feat(runner): run test cases in parallel
Problem: test cases were executed sequentially, each waiting for the
previous process to finish before starting the next. On problems with
many test cases this meant wall-clock run time scaled linearly.

Solution: fan out all test case processes simultaneously. A remaining
counter fires on_done once all callbacks have returned. on_each is
called per completion as before; callers that pass on_each ignore its
arguments so the index semantics change is non-breaking.
2026-02-26 22:55:50 -05:00
81f5273840 chore: convert .luarc.json to nested format and add busted library
Problem: .luarc.json used the flat dotted-key format which is not the
canonical LuaLS schema. The busted library was also missing, so LuaLS
could not resolve types in test files.

Solution: rewrite .luarc.json using nested objects and add
${3rd}/busted/library to workspace.library.
2026-02-26 22:47:05 -05:00
d274e0c117 fix(cache): replace stale M._cache field with get_raw_cache accessor
Problem: M._cache = cache_data captured the initial empty table reference
at module load time. After M.load() reassigns cache_data to the decoded
JSON, M._cache is permanently stale and returns the wrong table.

Solution: remove the field assignment and expose get_raw_cache() which
closes over cache_data and always returns the current table.
2026-02-26 22:46:06 -05:00
3cb872a65f fix: replace deprecated vim.loop with vim.uv
Problem: vim.loop is deprecated since Neovim 0.10 in favour of vim.uv.
Five call sites across scraper.lua, setup.lua, utils.lua, and health.lua
still referenced the old alias.

Solution: replace every vim.loop reference with vim.uv directly.
2026-02-26 22:45:07 -05:00
4ccab9ee1f
fix(config): add bit to ignored filetypes 2026-02-26 19:09:16 -05:00
48c08825b2
ci: add missing packages 2026-02-23 18:16:22 -05:00
4d58db8520
ci: nix config migration 2026-02-23 18:04:17 -05:00
591f70a237
build(flake): add lua-language-server to devShell
Problem: lua-language-server is not available in the dev shell, making
it impossible to run local type-checking diagnostics.

Solution: add lua-language-server to the devShell packages.
2026-02-23 17:37:47 -05:00
Harivansh Rathi
e989897c77 style(lua): format URL-open condition in setup
Co-authored-by: Codex <noreply@openai.com>
2026-02-22 22:19:51 -05:00
Harivansh Rathi
1c31abe3d6 fix(open_url): open problem URL when switching problems
Also fix contest-change detection so URL open logic triggers when either platform or contest changes. This makes :CP next/:CP prev and problem jumps open the correct page when open_url is enabled.

Co-authored-by: Codex <noreply@openai.com>
2026-02-22 22:19:51 -05:00
d3ac300ea0 fix(cache): remove unused logger import
Problem: the logger import became unused after replacing the error log
with a silent cache wipe.

Solution: drop the require.
2026-02-22 22:19:51 -05:00
db5bd791f9 fix(cache): invalidate stale cache on version mismatch
Problem: after an install or update, the on-disk cache may contain data
written by an older version of the plugin whose format no longer matches
what the current code expects.

Solution: embed a CACHE_VERSION in every saved cache file. On load, if
the stored version is missing or differs from the current one, wipe the
cache and rewrite it. Corrupt (non-decodable) cache files are handled
the same way instead of only logging an error.
2026-02-22 22:19:51 -05:00
Harivansh Rathi
9fc34cb6fd chore(scraper): add LuaCATS types for env helper
Add LuaCATS annotations to the env conversion helper and drop the table.sort call since ordering is not required by uv.spawn.

Co-authored-by: Codex <noreply@openai.com>
2026-02-22 12:06:01 -05:00
Harivansh Rathi
484a4a56d0 fix(scraper): pass uv.spawn env as KEY=VALUE list
Neovim/libuv spawn expects env as a list of KEY=VALUE strings. Passing the map from vim.fn.environ() can fail process startup with ENOENT, which breaks NDJSON test scraping and surfaces as 'Failed to start scraper process'.\n\nConvert env map to a deterministic list before uv.spawn in the NDJSON scraper path.

Co-authored-by: Codex <noreply@openai.com>
2026-02-22 12:06:01 -05:00
ff5ba39a59 docs: fix dependencies section in readme
Problem: time and timeout were listed as optional dependencies despite
being required for plugin initialization. nix was not mentioned as an
alternative to uv for the Python scraping environment.

Solution: rename section to "Dependencies", list time/timeout first,
and add nix as an alternative to uv for scraping.
2026-02-21 23:59:40 -05:00
760e7d7731 fix(ci): format 2026-02-20 17:49:34 -05:00
49e4233b3f fix: decouple python env setup from config init
Problem: setup_python_env() is called from check_required_runtime()
during config.setup(), which runs on the very first :CP command. The
uv sync and nix build calls use vim.system():wait(), blocking the
Neovim event loop. During the block the UI is frozen and
vim.schedule-based log messages never render, so the user sees an
unresponsive editor with no feedback.

Solution: remove setup_python_env() from check_required_runtime() so
config init is instant. Call it lazily from run_scraper() instead,
only when a scraper subprocess is actually needed. Use vim.notify +
vim.cmd.redraw() before blocking calls so the notification renders
immediately via a forced screen repaint, rather than being queued
behind vim.schedule.
2026-02-18 17:49:04 -05:00
622620f6d0 feat: add debug logging to python env, scraper, and runner
Problem: with debug = true, there is not enough diagnostic output to
troubleshoot environment or execution issues. The resolved python path,
scraper commands, and compile/run shell commands are not logged.

Solution: add logger.log calls at key decision points: python env
resolution (nix vs uv vs discovery), uv sync stderr output, scraper
subprocess commands, and compile/run shell strings. All gated behind
the existing debug flag so they only appear when debug = true.
2026-02-18 17:40:06 -05:00
976838d981 fix: always run uv sync to recover from partial installs
Problem: setup_python_env() skips uv sync when .venv/ exists. If a
previous sync was interrupted (e.g. network timeout), the directory
exists but is broken, and every subsequent session silently uses a
corrupt environment.

Solution: remove the isdirectory guard and always run uv sync. It is
idempotent and near-instant when dependencies are already installed,
so the only cost is one subprocess call per session.
2026-02-18 17:32:12 -05:00
06f72bbe2b fix: only show user-configured platforms in picker
Problem: tbl_deep_extend merges user platforms on top of defaults, so
all four default platforms survive even when the user only configures a
subset. The picker then shows platforms the user never intended to use.

Solution: before the deep merge, prune any default platform not present
in the user's platforms table. This preserves per-platform default
filling (the user doesn't have to re-specify every field) while ensuring
only explicitly configured platforms appear.
2026-02-18 17:29:41 -05:00
6045042dfb fix: surface runtime check failures as clean notifications
Problem: when required dependencies (GNU time/timeout, Python env) are
missing, config.setup() throws a raw error() that surfaces as a Lua
traceback. On macOS without coreutils the message is also redundant
("GNU time not found: GNU time not found") and offers no install hint.

Solution: wrap config.setup() in pcall inside ensure_initialized(),
strip the Lua source-location prefix, and emit a vim.notify at ERROR
level. Add Darwin-specific install guidance to the GNU time/timeout
not-found messages. Pass capability reasons directly instead of
wrapping them in a redundant outer message.
2026-02-18 17:25:50 -05:00
c192afc5d7 fix(ci): format 2026-02-18 14:13:37 -05:00
b6f3398bbc fix(ci): formatting and typing 2026-02-18 14:13:37 -05:00
e02a29bd40 fix(ci): remove duplicate workflows 2026-02-18 14:13:37 -05:00
0f9715298e fix(ci): remove deprecated setups 2026-02-18 14:13:37 -05:00
2148d9bd07 feat(nix): add health 2026-02-18 14:13:37 -05:00
1162e7046b try to fix the setup 2026-02-18 14:13:37 -05:00
b36ffba63a feat(nix): initial flake config; 2026-02-18 14:13:37 -05:00
04d0c124cf
fix: remove flake config 2026-02-17 21:11:11 -05:00
da433068ef
remove straggler file 2026-02-17 21:10:56 -05:00
51504b0121
fix: flake config; 2026-02-17 21:10:29 -05:00
49df7e015d docs: add setup section and reorder vimdoc
Problem: the vimdoc had no setup section, and configuration was buried
after commands and mappings.

Solution: add a cp-setup section with lazy.nvim example and move both
setup and configuration above commands for better discoverability.
2026-02-17 21:09:58 -05:00
029ea125b9 feat: add <Plug> mappings for all primary actions
Problem: users who want keybindings must call vim.cmd('CP run') or
reach into internal Lua modules directly. There is no stable,
discoverable, lazy-load-friendly public API for key binding.

Solution: define 7 <Plug> mappings in plugin/cp.lua that dispatch
through the same handle_command() code path as :CP. Document them
in a new MAPPINGS section in the vimdoc with helptags and an example
config block.
2026-02-07 13:23:45 -05:00
Barrett Ruth
43193c3762
Merge pull request #239 from barrettruth/refactor/remove-cp-config-compat
Some checks are pending
luarocks / ci (push) Waiting to run
luarocks / publish (push) Blocked by required conditions
refactor: remove vim.g.cp_config compatibility shim
2026-02-06 16:42:41 -05:00
de2bc07532 refactor: remove vim.g.cp_config compatibility shim
Problem: the deprecated vim.g.cp_config fallback was kept for
backwards compatibility after the rename to vim.g.cp in v0.7.6.

Solution: drop the shim entirely and update the setup() deprecation
target to v0.7.7.
2026-02-06 16:40:39 -05:00
Barrett Ruth
041e09ac04
Merge pull request #238 from barrettruth/fix/setup-code-hook-language
fix(setup): set language state before setup_code hook on first open
2026-02-06 16:38:41 -05:00
d23b4e59d1 fix(setup): set language state before setup_code hook on first open
Problem: when opening a contest for the first time (metadata not
cached), the setup_code hook fired before state.set_language() was
called, causing state.get_language() to return nil inside the hook.

Solution: call state.set_language(lang) before the hook in the
provisional-buffer branch of setup_contest(). The value is already
computed at that point and is identical to what setup_problem() sets
later, so the early write is idempotent.
2026-02-06 16:29:46 -05:00
Barrett Ruth
19e71ac7fa
Merge pull request #237 from barrettruth/feat/vim-g-update
Some checks are pending
luarocks / ci (push) Waiting to run
luarocks / publish (push) Blocked by required conditions
refactor: rename `vim.g.cp_config` to `vim.g.cp`
2026-02-06 16:07:03 -05:00
a54a06f939 refactor: rename vim.g.cp_config to vim.g.cp 2026-02-06 15:16:21 -05:00
Barrett Ruth
b2c7f16890
Merge pull request #234 from barrettruth/fix/deprecation-warning
Some checks are pending
luarocks / ci (push) Waiting to run
luarocks / publish (push) Blocked by required conditions
fix: add deprecation warning for setup()
2026-02-03 21:51:19 -05:00
276241447c fix: add deprecation warning for setup()
Some checks are pending
luarocks / ci (push) Waiting to run
luarocks / publish (push) Blocked by required conditions
2026-02-03 21:46:47 -05:00
55 changed files with 5466 additions and 1984 deletions

3
.envrc
View file

@ -1,3 +0,0 @@
VIRTUAL_ENV="$PWD/.venv"
PATH_add "$VIRTUAL_ENV/bin"
export VIRTUAL_ENV

View file

@ -1,112 +0,0 @@
name: ci
on:
workflow_call:
pull_request:
branches: [main]
push:
branches: [main]
jobs:
changes:
runs-on: ubuntu-latest
outputs:
lua: ${{ steps.changes.outputs.lua }}
python: ${{ steps.changes.outputs.python }}
steps:
- uses: actions/checkout@v4
- uses: dorny/paths-filter@v3
id: changes
with:
filters: |
lua:
- 'lua/**'
- 'spec/**'
- 'plugin/**'
- 'after/**'
- 'ftdetect/**'
- '*.lua'
- '.luarc.json'
- 'stylua.toml'
- 'selene.toml'
python:
- 'scripts/**'
- 'scrapers/**'
- 'tests/**'
- 'pyproject.toml'
- 'uv.lock'
lua-format:
runs-on: ubuntu-latest
needs: changes
if: ${{ needs.changes.outputs.lua == 'true' }}
steps:
- uses: actions/checkout@v4
- uses: JohnnyMorganz/stylua-action@v4
with:
token: ${{ secrets.GITHUB_TOKEN }}
version: 2.1.0
args: --check .
lua-lint:
runs-on: ubuntu-latest
needs: changes
if: ${{ needs.changes.outputs.lua == 'true' }}
steps:
- uses: actions/checkout@v4
- uses: NTBBloodbath/selene-action@v1.0.0
with:
token: ${{ secrets.GITHUB_TOKEN }}
args: --display-style quiet .
lua-typecheck:
runs-on: ubuntu-latest
needs: changes
if: ${{ needs.changes.outputs.lua == 'true' }}
steps:
- uses: actions/checkout@v4
- uses: mrcjkb/lua-typecheck-action@v0
with:
checklevel: Warning
directories: lua
configpath: .luarc.json
python-format:
runs-on: ubuntu-latest
needs: changes
if: ${{ needs.changes.outputs.python == 'true' }}
steps:
- uses: actions/checkout@v4
- uses: astral-sh/setup-uv@v4
- run: uv tool install ruff
- run: ruff format --check .
python-lint:
runs-on: ubuntu-latest
needs: changes
if: ${{ needs.changes.outputs.python == 'true' }}
steps:
- uses: actions/checkout@v4
- uses: astral-sh/setup-uv@v4
- run: uv tool install ruff
- run: ruff check .
python-typecheck:
runs-on: ubuntu-latest
needs: changes
if: ${{ needs.changes.outputs.python == 'true' }}
steps:
- uses: actions/checkout@v4
- uses: astral-sh/setup-uv@v4
- run: uv sync --dev
- run: uvx ty check .
python-test:
runs-on: ubuntu-latest
needs: changes
if: ${{ needs.changes.outputs.python == 'true' }}
steps:
- uses: actions/checkout@v4
- uses: astral-sh/setup-uv@v4
- run: uv sync --dev
- run: uv run camoufox fetch
- run: uv run pytest tests/ -v

View file

@ -28,6 +28,7 @@ jobs:
- '*.lua'
- '.luarc.json'
- '*.toml'
- 'vim.yaml'
python:
- 'scripts/**/.py'
- 'scrapers/**/*.py'
@ -45,11 +46,8 @@ jobs:
if: ${{ needs.changes.outputs.lua == 'true' }}
steps:
- uses: actions/checkout@v4
- uses: JohnnyMorganz/stylua-action@v4
with:
token: ${{ secrets.GITHUB_TOKEN }}
version: 2.1.0
args: --check .
- uses: cachix/install-nix-action@v31
- run: nix develop --command stylua --check .
lua-lint:
name: Lua Lint Check
@ -58,11 +56,8 @@ jobs:
if: ${{ needs.changes.outputs.lua == 'true' }}
steps:
- uses: actions/checkout@v4
- name: Lint with Selene
uses: NTBBloodbath/selene-action@v1.0.0
with:
token: ${{ secrets.GITHUB_TOKEN }}
args: --display-style quiet .
- uses: cachix/install-nix-action@v31
- run: nix develop --command selene --display-style quiet .
lua-typecheck:
name: Lua Type Check
@ -127,15 +122,5 @@ jobs:
if: ${{ needs.changes.outputs.markdown == 'true' }}
steps:
- uses: actions/checkout@v4
- name: Setup pnpm
uses: pnpm/action-setup@v4
with:
version: 8
- name: Setup Node.js
uses: actions/setup-node@v4
with:
node-version: '20'
- name: Install prettier
run: pnpm add -g prettier@3.1.0
- name: Check markdown formatting with prettier
run: prettier --check .
- uses: cachix/install-nix-action@v31
- run: nix develop --command prettier --check .

View file

@ -44,9 +44,7 @@ jobs:
- uses: actions/checkout@v4
- name: Install uv
uses: astral-sh/setup-uv@v4
- name: Install dependencies with pytest
- name: Install dependencies
run: uv sync --dev
- name: Fetch camoufox data
run: uv run camoufox fetch
- name: Run Python tests
run: uv run pytest tests/ -v

3
.gitignore vendored
View file

@ -14,3 +14,6 @@ __pycache__
.claude/
node_modules/
.envrc
.direnv/

View file

@ -1,8 +1,21 @@
{
"runtime.version": "Lua 5.1",
"runtime.path": ["lua/?.lua", "lua/?/init.lua"],
"diagnostics.globals": ["vim"],
"workspace.library": ["$VIMRUNTIME/lua", "${3rd}/luv/library"],
"workspace.checkThirdParty": false,
"completion.callSnippet": "Replace"
"runtime": {
"version": "LuaJIT",
"path": ["lua/?.lua", "lua/?/init.lua"]
},
"diagnostics": {
"globals": ["vim"]
},
"workspace": {
"library": [
"$VIMRUNTIME/lua",
"${3rd}/luv/library",
"${3rd}/busted/library"
],
"checkThirdParty": false,
"ignoreDir": [".direnv"]
},
"completion": {
"callSnippet": "Replace"
}
}

1
.styluaignore Normal file
View file

@ -0,0 +1 @@
.direnv/

View file

@ -28,11 +28,12 @@ Install using your package manager of choice or via
luarocks install cp.nvim
```
## Optional Dependencies
## Dependencies
- [uv](https://docs.astral.sh/uv/) for problem scraping
- GNU [time](https://www.gnu.org/software/time/) and
[timeout](https://www.gnu.org/software/coreutils/manual/html_node/timeout-invocation.html)
- [uv](https://docs.astral.sh/uv/) or [nix](https://nixos.org/) for problem
scraping
## Quick Start

View file

@ -3,13 +3,45 @@
Author: Barrett Ruth <br.barrettruth@gmail.com>
License: Same terms as Vim itself (see |license|)
==============================================================================
CONTENTS *cp-contents*
1. Introduction .................................................. |cp.nvim|
2. Requirements ........................................ |cp-requirements|
3. Setup ........................................................ |cp-setup|
4. Configuration ................................................ |cp-config|
5. Commands .................................................. |cp-commands|
6. Mappings .................................................. |cp-mappings|
7. Language Selection .................................. |cp-lang-selection|
8. Workflow .................................................. |cp-workflow|
9. Workflow Example ............................................ |cp-example|
10. Verdict Formatting ................................. |cp-verdict-format|
11. Picker Integration .......................................... |cp-picker|
12. Picker Keymaps ........................................ |cp-picker-keys|
13. Panel ........................................................ |cp-panel|
14. Interactive Mode .......................................... |cp-interact|
15. Stress Testing .............................................. |cp-stress|
16. Race .......................................................... |cp-race|
17. Credentials ............................................ |cp-credentials|
18. Submit ...................................................... |cp-submit|
19. Open ......................................................... |cp-open|
20. ANSI Colors ................................................... |cp-ansi|
21. Highlight Groups ........................................ |cp-highlights|
22. Terminal Colors .................................... |cp-terminal-colors|
23. Highlight Customization .......................... |cp-highlight-custom|
24. Helpers .................................................... |cp-helpers|
25. Statusline Integration .................................. |cp-statusline|
26. Panel Keymaps .......................................... |cp-panel-keys|
27. File Structure ................................................ |cp-files|
28. Health Check ................................................ |cp-health|
==============================================================================
INTRODUCTION *cp.nvim*
cp.nvim is a competitive programming plugin that automates problem setup,
compilation, and testing workflow for online judges.
Supported platforms (for now!): AtCoder, Codeforces, CSES
Supported platforms: AtCoder, CodeChef, Codeforces, CSES, Kattis, USACO
==============================================================================
REQUIREMENTS *cp-requirements*
@ -19,195 +51,20 @@ REQUIREMENTS *cp-requirements*
- uv package manager (https://docs.astral.sh/uv/)
==============================================================================
COMMANDS *cp-commands*
SETUP *cp-setup*
:CP *:CP*
cp.nvim uses a single :CP command with intelligent argument parsing:
Setup Commands ~
:CP {platform} {contest_id} [--lang {language}]
Full setup: set platform and load contest metadata.
Scrapes test cases and creates source file.
--lang: Use specific language (default: platform default)
Examples: >
:CP codeforces 1933
:CP codeforces 1933 --lang python
<
View Commands ~
:CP run [all|n|n,m,...] [--debug]
Run tests in I/O view (see |cp-io-view|).
Lightweight split showing test verdicts.
Execution modes:
• :CP run Combined: single execution with all tests
(auto-switches to individual when multiple samples)
• :CP run all Individual: N separate executions
• :CP run n Individual: run test n only
• :CP run n,m,... Individual: run specific tests (e.g. nth and mth)
--debug: Use debug build (builds to build/<name>.dbg)
Combined mode runs all test inputs in one execution (matching
platform behavior for multi-test problems). When a problem has
multiple independent sample test cases, :CP run auto-switches to
individual mode to run each sample separately.
Examples: >
:CP run " Combined: all tests, one execution
:CP run all " Individual: all tests, N executions
:CP run 2 " Individual: test 2 only
:CP run 1,3,5 " Individual: tests 1, 3, and 5
:CP run all --debug " Individual with debug build
<
:CP panel [--debug] [n]
Open full-screen test panel (see |cp-panel|).
Aggregate table with diff modes for detailed analysis.
Optional [n] focuses on specific test.
--debug: Use debug build (with sanitizers, etc.)
Examples: >
:CP panel " All tests
:CP panel --debug 3 " Test 3, debug build
<
:CP pick [--lang {language}]
Launch configured picker for interactive
platform/contest selection.
--lang: Pre-select language for chosen contest.
Example: >
:CP pick
:CP pick --lang python
<
:CP interact [script]
Open an interactive terminal for the current problem.
If an executable interactor is provided, runs the compiled
binary against the source file (see
*cp-interact*). Otherwise, runs the source
file. Only valid for interactive problems.
Navigation Commands ~
:CP next [--lang {language}]
Navigate to next problem in current contest.
Stops at last problem (no wrapping).
--lang: Use specific language for next problem.
By default, preserves current file's language if
enabled for the new problem, otherwise uses platform
default.
Examples: >
:CP next
:CP next --lang python
<
:CP prev [--lang {language}]
Navigate to previous problem in current contest.
Stops at first problem (no wrapping).
--lang: Use specific language for previous problem.
By default, preserves current file's language if
enabled for the new problem, otherwise uses platform
default.
Examples: >
:CP prev
:CP prev --lang cpp
<
:CP {problem_id} [--lang {language}]
Jump to problem {problem_id} in a contest.
Requires that a contest has already been set up.
--lang: Use specific language for this problem.
Examples: >
:CP B
:CP C --lang python
<
Edit Commands ~
:CP edit [n]
Open grid test editor showing all test cases.
Tests displayed as 2×N grid (2 rows, N columns):
• Top row: Test inputs (editable)
• Bottom row: Expected outputs (editable)
Optional [n]: Jump cursor to test n's input buffer
Changes saved to both cache and disk on exit,
taking effect immediately in :CP run and CLI.
Keybindings (configurable via |EditConfig|):
q Save all and exit editor
]t Jump to next test column
[t Jump to previous test column
gd Delete current test column
ga Add new test column at end
<c-w> Normal window navigation
Examples: >
:CP edit " Edit all tests
:CP edit 3 " Edit all, start at test 3
<
State Restoration ~
:CP Restore state from current file.
Automatically detects platform, contest, problem,
and language from cached state. Use this after
switching files to restore your CP environment.
Cache Commands ~
:CP cache clear [platform] [contest]
Clear cache data at different granularities:
• No args: Clear all cached data
• [platform]: Clear all data for a platform
• [platform] [contest]: Clear specific contest
Examples: >
:CP cache clear
:CP cache clear codeforces
:CP cache clear codeforces 1848
<
:CP cache read
View the cache in a pretty-printed lua buffer.
Exit with q.
Template Variables ~
*cp-template-vars*
Command templates support variable substitution using {variable} syntax:
• {source} Source file path (e.g. "abc324a.cpp")
• {binary} Output binary path (e.g. "build/abc324a.run" or
"build/abc324a.dbg" for debug builds)
Example template: >
build = { 'g++', '{source}', '-o', '{binary}', '-std=c++17' }
< Would expand to: >
g++ abc324a.cpp -o build/abc324a.run -std=c++17
<
Debug Builds ~
*cp-debug-builds*
The --debug flag uses the debug command configuration instead of build:
• Normal build: commands.build → outputs to build/<name>.run
• Debug build: commands.debug → outputs to build/<name>.dbg
Debug builds typically include sanitizers (address, undefined behavior) to
catch memory errors, buffer overflows, and other issues. Both binaries
coexist, so you can switch between normal and debug mode without
recompiling.
Example debug configuration: >
languages = {
cpp = {
extension = 'cc',
commands = {
build = { 'g++', '-std=c++17', '{source}', '-o', '{binary}' },
run = { '{binary}' },
debug = { 'g++', '-std=c++17', '-fsanitize=address,undefined',
'{source}', '-o', '{binary}' },
}
}
}
Load cp.nvim with your package manager. For example, with lazy.nvim: >lua
{ 'barrettruth/cp.nvim' }
<
The plugin works automatically with no configuration required. For
customization, see |cp-config|.
==============================================================================
CONFIGURATION *cp-config*
Configuration is done via `vim.g.cp_config`. Set this before using the plugin:
Configuration is done via `vim.g.cp`. Set this before using the plugin:
>lua
vim.g.cp_config = {
vim.g.cp = {
languages = {
cpp = {
extension = 'cc',
@ -243,8 +100,19 @@ Configuration is done via `vim.g.cp_config`. Set this before using the plugin:
enabled_languages = { 'cpp', 'python' },
default_language = 'cpp',
},
codechef = {
enabled_languages = { 'cpp', 'python' },
default_language = 'cpp',
},
usaco = {
enabled_languages = { 'cpp', 'python' },
default_language = 'cpp',
},
kattis = {
enabled_languages = { 'cpp', 'python' },
default_language = 'cpp',
},
},
open_url = true,
debug = false,
ui = {
ansi = true,
@ -274,7 +142,7 @@ the default; per-platform overrides can tweak 'extension' or 'commands'.
For example, to run CodeForces contests with Python by default:
>lua
vim.g.cp_config = {
vim.g.cp = {
platforms = {
codeforces = {
default_language = 'python',
@ -285,7 +153,7 @@ For example, to run CodeForces contests with Python by default:
Any language is supported provided the proper configuration. For example, to
run CSES problems with Rust using the single schema:
>lua
vim.g.cp_config = {
vim.g.cp = {
languages = {
rust = {
extension = 'rs',
@ -317,8 +185,6 @@ run CSES problems with Rust using the single schema:
Should return full filename with extension.
(default: concatenates contest_id and problem_id, lowercased)
{ui} (|CpUI|) UI settings: panel, diff backend, picker.
{open_url} (boolean) Open the contest & problem url in the browser
when the contest is first opened.
*CpPlatform*
Fields: ~
@ -395,42 +261,353 @@ run CSES problems with Rust using the single schema:
*cp.Hooks*
Fields: ~
{before_run} (function, optional) Called before test panel opens.
function(state: cp.State)
{before_debug} (function, optional) Called before debug build/run.
function(state: cp.State)
{setup_code} (function, optional) Called after source file is opened.
function(state: cp.State)
{setup_io_input} (function, optional) Called when I/O input buffer created.
function(bufnr: integer, state: cp.State)
Default: helpers.clearcol (removes line numbers/columns)
{setup_io_output} (function, optional) Called when I/O output buffer created.
function(bufnr: integer, state: cp.State)
Default: helpers.clearcol (removes line numbers/columns)
{setup} (|cp.CpSetupHooks|, optional) One-time initialization hooks.
{on} (|cp.CpOnHooks|, optional) Recurring event hooks.
Hook functions receive the cp.nvim state object (|cp.State|). See
*cp.CpSetupHooks*
Fields: ~
{contest} (function, optional) Called once when a contest directory
is first created (not on subsequent visits).
function(state: cp.State)
{code} (function, optional) Called after the source buffer is
opened for the first time (guarded by cp_setup_done).
function(state: cp.State)
{io} (|cp.CpSetupIOHooks|, optional) I/O buffer hooks.
*cp.CpSetupIOHooks*
Fields: ~
{input} (function, optional) Called when the I/O input buffer is
created. function(bufnr: integer, state: cp.State)
Default: helpers.clearcol
{output} (function, optional) Called when the I/O output buffer is
created. function(bufnr: integer, state: cp.State)
Default: helpers.clearcol
*cp.CpOnHooks*
Fields: ~
{enter} (function, optional) Called on every BufEnter on the
solution buffer. Registered as a buffer-scoped autocmd and
fired immediately after setup.code.
function(state: cp.State)
{run} (function, optional) Called before the test panel opens.
function(state: cp.State)
{debug} (function, optional) Called before a debug run.
function(state: cp.State)
All hook functions receive the cp.nvim state object (|cp.State|). See
|lua/cp/state.lua| for available methods and fields.
The I/O buffer hooks are called once when the buffers are first created
during problem setup. Use these to customize buffer appearance (e.g.,
remove line numbers, set custom options). Access helpers via:
>lua
local helpers = require('cp').helpers
<
Example usage:
>lua
hooks = {
setup_code = function(state)
print("Setting up " .. state.get_base_name())
print("Source file: " .. state.get_source_file())
end,
setup_io_input = function(bufnr, state)
-- Custom setup for input buffer
vim.api.nvim_set_option_value('number', false, { buf = bufnr })
end
setup = {
contest = function(state)
local dir = vim.fn.fnamemodify(
state.get_source_file(state.get_language()), ':h')
vim.fn.system({ 'cp', '~/.clang-format', dir .. '/.clang-format' })
end,
code = function(state)
vim.opt_local.foldmethod = 'marker'
vim.diagnostic.enable(false)
end,
},
on = {
enter = function(state) vim.opt_local.winbar = '' end,
run = function(state) require('config.lsp').format() end,
},
}
<
==============================================================================
COMMANDS *cp-commands*
:CP *:CP*
cp.nvim uses a single :CP command with intelligent argument parsing:
Setup Commands ~
:CP {platform} {contest_id} [--lang {language}]
Full setup: set platform and load contest metadata.
Scrapes test cases and creates source file.
--lang: Use specific language (default: platform default)
Examples: >
:CP codeforces 1933
:CP codeforces 1933 --lang python
<
View Commands ~
:CP run [all|n|n,m,...] [--debug]
Run tests in I/O view (see |cp-io-view|).
Lightweight split showing test verdicts.
Execution modes:
• :CP run Combined: single execution with all tests
(auto-switches to individual when multiple samples)
• :CP run all Individual: N separate executions
• :CP run n Individual: run test n only
• :CP run n,m,... Individual: run specific tests (e.g. nth and mth)
--debug: Use debug build (builds to build/<name>.dbg)
Combined mode runs all test inputs in one execution (matching
platform behavior for multi-test problems). When a problem has
multiple independent sample test cases, :CP run auto-switches to
individual mode to run each sample separately.
Examples: >
:CP run " Combined: all tests, one execution
:CP run all " Individual: all tests, N executions
:CP run 2 " Individual: test 2 only
:CP run 1,3,5 " Individual: tests 1, 3, and 5
:CP run all --debug " Individual with debug build
<
:CP panel [--debug] [n]
Open full-screen test panel (see |cp-panel|).
Aggregate table with diff modes for detailed analysis.
Optional [n] focuses on specific test.
--debug: Use debug build (with sanitizers, etc.)
Examples: >
:CP panel " All tests
:CP panel --debug 3 " Test 3, debug build
<
:CP pick [--lang {language}]
Launch configured picker for interactive
platform/contest selection.
--lang: Pre-select language for chosen contest.
Example: >
:CP pick
:CP pick --lang python
<
:CP interact [script]
Open an interactive terminal for the current problem.
If an executable interactor is provided, runs the compiled
binary against the source file (see
*cp-interact*). Otherwise, runs the source
file. Only valid for interactive problems.
:CP stress [generator] [brute]
Start an automated stress test loop against a
brute-force reference. Toggles off if already
running. Without arguments, auto-detects a
generator and brute script in the working
directory. See |cp-stress|.
Navigation Commands ~
:CP next [--lang {language}]
Navigate to next problem in current contest.
Stops at last problem (no wrapping).
--lang: Use specific language for next problem.
By default, preserves current file's language if
enabled for the new problem, otherwise uses platform
default.
Examples: >
:CP next
:CP next --lang python
<
:CP prev [--lang {language}]
Navigate to previous problem in current contest.
Stops at first problem (no wrapping).
--lang: Use specific language for previous problem.
By default, preserves current file's language if
enabled for the new problem, otherwise uses platform
default.
Examples: >
:CP prev
:CP prev --lang cpp
<
:CP {problem_id} [--lang {language}]
Jump to problem {problem_id} in a contest.
Requires that a contest has already been set up.
--lang: Use specific language for this problem.
Examples: >
:CP B
:CP C --lang python
<
Edit Commands ~
:CP edit [n]
Open grid test editor showing all test cases.
Tests displayed as 2×N grid (2 rows, N columns):
• Top row: Test inputs (editable)
• Bottom row: Expected outputs (editable)
Optional [n]: Jump cursor to test n's input buffer
Changes saved to both cache and disk on exit,
taking effect immediately in :CP run and CLI.
Keybindings (configurable via |EditConfig|):
q Save all and exit editor
]t Jump to next test column
[t Jump to previous test column
gd Delete current test column
ga Add new test column at end
<c-w> Normal window navigation
Examples: >
:CP edit " Edit all tests
:CP edit 3 " Edit all, start at test 3
<
Race Commands ~
:CP race {platform} {contest_id} [--lang {language}]
Start a countdown to the contest's scheduled
start time. At T=0, automatically runs:
:CP {platform} {contest_id} [--lang ...]
Examples: >
:CP race atcoder abc400
:CP race codeforces 2100 --lang python
<
:CP race stop
Cancel an active race countdown.
Credential Commands ~
:CP login [platform]
Set or update stored credentials for a platform.
Prompts for username and password, overwriting
any previously saved credentials.
If [platform] is omitted, uses the active platform.
Examples: >
:CP login atcoder
:CP login codeforces
<
:CP logout [platform]
Remove stored credentials for a platform.
If [platform] is omitted, uses the active platform.
Examples: >
:CP logout atcoder
<
Submit Commands ~
:CP submit [--lang {language}]
Submit the current solution to the online
judge. Uses stored credentials (set via
:CP login). Prompts on first use if no
credentials are saved.
--lang: Submit solution for a specific language.
:CP open [problem|contest|standings]
Open the URL for the current problem, contest,
or standings page in the browser via
|vim.ui.open|. Defaults to "problem" if no
argument is given. Warns if the URL is not
available (e.g. CSES has no standings).
Examples: >
:CP open
:CP open contest
:CP open standings
<
State Restoration ~
:CP Restore state from current file.
Automatically detects platform, contest, problem,
and language from cached state. Use this after
switching files to restore your CP environment.
Cache Commands ~
:CP cache clear [platform] [contest]
Clear cache data at different granularities:
• No args: Clear all cached data
• [platform]: Clear all data for a platform
• [platform] [contest]: Clear specific contest
Examples: >
:CP cache clear
:CP cache clear codeforces
:CP cache clear codeforces 1848
<
:CP cache read
View the cache in a pretty-printed lua buffer.
Exit with q.
Template Variables ~
*cp-template-vars*
Command templates support variable substitution using {variable} syntax:
• {source} Source file path (e.g. "abc324a.cpp")
• {binary} Output binary path (e.g. "build/abc324a.run" or
"build/abc324a.dbg" for debug builds)
Example template: >
build = { 'g++', '{source}', '-o', '{binary}', '-std=c++17' }
< Would expand to: >
g++ abc324a.cpp -o build/abc324a.run -std=c++17
<
Debug Builds ~
*cp-debug-builds*
The --debug flag uses the debug command configuration instead of build:
• Normal build: commands.build → outputs to build/<name>.run
• Debug build: commands.debug → outputs to build/<name>.dbg
Debug builds typically include sanitizers (address, undefined behavior) to
catch memory errors, buffer overflows, and other issues. Both binaries
coexist, so you can switch between normal and debug mode without
recompiling.
Example debug configuration: >
languages = {
cpp = {
extension = 'cc',
commands = {
build = { 'g++', '-std=c++17', '{source}', '-o', '{binary}' },
run = { '{binary}' },
debug = { 'g++', '-std=c++17', '-fsanitize=address,undefined',
'{source}', '-o', '{binary}' },
}
}
}
<
==============================================================================
MAPPINGS *cp-mappings*
cp.nvim provides <Plug> mappings for all primary actions. These dispatch
through the same code path as |:CP|.
*<Plug>(cp-run)*
<Plug>(cp-run) Run tests in I/O view. Equivalent to :CP run.
*<Plug>(cp-panel)*
<Plug>(cp-panel) Open full-screen test panel. Equivalent to :CP panel.
*<Plug>(cp-edit)*
<Plug>(cp-edit) Open the test case editor. Equivalent to :CP edit.
*<Plug>(cp-next)*
<Plug>(cp-next) Navigate to the next problem. Equivalent to :CP next.
*<Plug>(cp-prev)*
<Plug>(cp-prev) Navigate to the previous problem. Equivalent to :CP prev.
*<Plug>(cp-pick)*
<Plug>(cp-pick) Launch the contest picker. Equivalent to :CP pick.
*<Plug>(cp-interact)*
<Plug>(cp-interact) Open interactive mode. Equivalent to :CP interact.
*<Plug>(cp-stress)*
<Plug>(cp-stress) Run stress test loop. Equivalent to :CP stress.
*<Plug>(cp-submit)*
<Plug>(cp-submit) Submit current solution. Equivalent to :CP submit.
*<Plug>(cp-open)*
<Plug>(cp-open) Open current problem URL in browser. Equivalent to :CP open.
*<Plug>(cp-race-stop)*
<Plug>(cp-race-stop) Cancel active race countdown. Equivalent to :CP race stop.
Example configuration: >lua
vim.keymap.set('n', '<leader>cr', '<Plug>(cp-run)')
vim.keymap.set('n', '<leader>cp', '<Plug>(cp-panel)')
vim.keymap.set('n', '<leader>ce', '<Plug>(cp-edit)')
vim.keymap.set('n', '<leader>cn', '<Plug>(cp-next)')
vim.keymap.set('n', '<leader>cN', '<Plug>(cp-prev)')
vim.keymap.set('n', '<leader>cc', '<Plug>(cp-pick)')
vim.keymap.set('n', '<leader>ci', '<Plug>(cp-interact)')
vim.keymap.set('n', '<leader>cs', '<Plug>(cp-stress)')
vim.keymap.set('n', '<leader>cu', '<Plug>(cp-submit)')
vim.keymap.set('n', '<leader>cR', '<Plug>(cp-race-stop)')
<
==============================================================================
LANGUAGE SELECTION *cp-lang-selection*
@ -508,6 +685,41 @@ URL format: https://cses.fi/problemset/task/{problem_id}
Usage examples: >
:CP cses dynamic_programming " Set up all problems in dp category
CodeChef ~
*cp-codechef*
URL format: https://www.codechef.com/{contest_id}/problems/{problem_id}
The contest_id is the contest code from the URL (e.g. START209).
Usage examples: >
:CP codechef START209 " Set up codechef.com/START209
USACO ~
*cp-usaco*
URL format: https://usaco.org/index.php?page=viewproblem2&cpid={cpid}
The contest_id combines the abbreviated month, two-digit year, and division
in lowercase, joined by underscores (e.g. dec24_gold, feb23_silver).
Usage examples: >
:CP usaco dec24_gold " Set up December 2024 Gold division
:CP usaco feb23_silver " Set up February 2023 Silver division
Kattis ~
*cp-kattis*
Kattis supports single-problem and full-contest modes.
Single problem — the contest_id is the problem slug from the URL:
URL format: https://open.kattis.com/problems/{slug}
Full contest — the contest_id is the contest ID from the URL. All problems
are set up at once with :CP next/:CP prev navigation:
URL format: https://open.kattis.com/contests/{id}
Usage examples: >
:CP kattis primesieve " Single problem
:CP kattis t8tnpe " Full contest (all problems, AH navigation)
==============================================================================
COMPLETE WORKFLOW EXAMPLE *cp-example*
@ -542,7 +754,9 @@ Example: Setting up and solving AtCoder contest ABC324
:CP
< Automatically restores abc323 contest context
8. Submit solutions on AtCoder website
8. Submit solution: >
:CP submit
< Uses stored credentials and submits to AtCoder.
==============================================================================
I/O VIEW *cp-io-view*
@ -612,9 +826,9 @@ While in the I/O view buffers, use the configured keymaps to cycle through tests
Buffer Customization ~
Use the setup_io_input and setup_io_output hooks (see |cp.Hooks|) to customize
buffer appearance. By default, line numbers and columns are removed via
helpers.clearcol (see |cp-helpers|).
Use the hooks.setup.io.input and hooks.setup.io.output hooks (see |cp.Hooks|)
to customize buffer appearance. By default, line numbers and columns are
removed via helpers.clearcol (see |cp-helpers|).
==============================================================================
VERDICT FORMATTING *cp-verdict-format*
@ -753,6 +967,88 @@ When using :CP interact {interactor}, the interactor must be executable
Keymaps ~
<c-q> Close the terminal and restore the previous layout.
==============================================================================
STRESS TESTING *cp-stress*
Start an automated stress test loop to find inputs where your solution
disagrees with a brute-force reference.
:CP stress [generator] [brute]
Start the stress loop. Toggles off if the loop is already running.
{generator} Generator script path (default: auto-detected).
{brute} Brute-force solution path (default: auto-detected).
Auto-detection looks for files named gen.* and brute.* in the CWD.
The stress panel opens and streams results for each iteration.
On a mismatch, the failing input is displayed in the panel.
Keymaps ~
<c-q> Close the stress panel and restore the previous layout.
==============================================================================
RACE *cp-race*
Count down to a contest's start time and automatically run setup at T=0.
:CP race {platform} {contest_id} [--lang {language}]
Start a countdown timer. At T=0, automatically runs:
:CP {platform} {contest_id} [--lang {language}]
Examples: >
:CP race atcoder abc400
:CP race codeforces 2100 --lang python
<
:CP race stop
Cancel an active race countdown.
Statusline integration: see |cp-race-status|.
==============================================================================
CREDENTIALS *cp-credentials*
Manage stored login credentials for platform submission.
Credentials are stored under _credentials in the main cache file
(stdpath('data')/cp-nvim.json). Use :CP cache read to inspect them.
:CP login [platform]
Set or update credentials for a platform. Prompts for username
and password, overwriting any previously saved values.
Omit [platform] to use the currently active platform.
:CP logout [platform]
Remove stored credentials for a platform.
Omit [platform] to use the currently active platform.
==============================================================================
SUBMIT *cp-submit*
Submit the current solution to the online judge.
:CP submit [--lang {language}]
Submit the current solution. Uses stored credentials (set via
:CP login). Prompts on first use if no credentials are saved.
--lang: Override the language to submit.
Platform support:
AtCoder Fully implemented.
Others Not yet implemented.
==============================================================================
OPEN *cp-open*
Open a platform URL for the current contest in the browser.
:CP open [problem|contest|standings]
Open the URL for the active problem, contest page, or standings.
Defaults to "problem" if no argument is given. Uses |vim.ui.open|.
Warns if the URL is unavailable (e.g. CSES has no standings page).
Platform support:
AtCoder problem, contest, standings
Codeforces problem, contest, standings
CSES problem, contest (no standings)
Others Not yet implemented.
==============================================================================
ANSI COLORS AND HIGHLIGHTING *cp-ansi*
@ -843,6 +1139,124 @@ Functions ~
Parameters: ~
{bufnr} (integer) Buffer handle
==============================================================================
STATUSLINE INTEGRATION *cp-statusline*
cp.nvim exposes its runtime state through a public module that can be queried
from any statusline plugin. Import it with: >lua
local state = require('cp.state')
<
All getters return nil when no problem is active, so guard every value before
use. Calling any getter outside a CP context is safe and has no side effects.
State API ~
*cp.State*
The following getters are available for statusline use:
get_platform() (string?) Platform id. e.g. "codeforces", "atcoder"
get_contest_id() (string?) Contest id. e.g. "1933", "abc324"
get_problem_id() (string?) Problem id. e.g. "A", "B"
get_language() (string?) Language id. e.g. "cpp", "python"
get_base_name() (string?) Derived filename stem. e.g. "1933a"
get_source_file() (string?) Full source filename. e.g. "1933a.cc"
get_active_panel() (string?) One of 'run', 'interactive', 'stress', or
nil when no panel is open.
Race API ~
*cp-race-status*
require('cp.race').status() returns a table describing the race state:
{ active = false }
{ active = true, platform = string, contest_id = string,
remaining_seconds = number }
Recipe: vanilla statusline ~
Set vim.o.statusline from an autocommand so it is recalculated on every
BufEnter: >lua
local function cp_component()
local state = require('cp.state')
local platform = state.get_platform()
if not platform then
return ''
end
local parts = {
platform,
state.get_contest_id(),
state.get_problem_id(),
state.get_language(),
}
local filtered = {}
for _, v in ipairs(parts) do
if v then filtered[#filtered + 1] = v end
end
return '[' .. table.concat(filtered, ' · ') .. ']'
end
vim.api.nvim_create_autocmd({ 'BufEnter', 'User' }, {
callback = function()
vim.o.statusline = cp_component() .. ' %f %=%l:%c'
end
})
<
Recipe: lualine ~
Add a custom component to any lualine section. The cond field hides the
component entirely when no problem is active: >lua
local function cp_lualine()
local state = require('cp.state')
local parts = {
state.get_platform(),
state.get_contest_id(),
state.get_problem_id(),
state.get_language(),
}
local filtered = {}
for _, v in ipairs(parts) do
if v then filtered[#filtered + 1] = v end
end
return table.concat(filtered, ' · ')
end
require('lualine').setup({
sections = {
lualine_c = {
{
cp_lualine,
cond = function()
return require('cp.state').get_platform() ~= nil
end,
},
},
},
})
<
Recipe: heirline ~
Build a heirline component using a provider and condition: >lua
local CpComponent = {
condition = function()
return require('cp.state').get_platform() ~= nil
end,
provider = function()
local state = require('cp.state')
local parts = {
state.get_platform(),
state.get_contest_id(),
state.get_problem_id(),
state.get_language(),
}
local filtered = {}
for _, v in ipairs(parts) do
if v then filtered[#filtered + 1] = v end
end
return '[' .. table.concat(filtered, ' · ') .. ']'
end,
}
<
Include CpComponent in your heirline StatusLine spec wherever desired.
==============================================================================
PANEL KEYMAPS *cp-panel-keys*

43
flake.lock generated Normal file
View file

@ -0,0 +1,43 @@
{
"nodes": {
"nixpkgs": {
"locked": {
"lastModified": 1771008912,
"narHash": "sha256-gf2AmWVTs8lEq7z/3ZAsgnZDhWIckkb+ZnAo5RzSxJg=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "a82ccc39b39b621151d6732718e3e250109076fa",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"nixpkgs": "nixpkgs",
"systems": "systems"
}
},
"systems": {
"locked": {
"lastModified": 1689347949,
"narHash": "sha256-12tWmuL2zgBgZkdoB6qXZsgJEH9LR3oUgpaQq2RbI80=",
"owner": "nix-systems",
"repo": "default-linux",
"rev": "31732fcf5e8fea42e59c2488ad31a0e651500f68",
"type": "github"
},
"original": {
"owner": "nix-systems",
"repo": "default-linux",
"type": "github"
}
}
},
"root": "root",
"version": 7
}

137
flake.nix Normal file
View file

@ -0,0 +1,137 @@
{
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
systems.url = "github:nix-systems/default-linux";
};
outputs =
{
self,
nixpkgs,
systems,
}:
let
eachSystem = nixpkgs.lib.genAttrs (import systems);
pkgsFor = system: nixpkgs.legacyPackages.${system};
mkPythonEnv =
pkgs:
pkgs.python312.withPackages (ps: [
ps.backoff
ps.beautifulsoup4
ps.httpx
ps.ndjson
ps.pydantic
ps.requests
]);
mkDevPythonEnv =
pkgs:
pkgs.python312.withPackages (ps: [
ps.backoff
ps.beautifulsoup4
ps.httpx
ps.ndjson
ps.pydantic
ps.requests
ps.pytest
ps.pytest-mock
]);
mkSubmitEnv =
pkgs:
pkgs.buildFHSEnv {
name = "cp-nvim-submit";
targetPkgs =
pkgs: with pkgs; [
uv
alsa-lib
at-spi2-atk
cairo
cups
dbus
fontconfig
freetype
gdk-pixbuf
glib
gtk3
libdrm
libxkbcommon
mesa
libGL
nspr
nss
pango
libx11
libxcomposite
libxdamage
libxext
libxfixes
libxrandr
libxcb
at-spi2-core
expat
libgbm
systemdLibs
zlib
];
runScript = "${pkgs.uv}/bin/uv";
};
mkPlugin =
pkgs:
let
pythonEnv = mkPythonEnv pkgs;
submitEnv = mkSubmitEnv pkgs;
in
pkgs.vimUtils.buildVimPlugin {
pname = "cp-nvim";
version = "0-unstable-${self.shortRev or self.dirtyShortRev or "dev"}";
src = self;
postPatch = ''
substituteInPlace lua/cp/utils.lua \
--replace-fail "local _nix_python = nil" \
"local _nix_python = '${pythonEnv.interpreter}'"
substituteInPlace lua/cp/utils.lua \
--replace-fail "local _nix_submit_cmd = nil" \
"local _nix_submit_cmd = '${submitEnv}/bin/cp-nvim-submit'"
'';
nvimSkipModule = [
"cp.pickers.telescope"
"cp.version"
];
passthru = { inherit pythonEnv submitEnv; };
meta.description = "Competitive programming plugin for Neovim";
};
in
{
overlays.default = final: prev: {
vimPlugins = prev.vimPlugins // {
cp-nvim = mkPlugin final;
};
};
packages = eachSystem (system: {
default = mkPlugin (pkgsFor system);
pythonEnv = mkPythonEnv (pkgsFor system);
submitEnv = mkSubmitEnv (pkgsFor system);
});
formatter = eachSystem (system: (pkgsFor system).nixfmt-tree);
devShells = eachSystem (system: {
default = (pkgsFor system).mkShell {
packages = with (pkgsFor system); [
uv
(mkDevPythonEnv (pkgsFor system))
prettier
ruff
stylua
selene
lua-language-server
ty
];
};
});
};
}

View file

@ -10,11 +10,14 @@
---@field name string
---@field display_name string
---@field url string
---@field contest_url string
---@field standings_url string
---@class ContestSummary
---@field display_name string
---@field name string
---@field id string
---@field start_time? integer
---@class CombinedTest
---@field input string
@ -27,6 +30,7 @@
---@field multi_test? boolean
---@field memory_mb? number
---@field timeout_ms? number
---@field precision? number
---@field combined_test? CombinedTest
---@field test_cases TestCase[]
@ -38,7 +42,8 @@
local M = {}
local logger = require('cp.log')
local CACHE_VERSION = 2
local cache_file = vim.fn.stdpath('data') .. '/cp-nvim.json'
local cache_data = {}
local loaded = false
@ -64,10 +69,30 @@ function M.load()
end
local ok, decoded = pcall(vim.json.decode, table.concat(content, '\n'))
if ok then
if not ok then
cache_data = {}
M.save()
loaded = true
return
end
if decoded._version == 1 then
local old_creds = decoded._credentials
decoded._credentials = nil
if old_creds then
for platform, creds in pairs(old_creds) do
decoded[platform] = decoded[platform] or {}
decoded[platform]._credentials = creds
end
end
decoded._version = CACHE_VERSION
cache_data = decoded
M.save()
elseif decoded._version == CACHE_VERSION then
cache_data = decoded
else
logger.log('Could not decode json in cache file', vim.log.levels.ERROR)
cache_data = {}
M.save()
end
loaded = true
end
@ -78,6 +103,7 @@ function M.save()
vim.schedule(function()
vim.fn.mkdir(vim.fn.fnamemodify(cache_file, ':h'), 'p')
cache_data._version = CACHE_VERSION
local encoded = vim.json.encode(cache_data)
local lines = vim.split(encoded, '\n')
vim.fn.writefile(lines, cache_file)
@ -112,7 +138,9 @@ function M.get_cached_contest_ids(platform)
local contest_ids = {}
for contest_id, _ in pairs(cache_data[platform]) do
table.insert(contest_ids, contest_id)
if contest_id:sub(1, 1) ~= '_' then
table.insert(contest_ids, contest_id)
end
end
table.sort(contest_ids)
return contest_ids
@ -122,12 +150,16 @@ end
---@param contest_id string
---@param problems Problem[]
---@param url string
function M.set_contest_data(platform, contest_id, problems, url)
---@param contest_url string
---@param standings_url string
function M.set_contest_data(platform, contest_id, problems, url, contest_url, standings_url)
vim.validate({
platform = { platform, 'string' },
contest_id = { contest_id, 'string' },
problems = { problems, 'table' },
url = { url, 'string' },
contest_url = { contest_url, 'string' },
standings_url = { standings_url, 'string' },
})
cache_data[platform] = cache_data[platform] or {}
@ -139,6 +171,8 @@ function M.set_contest_data(platform, contest_id, problems, url)
problems = problems,
index_map = {},
url = url,
contest_url = contest_url,
standings_url = standings_url,
}
for i, p in ipairs(out.problems) do
out.index_map[p.id] = i
@ -148,6 +182,25 @@ function M.set_contest_data(platform, contest_id, problems, url)
M.save()
end
---@param platform string?
---@param contest_id string?
---@param problem_id string?
---@return { problem: string|nil, contest: string|nil, standings: string|nil }|nil
function M.get_open_urls(platform, contest_id, problem_id)
if not platform or not contest_id then
return nil
end
if not cache_data[platform] or not cache_data[platform][contest_id] then
return nil
end
local cd = cache_data[platform][contest_id]
return {
problem = cd.url ~= '' and problem_id and string.format(cd.url, problem_id) or nil,
contest = cd.contest_url ~= '' and cd.contest_url or nil,
standings = cd.standings_url ~= '' and cd.standings_url or nil,
}
end
---@param platform string
---@param contest_id string
function M.clear_contest_data(platform, contest_id)
@ -222,7 +275,8 @@ function M.set_test_cases(
timeout_ms,
memory_mb,
interactive,
multi_test
multi_test,
precision
)
vim.validate({
platform = { platform, 'string' },
@ -234,6 +288,7 @@ function M.set_test_cases(
memory_mb = { memory_mb, { 'number', 'nil' }, true },
interactive = { interactive, { 'boolean', 'nil' }, true },
multi_test = { multi_test, { 'boolean', 'nil' }, true },
precision = { precision, { 'number', 'nil' }, true },
})
local index = cache_data[platform][contest_id].index_map[problem_id]
@ -244,6 +299,7 @@ function M.set_test_cases(
cache_data[platform][contest_id].problems[index].memory_mb = memory_mb
cache_data[platform][contest_id].problems[index].interactive = interactive
cache_data[platform][contest_id].problems[index].multi_test = multi_test
cache_data[platform][contest_id].problems[index].precision = precision
M.save()
end
@ -265,6 +321,34 @@ function M.get_constraints(platform, contest_id, problem_id)
return problem_data.timeout_ms, problem_data.memory_mb
end
---@param platform string
---@param contest_id string
---@param problem_id? string
---@return number?
function M.get_precision(platform, contest_id, problem_id)
vim.validate({
platform = { platform, 'string' },
contest_id = { contest_id, 'string' },
problem_id = { problem_id, { 'string', 'nil' }, true },
})
if
not cache_data[platform]
or not cache_data[platform][contest_id]
or not cache_data[platform][contest_id].index_map
then
return nil
end
local index = cache_data[platform][contest_id].index_map[problem_id]
if not index then
return nil
end
local problem_data = cache_data[platform][contest_id].problems[index]
return problem_data and problem_data.precision or nil
end
---@param file_path string
---@return FileState|nil
function M.get_file_state(file_path)
@ -295,11 +379,13 @@ end
function M.get_contest_summaries(platform)
local contest_list = {}
for contest_id, contest_data in pairs(cache_data[platform] or {}) do
table.insert(contest_list, {
id = contest_id,
name = contest_data.name,
display_name = contest_data.display_name,
})
if contest_id:sub(1, 1) ~= '_' then
table.insert(contest_list, {
id = contest_id,
name = contest_data.name,
display_name = contest_data.display_name,
})
end
end
return contest_list
end
@ -312,11 +398,49 @@ function M.set_contest_summaries(platform, contests)
cache_data[platform][contest.id] = cache_data[platform][contest.id] or {}
cache_data[platform][contest.id].display_name = contest.display_name
cache_data[platform][contest.id].name = contest.name
if contest.start_time then
cache_data[platform][contest.id].start_time = contest.start_time
end
end
M.save()
end
---@param platform string
---@param contest_id string
---@return integer?
function M.get_contest_start_time(platform, contest_id)
if not cache_data[platform] or not cache_data[platform][contest_id] then
return nil
end
return cache_data[platform][contest_id].start_time
end
---@param platform string
---@return table?
function M.get_credentials(platform)
if not cache_data[platform] then
return nil
end
return cache_data[platform]._credentials
end
---@param platform string
---@param creds table
function M.set_credentials(platform, creds)
cache_data[platform] = cache_data[platform] or {}
cache_data[platform]._credentials = creds
M.save()
end
---@param platform string
function M.clear_credentials(platform)
if cache_data[platform] then
cache_data[platform]._credentials = nil
end
M.save()
end
function M.clear_all()
cache_data = {}
M.save()
@ -338,6 +462,8 @@ function M.get_data_pretty()
return vim.inspect(cache_data)
end
M._cache = cache_data
function M.get_raw_cache()
return cache_data
end
return M

View file

@ -47,26 +47,30 @@ function M.handle_cache_command(cmd)
constants.PLATFORM_DISPLAY_NAMES[cmd.platform],
cmd.contest
),
vim.log.levels.INFO,
true
{ level = vim.log.levels.INFO, override = true }
)
else
logger.log(("Unknown platform '%s'."):format(cmd.platform), vim.log.levels.ERROR)
logger.log(
("Unknown platform '%s'."):format(cmd.platform),
{ level = vim.log.levels.ERROR }
)
end
elseif cmd.platform then
if vim.tbl_contains(platforms, cmd.platform) then
cache.clear_platform(cmd.platform)
logger.log(
("Cache cleared for platform '%s'"):format(constants.PLATFORM_DISPLAY_NAMES[cmd.platform]),
vim.log.levels.INFO,
true
{ level = vim.log.levels.INFO, override = true }
)
else
logger.log(("Unknown platform '%s'."):format(cmd.platform), vim.log.levels.ERROR)
logger.log(
("Unknown platform '%s'."):format(cmd.platform),
{ level = vim.log.levels.ERROR }
)
end
else
cache.clear_all()
logger.log('Cache cleared', vim.log.levels.INFO, true)
logger.log('Cache cleared', { level = vim.log.levels.INFO, override = true })
end
end
end

View file

@ -11,11 +11,14 @@ local actions = constants.ACTIONS
---@field type string
---@field error string?
---@field action? string
---@field requires_context? boolean
---@field message? string
---@field contest? string
---@field platform? string
---@field problem_id? string
---@field interactor_cmd? string
---@field generator_cmd? string
---@field brute_cmd? string
---@field test_index? integer
---@field test_indices? integer[]
---@field mode? string
@ -23,6 +26,20 @@ local actions = constants.ACTIONS
---@field language? string
---@field subcommand? string
---@param str string
---@return string
local function canonicalize_cf_contest(str)
local id = str:match('/contest/(%d+)') or str:match('/problemset/problem/(%d+)')
if id then
return id
end
local num = str:match('^(%d+)[A-Za-z]')
if num then
return num
end
return str
end
--- Turn raw args into normalized structure to later dispatch
---@param args string[] The raw command-line mode args
---@return ParsedCommand
@ -53,13 +70,48 @@ local function parse_command(args)
else
return { type = 'error', message = 'unknown cache subcommand: ' .. subcommand }
end
elseif first == 'race' then
if args[2] == 'stop' then
return { type = 'action', action = 'race_stop', requires_context = false }
end
if not args[2] or not args[3] then
return {
type = 'error',
message = 'Usage: :CP race <platform> <contest_id> [--lang <lang>]',
}
end
local language = nil
if args[4] == '--lang' and args[5] then
language = args[5]
end
return {
type = 'action',
action = 'race',
requires_context = false,
platform = args[2],
contest = args[3],
language = language,
}
elseif first == 'interact' then
local inter = args[2]
if inter and inter ~= '' then
return { type = 'action', action = 'interact', interactor_cmd = inter }
return {
type = 'action',
action = 'interact',
requires_context = true,
interactor_cmd = inter,
}
else
return { type = 'action', action = 'interact' }
return { type = 'action', action = 'interact', requires_context = true }
end
elseif first == 'stress' then
return {
type = 'action',
action = 'stress',
requires_context = true,
generator_cmd = args[2],
brute_cmd = args[3],
}
elseif first == 'edit' then
local test_index = nil
if #args >= 2 then
@ -75,7 +127,7 @@ local function parse_command(args)
end
test_index = idx
end
return { type = 'action', action = 'edit', test_index = test_index }
return { type = 'action', action = 'edit', requires_context = true, test_index = test_index }
elseif first == 'run' or first == 'panel' then
local debug = false
local test_indices = nil
@ -188,10 +240,28 @@ local function parse_command(args)
return {
type = 'action',
action = first,
requires_context = true,
test_indices = test_indices,
debug = debug,
mode = mode,
}
elseif first == 'open' then
local target = args[2] or 'problem'
if not vim.tbl_contains({ 'problem', 'contest', 'standings' }, target) then
return { type = 'error', message = 'Usage: :CP open [problem|contest|standings]' }
end
return { type = 'action', action = 'open', requires_context = true, subcommand = target }
elseif first == 'pick' then
local language = nil
if #args >= 3 and args[2] == '--lang' then
language = args[3]
elseif #args >= 2 and args[2] ~= nil and args[2]:sub(1, 2) ~= '--' then
return {
type = 'error',
message = ("Unknown argument '%s' for action '%s'"):format(args[2], first),
}
end
return { type = 'action', action = 'pick', requires_context = false, language = language }
else
local language = nil
if #args >= 3 and args[2] == '--lang' then
@ -202,7 +272,7 @@ local function parse_command(args)
message = ("Unknown argument '%s' for action '%s'"):format(args[2], first),
}
end
return { type = 'action', action = first, language = language }
return { type = 'action', action = first, requires_context = true, language = language }
end
end
@ -213,16 +283,27 @@ local function parse_command(args)
message = 'Too few arguments - specify a contest.',
}
elseif #args == 2 then
if args[2] == 'login' or args[2] == 'logout' then
return { type = 'action', action = args[2], requires_context = false, platform = first }
end
local contest = args[2]
if first == 'codeforces' then
contest = canonicalize_cf_contest(contest)
end
return {
type = 'contest_setup',
platform = first,
contest = args[2],
contest = contest,
}
elseif #args == 4 and args[3] == '--lang' then
local contest = args[2]
if first == 'codeforces' then
contest = canonicalize_cf_contest(contest)
end
return {
type = 'contest_setup',
platform = first,
contest = args[2],
contest = contest,
language = args[4],
}
else
@ -255,7 +336,7 @@ function M.handle_command(opts)
local cmd = parse_command(opts.fargs)
if cmd.type == 'error' then
logger.log(cmd.message, vim.log.levels.ERROR)
logger.log(cmd.message, { level = vim.log.levels.ERROR })
return
end
@ -263,6 +344,13 @@ function M.handle_command(opts)
local restore = require('cp.restore')
restore.restore_from_current_file()
elseif cmd.type == 'action' then
if cmd.requires_context and not state.get_platform() then
local restore = require('cp.restore')
if not restore.restore_from_current_file() then
return
end
end
local setup = require('cp.setup')
local ui = require('cp.ui.views')
@ -285,6 +373,32 @@ function M.handle_command(opts)
elseif cmd.action == 'edit' then
local edit = require('cp.ui.edit')
edit.toggle_edit(cmd.test_index)
elseif cmd.action == 'stress' then
require('cp.stress').toggle(cmd.generator_cmd, cmd.brute_cmd)
elseif cmd.action == 'submit' then
require('cp.submit').submit({ language = cmd.language })
elseif cmd.action == 'race' then
require('cp.race').start(cmd.platform, cmd.contest, cmd.language)
elseif cmd.action == 'race_stop' then
require('cp.race').stop()
elseif cmd.action == 'open' then
local cache = require('cp.cache')
cache.load()
local urls =
cache.get_open_urls(state.get_platform(), state.get_contest_id(), state.get_problem_id())
local url = urls and urls[cmd.subcommand]
if not url or url == '' then
logger.log(
("No URL available for '%s'"):format(cmd.subcommand),
{ level = vim.log.levels.WARN }
)
return
end
vim.ui.open(url)
elseif cmd.action == 'login' then
require('cp.credentials').login(cmd.platform)
elseif cmd.action == 'logout' then
require('cp.credentials').logout(cmd.platform)
end
elseif cmd.type == 'problem_jump' then
local platform = state.get_platform()
@ -292,7 +406,7 @@ function M.handle_command(opts)
local problem_id = cmd.problem_id
if not (platform and contest_id) then
logger.log('No contest is currently active.', vim.log.levels.ERROR)
logger.log('No contest is currently active.', { level = vim.log.levels.ERROR })
return
end
@ -307,7 +421,7 @@ function M.handle_command(opts)
contest_id,
problem_id
),
vim.log.levels.ERROR
{ level = vim.log.levels.ERROR }
)
return
end

View file

@ -12,7 +12,7 @@ function M.handle_pick_action(language)
if not (config.ui and config.ui.picker) then
logger.log(
'No picker configured. Set ui.picker = "{telescope,fzf-lua}" in your config.',
vim.log.levels.ERROR
{ level = vim.log.levels.ERROR }
)
return
end
@ -25,13 +25,13 @@ function M.handle_pick_action(language)
if not ok then
logger.log(
'telescope.nvim is not available. Install telescope.nvim xor change your picker config.',
vim.log.levels.ERROR
{ level = vim.log.levels.ERROR }
)
return
end
local ok_cp, telescope_picker = pcall(require, 'cp.pickers.telescope')
if not ok_cp then
logger.log('Failed to load telescope integration.', vim.log.levels.ERROR)
logger.log('Failed to load telescope integration.', { level = vim.log.levels.ERROR })
return
end
@ -41,13 +41,13 @@ function M.handle_pick_action(language)
if not ok then
logger.log(
'fzf-lua is not available. Install fzf-lua or change your picker config',
vim.log.levels.ERROR
{ level = vim.log.levels.ERROR }
)
return
end
local ok_cp, fzf_picker = pcall(require, 'cp.pickers.fzf_lua')
if not ok_cp then
logger.log('Failed to load fzf-lua integration.', vim.log.levels.ERROR)
logger.log('Failed to load fzf-lua integration.', { level = vim.log.levels.ERROR })
return
end

View file

@ -7,10 +7,15 @@
---@class CpLanguage
---@field extension string
---@field commands CpLangCommands
---@field template? string
---@class CpTemplatesConfig
---@field cursor_marker? string
---@class CpPlatformOverrides
---@field extension? string
---@field commands? CpLangCommands
---@field template? string
---@class CpPlatform
---@field enabled_languages string[]
@ -20,6 +25,7 @@
---@class PanelConfig
---@field diff_modes string[]
---@field max_output_lines integer
---@field precision number?
---@class DiffGitConfig
---@field args string[]
@ -27,12 +33,23 @@
---@class DiffConfig
---@field git DiffGitConfig
---@class CpSetupIOHooks
---@field input? fun(bufnr: integer, state: cp.State)
---@field output? fun(bufnr: integer, state: cp.State)
---@class CpSetupHooks
---@field contest? fun(state: cp.State)
---@field code? fun(state: cp.State)
---@field io? CpSetupIOHooks
---@class CpOnHooks
---@field enter? fun(state: cp.State)
---@field run? fun(state: cp.State)
---@field debug? fun(state: cp.State)
---@class Hooks
---@field before_run? fun(state: cp.State)
---@field before_debug? fun(state: cp.State)
---@field setup_code? fun(state: cp.State)
---@field setup_io_input? fun(bufnr: integer, state: cp.State)
---@field setup_io_output? fun(bufnr: integer, state: cp.State)
---@field setup? CpSetupHooks
---@field on? CpOnHooks
---@class VerdictFormatData
---@field index integer
@ -61,8 +78,6 @@
---@class RunConfig
---@field width number
---@field next_test_key string|nil
---@field prev_test_key string|nil
---@field format_verdict VerdictFormatter
---@class EditConfig
@ -83,9 +98,9 @@
---@class cp.Config
---@field languages table<string, CpLanguage>
---@field platforms table<string, CpPlatform>
---@field templates? CpTemplatesConfig
---@field hooks Hooks
---@field debug boolean
---@field open_url boolean
---@field scrapers string[]
---@field filename? fun(contest: string, contest_id: string, problem_id?: string, config: cp.Config, language?: string): string
---@field ui CpUI
@ -102,7 +117,6 @@ local utils = require('cp.utils')
-- defaults per the new single schema
---@type cp.Config
M.defaults = {
open_url = false,
languages = {
cpp = {
extension = 'cc',
@ -147,13 +161,29 @@ M.defaults = {
enabled_languages = { 'cpp', 'python' },
default_language = 'cpp',
},
kattis = {
enabled_languages = { 'cpp', 'python' },
default_language = 'cpp',
},
usaco = {
enabled_languages = { 'cpp', 'python' },
default_language = 'cpp',
},
},
hooks = {
before_run = nil,
before_debug = nil,
setup_code = nil,
setup_io_input = helpers.clearcol,
setup_io_output = helpers.clearcol,
setup = {
contest = nil,
code = nil,
io = {
input = helpers.clearcol,
output = helpers.clearcol,
},
},
on = {
enter = nil,
run = nil,
debug = nil,
},
},
debug = false,
scrapers = constants.PLATFORMS,
@ -162,8 +192,6 @@ M.defaults = {
ansi = true,
run = {
width = 0.3,
next_test_key = '<c-n>',
prev_test_key = '<c-p>',
format_verdict = helpers.default_verdict_formatter,
},
edit = {
@ -173,7 +201,11 @@ M.defaults = {
add_test_key = 'ga',
save_and_exit_key = 'q',
},
panel = { diff_modes = { 'side-by-side', 'git', 'vim' }, max_output_lines = 50 },
panel = {
diff_modes = { 'side-by-side', 'git', 'vim' },
max_output_lines = 50,
precision = nil,
},
diff = {
git = {
args = { 'diff', '--no-index', '--word-diff=plain', '--word-diff-regex=.', '--no-prefix' },
@ -215,6 +247,10 @@ local function validate_language(id, lang)
commands = { lang.commands, { 'table' } },
})
if lang.template ~= nil then
vim.validate({ template = { lang.template, 'string' } })
end
if not lang.commands.run then
error(('[cp.nvim] languages.%s.commands.run is required'):format(id))
end
@ -253,6 +289,9 @@ local function merge_lang(base, ov)
if ov.commands then
out.commands = vim.tbl_deep_extend('force', out.commands or {}, ov.commands or {})
end
if ov.template then
out.template = ov.template
end
return out
end
@ -292,7 +331,15 @@ end
---@return cp.Config
function M.setup(user_config)
vim.validate({ user_config = { user_config, { 'table', 'nil' }, true } })
local cfg = vim.tbl_deep_extend('force', vim.deepcopy(M.defaults), user_config or {})
local defaults = vim.deepcopy(M.defaults)
if user_config and user_config.platforms then
for plat in pairs(defaults.platforms) do
if not user_config.platforms[plat] then
defaults.platforms[plat] = nil
end
end
end
local cfg = vim.tbl_deep_extend('force', defaults, user_config or {})
if not next(cfg.languages) then
error('[cp.nvim] At least one language must be configured')
@ -302,11 +349,17 @@ function M.setup(user_config)
error('[cp.nvim] At least one platform must be configured')
end
if cfg.templates ~= nil then
vim.validate({ templates = { cfg.templates, 'table' } })
if cfg.templates.cursor_marker ~= nil then
vim.validate({ cursor_marker = { cfg.templates.cursor_marker, 'string' } })
end
end
vim.validate({
hooks = { cfg.hooks, { 'table' } },
ui = { cfg.ui, { 'table' } },
debug = { cfg.debug, { 'boolean', 'nil' }, true },
open_url = { cfg.open_url, { 'boolean', 'nil' }, true },
filename = { cfg.filename, { 'function', 'nil' }, true },
scrapers = {
cfg.scrapers,
@ -323,12 +376,29 @@ function M.setup(user_config)
end,
('one of {%s}'):format(table.concat(constants.PLATFORMS, ',')),
},
before_run = { cfg.hooks.before_run, { 'function', 'nil' }, true },
before_debug = { cfg.hooks.before_debug, { 'function', 'nil' }, true },
setup_code = { cfg.hooks.setup_code, { 'function', 'nil' }, true },
setup_io_input = { cfg.hooks.setup_io_input, { 'function', 'nil' }, true },
setup_io_output = { cfg.hooks.setup_io_output, { 'function', 'nil' }, true },
})
if cfg.hooks.setup ~= nil then
vim.validate({ setup = { cfg.hooks.setup, 'table' } })
vim.validate({
contest = { cfg.hooks.setup.contest, { 'function', 'nil' }, true },
code = { cfg.hooks.setup.code, { 'function', 'nil' }, true },
})
if cfg.hooks.setup.io ~= nil then
vim.validate({ io = { cfg.hooks.setup.io, 'table' } })
vim.validate({
input = { cfg.hooks.setup.io.input, { 'function', 'nil' }, true },
output = { cfg.hooks.setup.io.output, { 'function', 'nil' }, true },
})
end
end
if cfg.hooks.on ~= nil then
vim.validate({ on = { cfg.hooks.on, 'table' } })
vim.validate({
enter = { cfg.hooks.on.enter, { 'function', 'nil' }, true },
run = { cfg.hooks.on.run, { 'function', 'nil' }, true },
debug = { cfg.hooks.on.debug, { 'function', 'nil' }, true },
})
end
local layouts = require('cp.ui.layouts')
vim.validate({
@ -355,6 +425,13 @@ function M.setup(user_config)
end,
'positive integer',
},
precision = {
cfg.ui.panel.precision,
function(v)
return v == nil or (type(v) == 'number' and v >= 0)
end,
'nil or non-negative number',
},
git = { cfg.ui.diff.git, { 'table' } },
git_args = { cfg.ui.diff.git.args, is_string_list, 'string[]' },
width = {
@ -364,20 +441,6 @@ function M.setup(user_config)
end,
'decimal between 0 and 1',
},
next_test_key = {
cfg.ui.run.next_test_key,
function(v)
return v == nil or (type(v) == 'string' and #v > 0)
end,
'nil or non-empty string',
},
prev_test_key = {
cfg.ui.run.prev_test_key,
function(v)
return v == nil or (type(v) == 'string' and #v > 0)
end,
'nil or non-empty string',
},
format_verdict = {
cfg.ui.run.format_verdict,
'function',

View file

@ -1,13 +1,28 @@
local M = {}
M.PLATFORMS = { 'atcoder', 'codechef', 'codeforces', 'cses' }
M.ACTIONS = { 'run', 'panel', 'next', 'prev', 'pick', 'cache', 'interact', 'edit' }
M.PLATFORMS = { 'atcoder', 'codechef', 'codeforces', 'cses', 'kattis', 'usaco' }
M.ACTIONS = {
'run',
'panel',
'next',
'prev',
'pick',
'cache',
'interact',
'edit',
'race',
'stress',
'submit',
'open',
}
M.PLATFORM_DISPLAY_NAMES = {
atcoder = 'AtCoder',
codechef = 'CodeChef',
codeforces = 'CodeForces',
cses = 'CSES',
kattis = 'Kattis',
usaco = 'USACO',
}
M.CPP = 'cpp'

86
lua/cp/credentials.lua Normal file
View file

@ -0,0 +1,86 @@
local M = {}
local cache = require('cp.cache')
local constants = require('cp.constants')
local logger = require('cp.log')
local state = require('cp.state')
local STATUS_MESSAGES = {
checking_login = 'Checking existing session...',
logging_in = 'Logging in...',
installing_browser = 'Installing browser...',
}
function M.login(platform)
platform = platform or state.get_platform()
if not platform then
logger.log(
'No platform specified. Usage: :CP <platform> login',
{ level = vim.log.levels.ERROR }
)
return
end
local display = constants.PLATFORM_DISPLAY_NAMES[platform] or platform
vim.ui.input({ prompt = display .. ' username: ' }, function(username)
if not username or username == '' then
logger.log('Cancelled', { level = vim.log.levels.WARN })
return
end
vim.fn.inputsave()
local password = vim.fn.inputsecret(display .. ' password: ')
vim.fn.inputrestore()
if not password or password == '' then
logger.log('Cancelled', { level = vim.log.levels.WARN })
return
end
cache.load()
local existing = cache.get_credentials(platform) or {}
local credentials = {
username = username,
password = password,
}
if existing.token then
credentials.token = existing.token
end
local scraper = require('cp.scraper')
scraper.login(platform, credentials, function(ev)
vim.schedule(function()
local msg = STATUS_MESSAGES[ev.status] or ev.status
logger.log(display .. ': ' .. msg, { level = vim.log.levels.INFO, override = true })
end)
end, function(result)
vim.schedule(function()
if result.success then
logger.log(
display .. ' login successful',
{ level = vim.log.levels.INFO, override = true }
)
else
local err = result.error or 'unknown error'
logger.log(display .. ' login failed: ' .. err, { level = vim.log.levels.ERROR })
end
end)
end)
end)
end
function M.logout(platform)
platform = platform or state.get_platform()
if not platform then
logger.log(
'No platform specified. Usage: :CP <platform> logout',
{ level = vim.log.levels.ERROR }
)
return
end
local display = constants.PLATFORM_DISPLAY_NAMES[platform] or platform
cache.load()
cache.clear_credentials(platform)
logger.log(display .. ' credentials cleared', { level = vim.log.levels.INFO, override = true })
end
return M

View file

@ -5,33 +5,50 @@ local utils = require('cp.utils')
local function check()
vim.health.start('cp.nvim [required] ~')
utils.setup_python_env()
if vim.fn.has('nvim-0.10.0') == 1 then
vim.health.ok('Neovim 0.10.0+ detected')
else
vim.health.error('cp.nvim requires Neovim 0.10.0+')
end
local uname = vim.loop.os_uname()
local uname = vim.uv.os_uname()
if uname.sysname == 'Windows_NT' then
vim.health.error('Windows is not supported')
end
if vim.fn.executable('uv') == 1 then
vim.health.ok('uv executable found')
local r = vim.system({ 'uv', '--version' }, { text = true }):wait()
if utils.is_nix_build() then
local source = utils.is_nix_discovered() and 'runtime discovery' or 'flake install'
vim.health.ok('Nix Python environment detected (' .. source .. ')')
local py = utils.get_nix_python()
vim.health.info('Python: ' .. py)
local r = vim.system({ py, '--version' }, { text = true }):wait()
if r.code == 0 then
vim.health.info('uv version: ' .. r.stdout:gsub('\n', ''))
vim.health.info('Python version: ' .. r.stdout:gsub('\n', ''))
end
else
vim.health.warn('uv not found (install https://docs.astral.sh/uv/ for scraping)')
end
if vim.fn.executable('uv') == 1 then
vim.health.ok('uv executable found')
local r = vim.system({ 'uv', '--version' }, { text = true }):wait()
if r.code == 0 then
vim.health.info('uv version: ' .. r.stdout:gsub('\n', ''))
end
else
vim.health.warn('uv not found (install https://docs.astral.sh/uv/ for scraping)')
end
local plugin_path = utils.get_plugin_path()
local venv_dir = plugin_path .. '/.venv'
if vim.fn.isdirectory(venv_dir) == 1 then
vim.health.ok('Python virtual environment found at ' .. venv_dir)
else
vim.health.info('Python virtual environment not set up (created on first scrape)')
if vim.fn.executable('nix') == 1 then
vim.health.info('nix available but Python environment not resolved via nix')
end
local plugin_path = utils.get_plugin_path()
local venv_dir = plugin_path .. '/.venv'
if vim.fn.isdirectory(venv_dir) == 1 then
vim.health.ok('Python virtual environment found at ' .. venv_dir)
else
vim.health.info('Python virtual environment not set up (created on first scrape)')
end
end
local time_cap = utils.time_capability()
@ -41,7 +58,7 @@ local function check()
vim.health.error('GNU time not found: ' .. (time_cap.reason or ''))
end
local timeout_cap = utils.time_capability()
local timeout_cap = utils.timeout_capability()
if timeout_cap.ok then
vim.health.ok('GNU timeout found: ' .. timeout_cap.path)
else

View file

@ -7,7 +7,7 @@ local logger = require('cp.log')
M.helpers = helpers
if vim.fn.has('nvim-0.10.0') == 0 then
logger.log('Requires nvim-0.10.0+', vim.log.levels.ERROR)
logger.log('Requires nvim-0.10.0+', { level = vim.log.levels.ERROR })
return {}
end
@ -15,17 +15,25 @@ local initialized = false
local function ensure_initialized()
if initialized then
return
return true
end
local user_config = vim.g.cp_config or {}
local config = config_module.setup(user_config)
config_module.set_current_config(config)
local user_config = vim.g.cp or {}
local ok, result = pcall(config_module.setup, user_config)
if not ok then
local msg = tostring(result):gsub('^.+:%d+: ', '')
vim.notify(msg, vim.log.levels.ERROR)
return false
end
config_module.set_current_config(result)
initialized = true
return true
end
---@return nil
function M.handle_command(opts)
ensure_initialized()
if not ensure_initialized() then
return
end
local commands = require('cp.commands')
commands.handle_command(opts)
end
@ -34,4 +42,13 @@ function M.is_initialized()
return initialized
end
---@deprecated Use `vim.g.cp` instead
function M.setup(user_config)
vim.deprecate('require("cp").setup()', 'vim.g.cp', 'v0.7.7', 'cp.nvim', false)
if user_config then
vim.g.cp = vim.tbl_deep_extend('force', vim.g.cp or {}, user_config)
end
end
return M

View file

@ -1,12 +1,27 @@
local M = {}
function M.log(msg, level, override)
---@class LogOpts
---@field level? integer
---@field override? boolean
---@field sync? boolean
---@param msg string
---@param opts? LogOpts
function M.log(msg, opts)
local debug = require('cp.config').get_config().debug or false
level = level or vim.log.levels.INFO
opts = opts or {}
local level = opts.level or vim.log.levels.INFO
local override = opts.override or false
local sync = opts.sync or false
if level >= vim.log.levels.WARN or override or debug then
vim.schedule(function()
local notify = function()
vim.notify(('[cp.nvim]: %s'):format(msg), level)
end)
end
if sync then
notify()
else
vim.schedule(notify)
end
end
end

View file

@ -42,10 +42,10 @@ function M.get_platform_contests(platform, refresh)
local picker_contests = cache.get_contest_summaries(platform)
if refresh or vim.tbl_isempty(picker_contests) then
local display_name = constants.PLATFORM_DISPLAY_NAMES[platform]
logger.log(
('Loading %s contests...'):format(constants.PLATFORM_DISPLAY_NAMES[platform]),
vim.log.levels.INFO,
true
('Fetching %s contests...'):format(display_name),
{ level = vim.log.levels.INFO, override = true, sync = true }
)
local contests = scraper.scrape_contest_list(platform)
@ -53,12 +53,8 @@ function M.get_platform_contests(platform, refresh)
picker_contests = cache.get_contest_summaries(platform)
logger.log(
('Loaded %d %s contests.'):format(
#picker_contests,
constants.PLATFORM_DISPLAY_NAMES[platform]
),
vim.log.levels.INFO,
true
('Fetched %d %s contests.'):format(#picker_contests, display_name),
{ level = vim.log.levels.INFO, override = true }
)
end

146
lua/cp/race.lua Normal file
View file

@ -0,0 +1,146 @@
local M = {}
local cache = require('cp.cache')
local constants = require('cp.constants')
local logger = require('cp.log')
local scraper = require('cp.scraper')
local race_state = {
timer = nil,
platform = nil,
contest_id = nil,
language = nil,
start_time = nil,
}
local function format_countdown(seconds)
local h = math.floor(seconds / 3600)
local m = math.floor((seconds % 3600) / 60)
local s = seconds % 60
return string.format('%02d:%02d:%02d', h, m, s)
end
function M.start(platform, contest_id, language)
if not platform or not vim.tbl_contains(constants.PLATFORMS, platform) then
logger.log('Invalid platform', { level = vim.log.levels.ERROR })
return
end
if not contest_id or contest_id == '' then
logger.log('Contest ID required', { level = vim.log.levels.ERROR })
return
end
if race_state.timer then
logger.log('Race already active. Use :CP race stop first.', { level = vim.log.levels.WARN })
return
end
cache.load()
local start_time = cache.get_contest_start_time(platform, contest_id)
if not start_time then
logger.log('Fetching contest list...', { level = vim.log.levels.INFO, override = true })
local contests = scraper.scrape_contest_list(platform)
if contests and #contests > 0 then
cache.set_contest_summaries(platform, contests)
start_time = cache.get_contest_start_time(platform, contest_id)
end
end
if not start_time then
logger.log(
('No start time found for %s contest %s'):format(
constants.PLATFORM_DISPLAY_NAMES[platform] or platform,
contest_id
),
{ level = vim.log.levels.ERROR }
)
return
end
local remaining = start_time - os.time()
if remaining <= 0 then
logger.log(
'Contest has already started, setting up...',
{ level = vim.log.levels.INFO, override = true }
)
require('cp.setup').setup_contest(platform, contest_id, nil, language)
return
end
race_state.platform = platform
race_state.contest_id = contest_id
race_state.language = language
race_state.start_time = start_time
logger.log(
('Race started for %s %s — %s remaining'):format(
constants.PLATFORM_DISPLAY_NAMES[platform] or platform,
contest_id,
format_countdown(remaining)
),
{ level = vim.log.levels.INFO, override = true }
)
local timer = vim.uv.new_timer()
race_state.timer = timer
timer:start(
1000,
1000,
vim.schedule_wrap(function()
local r = race_state.start_time - os.time()
if r <= 0 then
timer:stop()
timer:close()
race_state.timer = nil
local p = race_state.platform
local c = race_state.contest_id
local l = race_state.language
race_state.platform = nil
race_state.contest_id = nil
race_state.language = nil
race_state.start_time = nil
logger.log('Contest started!', { level = vim.log.levels.INFO, override = true })
require('cp.setup').setup_contest(p, c, nil, l)
else
vim.notify(
('[cp.nvim] %s %s — %s'):format(
constants.PLATFORM_DISPLAY_NAMES[race_state.platform] or race_state.platform,
race_state.contest_id,
format_countdown(r)
),
vim.log.levels.INFO
)
end
end)
)
end
function M.stop()
local timer = race_state.timer
if not timer then
logger.log('No active race', { level = vim.log.levels.WARN })
return
end
timer:stop()
timer:close()
race_state.timer = nil
race_state.platform = nil
race_state.contest_id = nil
race_state.language = nil
race_state.start_time = nil
logger.log('Race cancelled', { level = vim.log.levels.INFO, override = true })
end
function M.status()
if not race_state.timer or not race_state.start_time then
return { active = false }
end
return {
active = true,
platform = race_state.platform,
contest_id = race_state.contest_id,
remaining_seconds = math.max(0, race_state.start_time - os.time()),
}
end
return M

View file

@ -11,7 +11,7 @@ function M.restore_from_current_file()
local current_file = (vim.uv.fs_realpath(vim.fn.expand('%:p')) or vim.fn.expand('%:p'))
local file_state = cache.get_file_state(current_file)
if not file_state then
logger.log('No cached state found for current file.', vim.log.levels.ERROR)
logger.log('No cached state found for current file.', { level = vim.log.levels.ERROR })
return false
end

View file

@ -43,6 +43,7 @@ end
function M.compile(compile_cmd, substitutions, on_complete)
local cmd = substitute_template(compile_cmd, substitutions)
local sh = table.concat(cmd, ' ') .. ' 2>&1'
logger.log('compile: ' .. sh)
local t0 = vim.uv.hrtime()
vim.system({ 'sh', '-c', sh }, { text = false }, function(r)
@ -51,7 +52,7 @@ function M.compile(compile_cmd, substitutions, on_complete)
r.stdout = ansi.bytes_to_string(r.stdout or '')
if r.code == 0 then
logger.log(('Compilation successful in %.1fms.'):format(dt), vim.log.levels.INFO)
logger.log(('Compilation successful in %.1fms.'):format(dt), { level = vim.log.levels.INFO })
else
logger.log(('Compilation failed in %.1fms.'):format(dt))
end
@ -119,6 +120,7 @@ function M.run(cmd, stdin, timeout_ms, memory_mb, on_complete)
local sec = math.ceil(timeout_ms / 1000)
local timeout_prefix = ('%s -k 1s %ds '):format(timeout_bin, sec)
local sh = prefix .. timeout_prefix .. ('%s -v sh -c %q 2>&1'):format(time_bin, prog)
logger.log('run: ' .. sh)
local t0 = vim.uv.hrtime()
vim.system({ 'sh', '-c', sh }, { stdin = stdin, text = true }, function(r)

View file

@ -19,6 +19,7 @@
---@class ProblemConstraints
---@field timeout_ms number
---@field memory_mb number
---@field precision number?
---@class PanelState
---@field test_cases RanTestCase[]
@ -56,7 +57,8 @@ local function load_constraints_from_cache(platform, contest_id, problem_id)
cache.load()
local timeout_ms, memory_mb = cache.get_constraints(platform, contest_id, problem_id)
if timeout_ms and memory_mb then
return { timeout_ms = timeout_ms, memory_mb = memory_mb }
local precision = cache.get_precision(platform, contest_id, problem_id)
return { timeout_ms = timeout_ms, memory_mb = memory_mb, precision = precision }
end
return nil
end
@ -99,6 +101,53 @@ local function build_command(cmd, substitutions)
return execute.build_command(cmd, substitutions)
end
---@param actual string
---@param expected string
---@param precision number?
---@return boolean
local function compare_outputs(actual, expected, precision)
local norm_actual = normalize_lines(actual)
local norm_expected = normalize_lines(expected)
if precision == nil or precision == 0 then
return norm_actual == norm_expected
end
local actual_lines = vim.split(norm_actual, '\n', { plain = true })
local expected_lines = vim.split(norm_expected, '\n', { plain = true })
if #actual_lines ~= #expected_lines then
return false
end
for i = 1, #actual_lines do
local a_tokens = vim.split(actual_lines[i], '%s+', { plain = false, trimempty = true })
local e_tokens = vim.split(expected_lines[i], '%s+', { plain = false, trimempty = true })
if #a_tokens ~= #e_tokens then
return false
end
for j = 1, #a_tokens do
local a_tok, e_tok = a_tokens[j], e_tokens[j]
local a_num = tonumber(a_tok)
local e_num = tonumber(e_tok)
if a_num ~= nil and e_num ~= nil then
if math.abs(a_num - e_num) > precision then
return false
end
else
if a_tok ~= e_tok then
return false
end
end
end
end
return true
end
---@param test_case RanTestCase
---@param debug boolean?
---@param on_complete fun(result: { status: "pass"|"fail"|"tle"|"mle", actual: string, actual_highlights: Highlight[], error: string, stderr: string, time_ms: number, code: integer, ok: boolean, signal: string?, tled: boolean, mled: boolean, rss_mb: number })
@ -143,7 +192,9 @@ local function run_single_test_case(test_case, debug, on_complete)
end
local expected = test_case.expected or ''
local ok = normalize_lines(out) == normalize_lines(expected)
local precision = (panel_state.constraints and panel_state.constraints.precision)
or config.ui.panel.precision
local ok = compare_outputs(out, expected, precision)
local signal = r.signal
if not signal and r.code and r.code >= 128 then
@ -194,7 +245,7 @@ function M.load_test_cases()
state.get_problem_id()
)
logger.log(('Loaded %d test case(s)'):format(#tcs), vim.log.levels.INFO)
logger.log(('Loaded %d test case(s)'):format(#tcs), { level = vim.log.levels.INFO })
return #tcs > 0
end
@ -208,7 +259,7 @@ function M.run_combined_test(debug, on_complete)
)
if not combined then
logger.log('No combined test found', vim.log.levels.ERROR)
logger.log('No combined test found', { level = vim.log.levels.ERROR })
on_complete(nil)
return
end
@ -276,26 +327,33 @@ function M.run_all_test_cases(indices, debug, on_each, on_done)
end
end
local function run_next(pos)
if pos > #to_run then
logger.log(
('Finished %s %d test cases.'):format(debug and 'debugging' or 'running', #to_run),
vim.log.levels.INFO,
true
)
on_done(panel_state.test_cases)
return
end
M.run_test_case(to_run[pos], debug, function()
if on_each then
on_each(pos, #to_run)
end
run_next(pos + 1)
end)
if #to_run == 0 then
logger.log(
('Finished %s %d test cases.'):format(debug and 'debugging' or 'running', 0),
{ level = vim.log.levels.INFO, override = true }
)
on_done(panel_state.test_cases)
return
end
run_next(1)
local total = #to_run
local remaining = total
for _, idx in ipairs(to_run) do
M.run_test_case(idx, debug, function()
if on_each then
on_each(idx, total)
end
remaining = remaining - 1
if remaining == 0 then
logger.log(
('Finished %s %d test cases.'):format(debug and 'debugging' or 'running', total),
{ level = vim.log.levels.INFO, override = true }
)
on_done(panel_state.test_cases)
end
end)
end
end
---@return PanelState

View file

@ -5,55 +5,141 @@ local logger = require('cp.log')
local utils = require('cp.utils')
local function syshandle(result)
local ok, data = pcall(vim.json.decode, result.stdout or '')
if ok then
return { success = true, data = data }
end
if result.code ~= 0 then
local msg = 'Scraper failed: ' .. (result.stderr or 'Unknown error')
return { success = false, error = msg }
end
local ok, data = pcall(vim.json.decode, result.stdout)
if not ok then
local msg = 'Failed to parse scraper output: ' .. tostring(data)
logger.log(msg, vim.log.levels.ERROR)
return { success = false, error = msg }
end
local msg = 'Failed to parse scraper output: ' .. tostring(data)
logger.log(msg, { level = vim.log.levels.ERROR })
return { success = false, error = msg }
end
return { success = true, data = data }
---@param env_map table<string, string>
---@return string[]
local function spawn_env_list(env_map)
local out = {}
for key, value in pairs(env_map) do
out[#out + 1] = tostring(key) .. '=' .. tostring(value)
end
return out
end
---@param platform string
---@param subcommand string
---@param args string[]
---@param opts { sync?: boolean, ndjson?: boolean, on_event?: fun(ev: table), on_exit?: fun(result: table) }
---@param opts { sync?: boolean, ndjson?: boolean, on_event?: fun(ev: table), on_exit?: fun(result: table), env_extra?: table<string, string>, stdin?: string }
local function run_scraper(platform, subcommand, args, opts)
if not utils.setup_python_env() then
local msg = 'no Python environment available (install uv or nix)'
logger.log(msg, { level = vim.log.levels.ERROR })
if opts and opts.on_exit then
opts.on_exit({ success = false, error = msg })
end
return { success = false, error = msg }
end
local needs_browser = subcommand == 'submit'
or subcommand == 'login'
or (platform == 'codeforces' and (subcommand == 'metadata' or subcommand == 'tests'))
if needs_browser then
utils.setup_nix_submit_env()
end
local plugin_path = utils.get_plugin_path()
local cmd = { 'uv', 'run', '--directory', plugin_path, '-m', 'scrapers.' .. platform, subcommand }
local cmd
if needs_browser then
cmd = utils.get_python_submit_cmd(platform, plugin_path)
else
cmd = utils.get_python_cmd(platform, plugin_path)
end
vim.list_extend(cmd, { subcommand })
vim.list_extend(cmd, args)
logger.log('scraper cmd: ' .. table.concat(cmd, ' '))
local env = vim.fn.environ()
env.VIRTUAL_ENV = ''
env.PYTHONPATH = ''
env.CONDA_PREFIX = ''
if opts and opts.env_extra then
for k, v in pairs(opts.env_extra) do
env[k] = v
end
end
if needs_browser and utils.is_nix_build() then
env.UV_PROJECT_ENVIRONMENT = vim.fn.stdpath('cache') .. '/cp-nvim/submit-env'
end
if opts and opts.ndjson then
local uv = vim.loop
local uv = vim.uv
local stdin_pipe = nil
if opts.stdin then
stdin_pipe = uv.new_pipe(false)
end
local stdout = uv.new_pipe(false)
local stderr = uv.new_pipe(false)
local buf = ''
local timer = nil
local handle
handle = uv.spawn(
cmd[1],
{ args = vim.list_slice(cmd, 2), stdio = { nil, stdout, stderr }, env = env },
function(code, signal)
if buf ~= '' and opts.on_event then
local ok_tail, ev_tail = pcall(vim.json.decode, buf)
if ok_tail then
opts.on_event(ev_tail)
end
buf = ''
handle = uv.spawn(cmd[1], {
args = vim.list_slice(cmd, 2),
stdio = { stdin_pipe, stdout, stderr },
env = spawn_env_list(env),
cwd = plugin_path,
}, function(code, signal)
if timer and not timer:is_closing() then
timer:stop()
timer:close()
end
if buf ~= '' and opts.on_event then
local ok_tail, ev_tail = pcall(vim.json.decode, buf)
if ok_tail then
opts.on_event(ev_tail)
end
if opts.on_exit then
opts.on_exit({ success = (code == 0), code = code, signal = signal })
buf = ''
end
if opts.on_exit then
opts.on_exit({ success = (code == 0), code = code, signal = signal })
end
if stdin_pipe and not stdin_pipe:is_closing() then
stdin_pipe:close()
end
if not stdout:is_closing() then
stdout:close()
end
if not stderr:is_closing() then
stderr:close()
end
if handle and not handle:is_closing() then
handle:close()
end
end)
if not handle then
if stdin_pipe and not stdin_pipe:is_closing() then
stdin_pipe:close()
end
logger.log('Failed to start scraper process', { level = vim.log.levels.ERROR })
return { success = false, error = 'spawn failed' }
end
if needs_browser then
timer = uv.new_timer()
timer:start(120000, 0, function()
timer:stop()
timer:close()
if stdin_pipe and not stdin_pipe:is_closing() then
stdin_pipe:close()
end
if not stdout:is_closing() then
stdout:close()
@ -62,14 +148,21 @@ local function run_scraper(platform, subcommand, args, opts)
stderr:close()
end
if handle and not handle:is_closing() then
handle:kill(15)
handle:close()
end
end
)
if opts.on_exit then
opts.on_exit({ success = false, error = 'submit timed out' })
end
end)
end
if not handle then
logger.log('Failed to start scraper process', vim.log.levels.ERROR)
return { success = false, error = 'spawn failed' }
if stdin_pipe then
uv.write(stdin_pipe, opts.stdin, function()
uv.shutdown(stdin_pipe, function()
stdin_pipe:close()
end)
end)
end
uv.read_start(stdout, function(_, data)
@ -102,7 +195,15 @@ local function run_scraper(platform, subcommand, args, opts)
return
end
local sysopts = { text = true, timeout = 30000, env = env }
local sysopts = {
text = true,
timeout = needs_browser and 120000 or 30000,
env = env,
cwd = plugin_path,
}
if opts and opts.stdin then
sysopts.stdin = opts.stdin
end
if opts and opts.sync then
local result = vim.system(cmd, sysopts):wait()
return syshandle(result)
@ -124,7 +225,7 @@ function M.scrape_contest_metadata(platform, contest_id, callback)
constants.PLATFORM_DISPLAY_NAMES[platform],
contest_id
),
vim.log.levels.ERROR
{ level = vim.log.levels.ERROR }
)
return
end
@ -135,7 +236,7 @@ function M.scrape_contest_metadata(platform, contest_id, callback)
constants.PLATFORM_DISPLAY_NAMES[platform],
contest_id
),
vim.log.levels.ERROR
{ level = vim.log.levels.ERROR }
)
return
end
@ -154,7 +255,7 @@ function M.scrape_contest_list(platform)
platform,
(result and result.error) or 'unknown'
),
vim.log.levels.ERROR
{ level = vim.log.levels.ERROR }
)
return {}
end
@ -164,9 +265,15 @@ end
---@param platform string
---@param contest_id string
---@param callback fun(data: table)|nil
function M.scrape_all_tests(platform, contest_id, callback)
---@param on_done fun()|nil
function M.scrape_all_tests(platform, contest_id, callback, on_done)
run_scraper(platform, 'tests', { contest_id }, {
ndjson = true,
on_exit = function()
if type(on_done) == 'function' then
vim.schedule(on_done)
end
end,
on_event = function(ev)
if ev.done then
return
@ -178,7 +285,7 @@ function M.scrape_all_tests(platform, contest_id, callback)
contest_id,
ev.error
),
vim.log.levels.WARN
{ level = vim.log.levels.WARN }
)
return
end
@ -205,6 +312,7 @@ function M.scrape_all_tests(platform, contest_id, callback)
memory_mb = ev.memory_mb or 0,
interactive = ev.interactive or false,
multi_test = ev.multi_test or false,
precision = ev.precision ~= vim.NIL and ev.precision or nil,
problem_id = ev.problem_id,
})
end
@ -213,4 +321,75 @@ function M.scrape_all_tests(platform, contest_id, callback)
})
end
function M.login(platform, credentials, on_status, callback)
local done = false
run_scraper(platform, 'login', {}, {
ndjson = true,
env_extra = { CP_CREDENTIALS = vim.json.encode(credentials) },
on_event = function(ev)
if ev.credentials ~= nil and next(ev.credentials) ~= nil then
require('cp.cache').set_credentials(platform, ev.credentials)
end
if ev.status ~= nil then
if type(on_status) == 'function' then
on_status(ev)
end
elseif ev.success ~= nil then
done = true
if type(callback) == 'function' then
callback(ev)
end
end
end,
on_exit = function(proc)
if not done and type(callback) == 'function' then
callback({
success = false,
error = 'login process exited (code=' .. tostring(proc.code) .. ')',
})
end
end,
})
end
function M.submit(
platform,
contest_id,
problem_id,
language,
source_file,
credentials,
on_status,
callback
)
local done = false
run_scraper(platform, 'submit', { contest_id, problem_id, language, source_file }, {
ndjson = true,
env_extra = { CP_CREDENTIALS = vim.json.encode(credentials) },
on_event = function(ev)
if ev.credentials ~= nil then
require('cp.cache').set_credentials(platform, ev.credentials)
end
if ev.status ~= nil then
if type(on_status) == 'function' then
on_status(ev)
end
elseif ev.success ~= nil then
done = true
if type(callback) == 'function' then
callback(ev)
end
end
end,
on_exit = function(proc)
if not done and type(callback) == 'function' then
callback({
success = false,
error = 'submit process exited (code=' .. tostring(proc.code) .. ')',
})
end
end,
})
end
return M

View file

@ -8,6 +8,39 @@ local logger = require('cp.log')
local scraper = require('cp.scraper')
local state = require('cp.state')
local function apply_template(bufnr, lang_id, platform)
local config = config_module.get_config()
local eff = config.runtime.effective[platform] and config.runtime.effective[platform][lang_id]
if not eff or not eff.template then
return
end
local path = vim.fn.expand(eff.template)
if vim.fn.filereadable(path) ~= 1 then
logger.log(
('[cp.nvim] template not readable: %s'):format(path),
{ level = vim.log.levels.WARN }
)
return
end
local lines = vim.fn.readfile(path)
vim.api.nvim_buf_set_lines(bufnr, 0, -1, false, lines)
local marker = config.templates and config.templates.cursor_marker
if marker then
for lnum, line in ipairs(lines) do
local col = line:find(marker, 1, true)
if col then
local new_line = line:sub(1, col - 1) .. line:sub(col + #marker)
vim.api.nvim_buf_set_lines(bufnr, lnum - 1, lnum, false, { new_line })
local winid = vim.fn.bufwinid(bufnr)
if winid ~= -1 then
vim.api.nvim_win_set_cursor(winid, { lnum, col - 1 })
end
break
end
end
end
end
---Get the language of the current file from cache
---@return string?
local function get_current_file_language()
@ -82,11 +115,15 @@ local function start_tests(platform, contest_id, problems)
return not vim.tbl_isempty(cache.get_test_cases(platform, contest_id, p.id))
end, problems)
if cached_len ~= #problems then
local to_fetch = #problems - cached_len
logger.log(('Fetching %s/%s problem tests...'):format(cached_len, #problems))
scraper.scrape_all_tests(platform, contest_id, function(ev)
local cached_tests = {}
if not ev.interactive and vim.tbl_isempty(ev.tests) then
logger.log(("No tests found for problem '%s'."):format(ev.problem_id), vim.log.levels.WARN)
logger.log(
("No tests found for problem '%s'."):format(ev.problem_id),
{ level = vim.log.levels.WARN }
)
end
for i, t in ipairs(ev.tests) do
cached_tests[i] = { index = i, input = t.input, expected = t.expected }
@ -100,7 +137,8 @@ local function start_tests(platform, contest_id, problems)
ev.timeout_ms or 0,
ev.memory_mb or 0,
ev.interactive,
ev.multi_test
ev.multi_test,
ev.precision
)
local io_state = state.get_io_view_state()
@ -111,6 +149,11 @@ local function start_tests(platform, contest_id, problems)
require('cp.utils').update_buffer_content(io_state.input_buf, input_lines, nil, nil)
end
end
end, function()
logger.log(
('Loaded %d test%s.'):format(to_fetch, to_fetch == 1 and '' or 's'),
{ level = vim.log.levels.INFO, override = true }
)
end)
end
end
@ -128,24 +171,39 @@ function M.setup_contest(platform, contest_id, problem_id, language)
if language then
local lang_result = config_module.get_language_for_platform(platform, language)
if not lang_result.valid then
logger.log(lang_result.error, vim.log.levels.ERROR)
logger.log(lang_result.error, { level = vim.log.levels.ERROR })
return
end
end
local is_new_contest = old_platform ~= platform and old_contest_id ~= contest_id
local is_new_contest = old_platform ~= platform or old_contest_id ~= contest_id
if is_new_contest then
local views = require('cp.ui.views')
views.cancel_io_view()
local active = state.get_active_panel()
if active == 'interactive' then
views.cancel_interactive()
elseif active == 'stress' then
require('cp.stress').cancel()
elseif active == 'run' then
views.disable()
end
end
cache.load()
local function proceed(contest_data)
if is_new_contest then
local io_state = state.get_io_view_state()
if io_state and io_state.output_buf and vim.api.nvim_buf_is_valid(io_state.output_buf) then
require('cp.utils').update_buffer_content(io_state.output_buf, {}, nil, nil)
end
end
local problems = contest_data.problems
local pid = problem_id and problem_id or problems[1].id
M.setup_problem(pid, language)
start_tests(platform, contest_id, problems)
if config_module.get_config().open_url and is_new_contest and contest_data.url then
vim.ui.open(contest_data.url:format(pid))
end
end
local contest_data = cache.get_contest_data(platform, contest_id)
@ -160,12 +218,7 @@ function M.setup_contest(platform, contest_id, problem_id, language)
vim.bo[bufnr].buftype = ''
vim.bo[bufnr].swapfile = false
if cfg.hooks and cfg.hooks.setup_code and not vim.b[bufnr].cp_setup_done then
local ok = pcall(cfg.hooks.setup_code, state)
if ok then
vim.b[bufnr].cp_setup_done = true
end
end
state.set_language(lang)
state.set_provisional({
bufnr = bufnr,
@ -173,16 +226,23 @@ function M.setup_contest(platform, contest_id, problem_id, language)
contest_id = contest_id,
language = lang,
requested_problem_id = problem_id,
token = vim.loop.hrtime(),
token = vim.uv.hrtime(),
})
logger.log('Fetching contests problems...', vim.log.levels.INFO, true)
logger.log('Fetching contests problems...', { level = vim.log.levels.INFO, override = true })
scraper.scrape_contest_metadata(
platform,
contest_id,
vim.schedule_wrap(function(result)
local problems = result.problems or {}
cache.set_contest_data(platform, contest_id, problems, result.url)
cache.set_contest_data(
platform,
contest_id,
problems,
result.url,
result.contest_url or '',
result.standings_url or ''
)
local prov = state.get_provisional()
if not prov or prov.platform ~= platform or prov.contest_id ~= contest_id then
return
@ -212,7 +272,7 @@ end
function M.setup_problem(problem_id, language)
local platform = state.get_platform()
if not platform then
logger.log('No platform/contest/problem configured.', vim.log.levels.ERROR)
logger.log('No platform/contest/problem configured.', { level = vim.log.levels.ERROR })
return
end
@ -233,7 +293,7 @@ function M.setup_problem(problem_id, language)
if language then
local lang_result = config_module.get_language_for_platform(platform, language)
if not lang_result.valid then
logger.log(lang_result.error, vim.log.levels.ERROR)
logger.log(lang_result.error, { level = vim.log.levels.ERROR })
return
end
end
@ -245,7 +305,38 @@ function M.setup_problem(problem_id, language)
return
end
vim.fn.mkdir(vim.fn.fnamemodify(source_file, ':h'), 'p')
if vim.fn.filereadable(source_file) == 1 then
local existing = cache.get_file_state(vim.fn.fnamemodify(source_file, ':p'))
if
existing
and (
existing.platform ~= platform
or existing.contest_id ~= (state.get_contest_id() or '')
or existing.problem_id ~= problem_id
)
then
logger.log(
('File %q already exists for %s/%s %s.'):format(
source_file,
existing.platform,
existing.contest_id,
existing.problem_id
),
{ level = vim.log.levels.ERROR }
)
return
end
end
local contest_dir = vim.fn.fnamemodify(source_file, ':h')
local is_new_dir = vim.fn.isdirectory(contest_dir) == 0
vim.fn.mkdir(contest_dir, 'p')
if is_new_dir then
local s = config.hooks and config.hooks.setup
if s and s.contest then
pcall(s.contest, state)
end
end
local prov = state.get_provisional()
if prov and prov.platform == platform and prov.contest_id == (state.get_contest_id() or '') then
@ -256,7 +347,6 @@ function M.setup_problem(problem_id, language)
state.set_provisional(nil)
else
vim.api.nvim_buf_set_name(prov.bufnr, source_file)
vim.bo[prov.bufnr].swapfile = true
-- selene: allow(mixed_table)
vim.cmd.write({
vim.fn.fnameescape(source_file),
@ -264,14 +354,29 @@ function M.setup_problem(problem_id, language)
mods = { silent = true, noautocmd = true, keepalt = true },
})
state.set_solution_win(vim.api.nvim_get_current_win())
if config.hooks and config.hooks.setup_code and not vim.b[prov.bufnr].cp_setup_done then
local ok = pcall(config.hooks.setup_code, state)
if ok then
if not vim.b[prov.bufnr].cp_setup_done then
apply_template(prov.bufnr, lang, platform)
local s = config.hooks and config.hooks.setup
if s and s.code then
local ok = pcall(s.code, state)
if ok then
vim.b[prov.bufnr].cp_setup_done = true
end
else
helpers.clearcol(prov.bufnr)
vim.b[prov.bufnr].cp_setup_done = true
end
elseif not vim.b[prov.bufnr].cp_setup_done then
helpers.clearcol(prov.bufnr)
vim.b[prov.bufnr].cp_setup_done = true
local o = config.hooks and config.hooks.on
if o and o.enter then
local bufnr = prov.bufnr
vim.api.nvim_create_autocmd('BufEnter', {
buffer = bufnr,
callback = function()
pcall(o.enter, state)
end,
})
pcall(o.enter, state)
end
end
cache.set_file_state(
vim.fn.fnamemodify(source_file, ':p'),
@ -290,18 +395,39 @@ function M.setup_problem(problem_id, language)
end
vim.cmd.only({ mods = { silent = true } })
vim.cmd.e(source_file)
local current_file = vim.fn.expand('%:p')
if current_file ~= vim.fn.fnamemodify(source_file, ':p') then
vim.cmd.e(source_file)
end
local bufnr = vim.api.nvim_get_current_buf()
state.set_solution_win(vim.api.nvim_get_current_win())
require('cp.ui.views').ensure_io_view()
if config.hooks and config.hooks.setup_code and not vim.b[bufnr].cp_setup_done then
local ok = pcall(config.hooks.setup_code, state)
if ok then
if not vim.b[bufnr].cp_setup_done then
local is_new = vim.api.nvim_buf_line_count(bufnr) == 1
and vim.api.nvim_buf_get_lines(bufnr, 0, 1, false)[1] == ''
if is_new then
apply_template(bufnr, lang, platform)
end
local s = config.hooks and config.hooks.setup
if s and s.code then
local ok = pcall(s.code, state)
if ok then
vim.b[bufnr].cp_setup_done = true
end
else
helpers.clearcol(bufnr)
vim.b[bufnr].cp_setup_done = true
end
elseif not vim.b[bufnr].cp_setup_done then
helpers.clearcol(bufnr)
vim.b[bufnr].cp_setup_done = true
local o = config.hooks and config.hooks.on
if o and o.enter then
vim.api.nvim_create_autocmd('BufEnter', {
buffer = bufnr,
callback = function()
pcall(o.enter, state)
end,
})
pcall(o.enter, state)
end
end
cache.set_file_state(
vim.fn.expand('%:p'),
@ -324,7 +450,7 @@ function M.navigate_problem(direction, language)
local contest_id = state.get_contest_id()
local current_problem_id = state.get_problem_id()
if not platform or not contest_id or not current_problem_id then
logger.log('No platform configured.', vim.log.levels.ERROR)
logger.log('No platform configured.', { level = vim.log.levels.ERROR })
return
end
@ -336,7 +462,7 @@ function M.navigate_problem(direction, language)
constants.PLATFORM_DISPLAY_NAMES[platform],
contest_id
),
vim.log.levels.ERROR
{ level = vim.log.levels.ERROR }
)
return
end
@ -350,9 +476,15 @@ function M.navigate_problem(direction, language)
logger.log(('navigate_problem: %s -> %s'):format(current_problem_id, problems[new_index].id))
local views = require('cp.ui.views')
views.cancel_io_view()
local active_panel = state.get_active_panel()
if active_panel == 'run' then
require('cp.ui.views').disable()
views.disable()
elseif active_panel == 'interactive' then
views.cancel_interactive()
elseif active_panel == 'stress' then
require('cp.stress').cancel()
end
local lang = nil
@ -360,7 +492,7 @@ function M.navigate_problem(direction, language)
if language then
local lang_result = config_module.get_language_for_platform(platform, language)
if not lang_result.valid then
logger.log(lang_result.error, vim.log.levels.ERROR)
logger.log(lang_result.error, { level = vim.log.levels.ERROR })
return
end
lang = language

View file

@ -9,7 +9,6 @@
---@class cp.IoViewState
---@field output_buf integer
---@field input_buf integer
---@field current_test_index integer?
---@field source_buf integer?
---@class cp.State

249
lua/cp/stress.lua Normal file
View file

@ -0,0 +1,249 @@
local M = {}
local logger = require('cp.log')
local state = require('cp.state')
local utils = require('cp.utils')
local GENERATOR_PATTERNS = {
'gen.py',
'gen.cc',
'gen.cpp',
'generator.py',
'generator.cc',
'generator.cpp',
}
local BRUTE_PATTERNS = {
'brute.py',
'brute.cc',
'brute.cpp',
'slow.py',
'slow.cc',
'slow.cpp',
}
local function find_file(patterns)
for _, pattern in ipairs(patterns) do
if vim.fn.filereadable(pattern) == 1 then
return pattern
end
end
return nil
end
local function compile_cpp(source, output)
local result = vim.system({ 'sh', '-c', 'g++ -O2 -o ' .. output .. ' ' .. source }):wait()
if result.code ~= 0 then
logger.log(
('Failed to compile %s: %s'):format(source, result.stderr or ''),
{ level = vim.log.levels.ERROR }
)
return false
end
return true
end
local function build_run_cmd(file)
local ext = file:match('%.([^%.]+)$')
if ext == 'cc' or ext == 'cpp' then
local base = file:gsub('%.[^%.]+$', '')
local bin = base .. '_bin'
if not compile_cpp(file, bin) then
return nil
end
return './' .. bin
elseif ext == 'py' then
return 'python3 ' .. file
end
return './' .. file
end
function M.toggle(generator_cmd, brute_cmd)
if state.get_active_panel() == 'stress' then
if state.stress_buf and vim.api.nvim_buf_is_valid(state.stress_buf) then
local job = vim.b[state.stress_buf].terminal_job_id
if job then
vim.fn.jobstop(job)
end
end
if state.saved_stress_session then
vim.cmd.source(state.saved_stress_session)
vim.fn.delete(state.saved_stress_session)
state.saved_stress_session = nil
end
state.set_active_panel(nil)
return
end
if state.get_active_panel() then
logger.log('Another panel is already active.', { level = vim.log.levels.WARN })
return
end
local gen_file = generator_cmd
local brute_file = brute_cmd
if not gen_file then
gen_file = find_file(GENERATOR_PATTERNS)
end
if not brute_file then
brute_file = find_file(BRUTE_PATTERNS)
end
if not gen_file then
logger.log(
'No generator found. Pass generator as first arg or add gen.{py,cc,cpp}.',
{ level = vim.log.levels.ERROR }
)
return
end
if not brute_file then
logger.log(
'No brute solution found. Pass brute as second arg or add brute.{py,cc,cpp}.',
{ level = vim.log.levels.ERROR }
)
return
end
local gen_cmd = build_run_cmd(gen_file)
if not gen_cmd then
return
end
local brute_run_cmd = build_run_cmd(brute_file)
if not brute_run_cmd then
return
end
state.saved_stress_session = vim.fn.tempname()
-- selene: allow(mixed_table)
vim.cmd.mksession({ state.saved_stress_session, bang = true })
vim.cmd.only({ mods = { silent = true } })
local execute = require('cp.runner.execute')
local function restore_session()
if state.saved_stress_session then
vim.cmd.source(state.saved_stress_session)
vim.fn.delete(state.saved_stress_session)
state.saved_stress_session = nil
end
end
execute.compile_problem(false, function(compile_result)
if not compile_result.success then
local run = require('cp.runner.run')
run.handle_compilation_failure(compile_result.output)
restore_session()
return
end
local binary = state.get_binary_file()
if not binary or binary == '' then
logger.log('No binary produced.', { level = vim.log.levels.ERROR })
restore_session()
return
end
local script = vim.fn.fnamemodify(utils.get_plugin_path() .. '/scripts/stress.py', ':p')
local cmdline
if utils.is_nix_build() then
cmdline = table.concat({
vim.fn.shellescape(utils.get_nix_python()),
vim.fn.shellescape(script),
vim.fn.shellescape(gen_cmd),
vim.fn.shellescape(brute_run_cmd),
vim.fn.shellescape(binary),
}, ' ')
else
cmdline = table.concat({
'uv',
'run',
vim.fn.shellescape(script),
vim.fn.shellescape(gen_cmd),
vim.fn.shellescape(brute_run_cmd),
vim.fn.shellescape(binary),
}, ' ')
end
vim.cmd.terminal(cmdline)
local term_buf = vim.api.nvim_get_current_buf()
local term_win = vim.api.nvim_get_current_win()
local cleaned = false
local function cleanup()
if cleaned then
return
end
cleaned = true
if term_buf and vim.api.nvim_buf_is_valid(term_buf) then
local job = vim.b[term_buf] and vim.b[term_buf].terminal_job_id or nil
if job then
pcall(vim.fn.jobstop, job)
end
end
restore_session()
state.stress_buf = nil
state.stress_win = nil
state.set_active_panel(nil)
end
vim.api.nvim_create_autocmd({ 'BufWipeout', 'BufUnload' }, {
buffer = term_buf,
callback = cleanup,
})
vim.api.nvim_create_autocmd('WinClosed', {
callback = function()
if cleaned then
return
end
local any = false
for _, win in ipairs(vim.api.nvim_list_wins()) do
if vim.api.nvim_win_is_valid(win) and vim.api.nvim_win_get_buf(win) == term_buf then
any = true
break
end
end
if not any then
cleanup()
end
end,
})
vim.api.nvim_create_autocmd('TermClose', {
buffer = term_buf,
callback = function()
vim.b[term_buf].cp_stress_exited = true
end,
})
vim.keymap.set('t', '<c-q>', function()
cleanup()
end, { buffer = term_buf, silent = true })
vim.keymap.set('n', '<c-q>', function()
cleanup()
end, { buffer = term_buf, silent = true })
state.stress_buf = term_buf
state.stress_win = term_win
state.set_active_panel('stress')
end)
end
function M.cancel()
if state.stress_buf and vim.api.nvim_buf_is_valid(state.stress_buf) then
local job = vim.b[state.stress_buf].terminal_job_id
if job then
vim.fn.jobstop(job)
end
end
if state.saved_stress_session then
vim.fn.delete(state.saved_stress_session)
state.saved_stress_session = nil
end
state.set_active_panel(nil)
end
return M

91
lua/cp/submit.lua Normal file
View file

@ -0,0 +1,91 @@
local M = {}
local cache = require('cp.cache')
local logger = require('cp.log')
local state = require('cp.state')
local STATUS_MSGS = {
installing_browser = 'Installing browser (first time setup)...',
checking_login = 'Checking login...',
logging_in = 'Logging in...',
submitting = 'Submitting...',
}
local function prompt_credentials(platform, callback)
local saved = cache.get_credentials(platform)
if saved and saved.username and saved.password then
callback(saved)
return
end
vim.ui.input({ prompt = platform .. ' username: ' }, function(username)
if not username or username == '' then
logger.log('Submit cancelled', { level = vim.log.levels.WARN })
return
end
vim.fn.inputsave()
local password = vim.fn.inputsecret(platform .. ' password: ')
vim.fn.inputrestore()
vim.cmd.redraw()
if not password or password == '' then
logger.log('Submit cancelled', { level = vim.log.levels.WARN })
return
end
local creds = { username = username, password = password }
cache.set_credentials(platform, creds)
callback(creds)
end)
end
function M.submit(opts)
local platform = state.get_platform()
local contest_id = state.get_contest_id()
local problem_id = state.get_problem_id()
local language = (opts and opts.language) or state.get_language()
if not platform or not contest_id or not problem_id or not language then
logger.log(
'No active problem. Use :CP <platform> <contest> first.',
{ level = vim.log.levels.ERROR }
)
return
end
local source_file = state.get_source_file()
if not source_file or vim.fn.filereadable(source_file) ~= 1 then
logger.log('Source file not found', { level = vim.log.levels.ERROR })
return
end
prompt_credentials(platform, function(creds)
vim.cmd.update()
vim.notify('[cp.nvim] Submitting...', vim.log.levels.INFO)
require('cp.scraper').submit(
platform,
contest_id,
problem_id,
language,
source_file,
creds,
function(ev)
vim.schedule(function()
vim.notify('[cp.nvim] ' .. (STATUS_MSGS[ev.status] or ev.status), vim.log.levels.INFO)
end)
end,
function(result)
vim.schedule(function()
if result and result.success then
logger.log('Submitted successfully', { level = vim.log.levels.INFO, override = true })
else
local err = result and result.error or 'unknown error'
if err:match('^Login failed') then
cache.clear_credentials(platform)
end
logger.log('Submit failed: ' .. err, { level = vim.log.levels.ERROR })
end
end)
end
)
end)
end
return M

View file

@ -90,7 +90,7 @@ local function delete_current_test()
return
end
if #edit_state.test_buffers == 1 then
logger.log('Problems must have at least one test case.', vim.log.levels.ERROR)
logger.log('Problems must have at least one test case.', { level = vim.log.levels.ERROR })
return
end
@ -144,7 +144,7 @@ local function add_new_test()
vim.api.nvim_win_set_buf(input_win, input_buf)
vim.bo[input_buf].modifiable = true
vim.bo[input_buf].readonly = false
vim.bo[input_buf].buftype = 'nofile'
vim.bo[input_buf].buftype = 'acwrite'
vim.bo[input_buf].buflisted = false
helpers.clearcol(input_buf)
@ -155,7 +155,7 @@ local function add_new_test()
vim.api.nvim_win_set_buf(expected_win, expected_buf)
vim.bo[expected_buf].modifiable = true
vim.bo[expected_buf].readonly = false
vim.bo[expected_buf].buftype = 'nofile'
vim.bo[expected_buf].buftype = 'acwrite'
vim.bo[expected_buf].buflisted = false
helpers.clearcol(expected_buf)
@ -177,6 +177,80 @@ local function add_new_test()
logger.log(('Added test %d'):format(new_index))
end
local function save_all_tests()
if not edit_state then
return
end
local platform = state.get_platform()
local contest_id = state.get_contest_id()
local problem_id = state.get_problem_id()
if not platform or not contest_id or not problem_id then
return
end
for i, pair in ipairs(edit_state.test_buffers) do
if
vim.api.nvim_buf_is_valid(pair.input_buf) and vim.api.nvim_buf_is_valid(pair.expected_buf)
then
local input_lines = vim.api.nvim_buf_get_lines(pair.input_buf, 0, -1, false)
local expected_lines = vim.api.nvim_buf_get_lines(pair.expected_buf, 0, -1, false)
edit_state.test_cases[i].input = table.concat(input_lines, '\n')
edit_state.test_cases[i].expected = table.concat(expected_lines, '\n')
end
end
local contest_data = cache.get_contest_data(platform, contest_id)
local is_multi_test = contest_data.problems[contest_data.index_map[problem_id]].multi_test
or false
local combined_input = table.concat(
vim.tbl_map(function(tc)
return tc.input
end, edit_state.test_cases),
'\n'
)
local combined_expected = table.concat(
vim.tbl_map(function(tc)
return tc.expected
end, edit_state.test_cases),
'\n'
)
cache.set_test_cases(
platform,
contest_id,
problem_id,
{ input = combined_input, expected = combined_expected },
edit_state.test_cases,
edit_state.constraints and edit_state.constraints.timeout_ms or 0,
edit_state.constraints and edit_state.constraints.memory_mb or 0,
false,
is_multi_test
)
local config = config_module.get_config()
local base_name = config.filename and config.filename(platform, contest_id, problem_id, config)
or config_module.default_filename(contest_id, problem_id)
vim.fn.mkdir('io', 'p')
for i, tc in ipairs(edit_state.test_cases) do
local input_file = string.format('io/%s.%d.cpin', base_name, i)
local expected_file = string.format('io/%s.%d.cpout', base_name, i)
local input_content = (tc.input or ''):gsub('\r', '')
local expected_content = (tc.expected or ''):gsub('\r', '')
vim.fn.writefile(vim.split(input_content, '\n', { trimempty = true }), input_file)
vim.fn.writefile(vim.split(expected_content, '\n', { trimempty = true }), expected_file)
end
logger.log('Saved all test cases')
end
---@param buf integer
setup_keybindings = function(buf)
local config = config_module.get_config()
@ -237,92 +311,39 @@ setup_keybindings = function(buf)
end
if is_tracked then
logger.log('Test buffer closed unexpectedly. Exiting editor.', vim.log.levels.WARN)
logger.log(
'Test buffer closed unexpectedly. Exiting editor.',
{ level = vim.log.levels.WARN }
)
M.toggle_edit()
end
end)
end,
})
end
local function save_all_tests()
if not edit_state then
return
end
local platform = state.get_platform()
local contest_id = state.get_contest_id()
local problem_id = state.get_problem_id()
if not platform or not contest_id or not problem_id then
return
end
for i, pair in ipairs(edit_state.test_buffers) do
if
vim.api.nvim_buf_is_valid(pair.input_buf) and vim.api.nvim_buf_is_valid(pair.expected_buf)
then
local input_lines = vim.api.nvim_buf_get_lines(pair.input_buf, 0, -1, false)
local expected_lines = vim.api.nvim_buf_get_lines(pair.expected_buf, 0, -1, false)
edit_state.test_cases[i].input = table.concat(input_lines, '\n')
edit_state.test_cases[i].expected = table.concat(expected_lines, '\n')
end
end
local contest_data = cache.get_contest_data(platform, contest_id)
local is_multi_test = contest_data.problems[contest_data.index_map[problem_id]].multi_test
or false
-- Generate combined test from individual test cases
local combined_input = table.concat(
vim.tbl_map(function(tc)
return tc.input
end, edit_state.test_cases),
'\n'
)
local combined_expected = table.concat(
vim.tbl_map(function(tc)
return tc.expected
end, edit_state.test_cases),
'\n'
)
cache.set_test_cases(
platform,
contest_id,
problem_id,
{ input = combined_input, expected = combined_expected },
edit_state.test_cases,
edit_state.constraints and edit_state.constraints.timeout_ms or 0,
edit_state.constraints and edit_state.constraints.memory_mb or 0,
false,
is_multi_test
)
local config = config_module.get_config()
local base_name = config.filename and config.filename(platform, contest_id, problem_id, config)
or config_module.default_filename(contest_id, problem_id)
vim.fn.mkdir('io', 'p')
for i, tc in ipairs(edit_state.test_cases) do
local input_file = string.format('io/%s.%d.cpin', base_name, i)
local expected_file = string.format('io/%s.%d.cpout', base_name, i)
local input_content = (tc.input or ''):gsub('\r', '')
local expected_content = (tc.expected or ''):gsub('\r', '')
vim.fn.writefile(vim.split(input_content, '\n', { trimempty = true }), input_file)
vim.fn.writefile(vim.split(expected_content, '\n', { trimempty = true }), expected_file)
end
logger.log('Saved all test cases')
vim.api.nvim_create_autocmd('BufWriteCmd', {
group = augroup,
buffer = buf,
callback = function()
save_all_tests()
vim.bo[buf].modified = false
end,
})
end
function M.toggle_edit(test_index)
if edit_state then
save_all_tests()
for _, pair in ipairs(edit_state.test_buffers) do
if vim.api.nvim_buf_is_valid(pair.input_buf) then
vim.api.nvim_buf_delete(pair.input_buf, { force = true })
end
if vim.api.nvim_buf_is_valid(pair.expected_buf) then
vim.api.nvim_buf_delete(pair.expected_buf, { force = true })
end
end
edit_state = nil
pcall(vim.api.nvim_clear_autocmds, { group = 'cp_edit_guard' })
@ -350,7 +371,10 @@ function M.toggle_edit(test_index)
state.get_platform(), state.get_contest_id(), state.get_problem_id()
if not platform or not contest_id or not problem_id then
logger.log('No problem context. Run :CP <platform> <contest> first.', vim.log.levels.ERROR)
logger.log(
'No problem context. Run :CP <platform> <contest> first.',
{ level = vim.log.levels.ERROR }
)
return
end
@ -358,7 +382,7 @@ function M.toggle_edit(test_index)
local test_cases = cache.get_test_cases(platform, contest_id, problem_id)
if not test_cases or #test_cases == 0 then
logger.log('No test cases available for editing.', vim.log.levels.ERROR)
logger.log('No test cases available for editing.', { level = vim.log.levels.ERROR })
return
end
@ -371,7 +395,7 @@ function M.toggle_edit(test_index)
if target_index < 1 or target_index > #test_cases then
logger.log(
('Test %d does not exist (only %d tests available)'):format(target_index, #test_cases),
vim.log.levels.ERROR
{ level = vim.log.levels.ERROR }
)
return
end
@ -411,7 +435,7 @@ function M.toggle_edit(test_index)
vim.api.nvim_win_set_buf(input_win, input_buf)
vim.bo[input_buf].modifiable = true
vim.bo[input_buf].readonly = false
vim.bo[input_buf].buftype = 'nofile'
vim.bo[input_buf].buftype = 'acwrite'
vim.bo[input_buf].buflisted = false
helpers.clearcol(input_buf)
@ -421,7 +445,7 @@ function M.toggle_edit(test_index)
vim.api.nvim_win_set_buf(expected_win, expected_buf)
vim.bo[expected_buf].modifiable = true
vim.bo[expected_buf].readonly = false
vim.bo[expected_buf].buftype = 'nofile'
vim.bo[expected_buf].buftype = 'acwrite'
vim.bo[expected_buf].buflisted = false
helpers.clearcol(expected_buf)

View file

@ -2,6 +2,7 @@ local M = {}
---@class PanelOpts
---@field debug? boolean
---@field test_index? integer
local cache = require('cp.cache')
local config_module = require('cp.config')
@ -13,7 +14,7 @@ local utils = require('cp.utils')
local current_diff_layout = nil
local current_mode = nil
local io_view_running = false
local _run_gen = 0
function M.disable()
local active_panel = state.get_active_panel()
@ -26,6 +27,8 @@ function M.disable()
M.toggle_panel()
elseif active_panel == 'interactive' then
M.toggle_interactive()
elseif active_panel == 'stress' then
require('cp.stress').toggle()
else
logger.log(('Unknown panel type: %s'):format(tostring(active_panel)))
end
@ -50,7 +53,7 @@ function M.toggle_interactive(interactor_cmd)
end
if state.get_active_panel() then
logger.log('Another panel is already active.', vim.log.levels.WARN)
logger.log('Another panel is already active.', { level = vim.log.levels.WARN })
return
end
@ -59,7 +62,7 @@ function M.toggle_interactive(interactor_cmd)
if not platform or not contest_id or not problem_id then
logger.log(
'No platform/contest/problem configured. Use :CP <platform> <contest> [...] first.',
vim.log.levels.ERROR
{ level = vim.log.levels.ERROR }
)
return
end
@ -67,11 +70,14 @@ function M.toggle_interactive(interactor_cmd)
cache.load()
local contest_data = cache.get_contest_data(platform, contest_id)
if
not contest_data
or not contest_data.index_map
or not contest_data.problems[contest_data.index_map[problem_id]].interactive
contest_data
and contest_data.index_map
and not contest_data.problems[contest_data.index_map[problem_id]].interactive
then
logger.log('This problem is interactive. Use :CP interact.', vim.log.levels.ERROR)
logger.log(
'This problem is not interactive. Use :CP {run,panel}.',
{ level = vim.log.levels.ERROR }
)
return
end
@ -100,7 +106,7 @@ function M.toggle_interactive(interactor_cmd)
local binary = state.get_binary_file()
if not binary or binary == '' then
logger.log('No binary produced.', vim.log.levels.ERROR)
logger.log('No binary produced.', { level = vim.log.levels.ERROR })
restore_session()
return
end
@ -114,20 +120,29 @@ function M.toggle_interactive(interactor_cmd)
if vim.fn.executable(interactor) ~= 1 then
logger.log(
("Interactor '%s' is not executable."):format(interactor_cmd),
vim.log.levels.ERROR
{ level = vim.log.levels.ERROR }
)
restore_session()
return
end
local orchestrator =
vim.fn.fnamemodify(utils.get_plugin_path() .. '/scripts/interact.py', ':p')
cmdline = table.concat({
'uv',
'run',
vim.fn.shellescape(orchestrator),
vim.fn.shellescape(interactor),
vim.fn.shellescape(binary),
}, ' ')
if utils.is_nix_build() then
cmdline = table.concat({
vim.fn.shellescape(utils.get_nix_python()),
vim.fn.shellescape(orchestrator),
vim.fn.shellescape(interactor),
vim.fn.shellescape(binary),
}, ' ')
else
cmdline = table.concat({
'uv',
'run',
vim.fn.shellescape(orchestrator),
vim.fn.shellescape(interactor),
vim.fn.shellescape(binary),
}, ' ')
end
else
cmdline = vim.fn.shellescape(binary)
end
@ -229,7 +244,6 @@ local function get_or_create_io_buffers()
state.set_io_view_state({
output_buf = output_buf,
input_buf = input_buf,
current_test_index = 1,
source_buf = current_source_buf,
})
@ -294,49 +308,6 @@ local function get_or_create_io_buffers()
end,
})
local cfg = config_module.get_config()
local platform = state.get_platform()
local contest_id = state.get_contest_id()
local problem_id = state.get_problem_id()
local function navigate_test(delta)
local io_view_state = state.get_io_view_state()
if not io_view_state then
return
end
if not platform or not contest_id or not problem_id then
return
end
local test_cases = cache.get_test_cases(platform, contest_id, problem_id)
if not test_cases or #test_cases == 0 then
return
end
local new_index = (io_view_state.current_test_index or 1) + delta
if new_index < 1 or new_index > #test_cases then
return
end
io_view_state.current_test_index = new_index
M.run_io_view(new_index)
end
if cfg.ui.run.next_test_key then
vim.keymap.set('n', cfg.ui.run.next_test_key, function()
navigate_test(1)
end, { buffer = output_buf, silent = true, desc = 'Next test' })
vim.keymap.set('n', cfg.ui.run.next_test_key, function()
navigate_test(1)
end, { buffer = input_buf, silent = true, desc = 'Next test' })
end
if cfg.ui.run.prev_test_key then
vim.keymap.set('n', cfg.ui.run.prev_test_key, function()
navigate_test(-1)
end, { buffer = output_buf, silent = true, desc = 'Previous test' })
vim.keymap.set('n', cfg.ui.run.prev_test_key, function()
navigate_test(-1)
end, { buffer = input_buf, silent = true, desc = 'Previous test' })
end
return output_buf, input_buf
end
@ -386,7 +357,7 @@ function M.ensure_io_view()
if not platform or not contest_id or not problem_id then
logger.log(
'No platform/contest/problem configured. Use :CP <platform> <contest> [...] first.',
vim.log.levels.ERROR
{ level = vim.log.levels.ERROR }
)
return
end
@ -415,7 +386,10 @@ function M.ensure_io_view()
and contest_data.index_map
and contest_data.problems[contest_data.index_map[problem_id]].interactive
then
logger.log('This problem is not interactive. Use :CP {run,panel}.', vim.log.levels.ERROR)
logger.log(
'This problem is not interactive. Use :CP {run,panel}.',
{ level = vim.log.levels.ERROR }
)
return
end
@ -435,12 +409,12 @@ function M.ensure_io_view()
local cfg = config_module.get_config()
if cfg.hooks and cfg.hooks.setup_io_output then
pcall(cfg.hooks.setup_io_output, output_buf, state)
local io = cfg.hooks and cfg.hooks.setup and cfg.hooks.setup.io
if io and io.output then
pcall(io.output, output_buf, state)
end
if cfg.hooks and cfg.hooks.setup_io_input then
pcall(cfg.hooks.setup_io_input, input_buf, state)
if io and io.input then
pcall(io.input, input_buf, state)
end
local test_cases = cache.get_test_cases(platform, contest_id, problem_id)
@ -625,13 +599,13 @@ local function render_io_view_results(io_state, test_indices, mode, combined_res
end
function M.run_io_view(test_indices_arg, debug, mode)
if io_view_running then
logger.log('Tests already running', vim.log.levels.WARN)
return
end
io_view_running = true
_run_gen = _run_gen + 1
local gen = _run_gen
logger.log(('%s tests...'):format(debug and 'Debugging' or 'Running'), vim.log.levels.INFO, true)
logger.log(
('%s tests...'):format(debug and 'Debugging' or 'Running'),
{ level = vim.log.levels.INFO, override = true }
)
mode = mode or 'combined'
@ -640,17 +614,15 @@ function M.run_io_view(test_indices_arg, debug, mode)
if not platform or not contest_id or not problem_id then
logger.log(
'No platform/contest/problem configured. Use :CP <platform> <contest> [...] first.',
vim.log.levels.ERROR
{ level = vim.log.levels.ERROR }
)
io_view_running = false
return
end
cache.load()
local contest_data = cache.get_contest_data(platform, contest_id)
if not contest_data or not contest_data.index_map then
logger.log('No test cases available.', vim.log.levels.ERROR)
io_view_running = false
logger.log('No test cases available.', { level = vim.log.levels.ERROR })
return
end
@ -666,14 +638,12 @@ function M.run_io_view(test_indices_arg, debug, mode)
if mode == 'combined' then
local combined = cache.get_combined_test(platform, contest_id, problem_id)
if not combined then
logger.log('No combined test available', vim.log.levels.ERROR)
io_view_running = false
logger.log('No combined test available', { level = vim.log.levels.ERROR })
return
end
else
if not run.load_test_cases() then
logger.log('No test cases available', vim.log.levels.ERROR)
io_view_running = false
logger.log('No test cases available', { level = vim.log.levels.ERROR })
return
end
end
@ -692,9 +662,8 @@ function M.run_io_view(test_indices_arg, debug, mode)
idx,
#test_state.test_cases
),
vim.log.levels.WARN
{ level = vim.log.levels.WARN }
)
io_view_running = false
return
end
end
@ -712,7 +681,6 @@ function M.run_io_view(test_indices_arg, debug, mode)
local io_state = state.get_io_view_state()
if not io_state then
io_view_running = false
return
end
@ -725,8 +693,10 @@ function M.run_io_view(test_indices_arg, debug, mode)
local execute = require('cp.runner.execute')
execute.compile_problem(debug, function(compile_result)
if gen ~= _run_gen then
return
end
if not vim.api.nvim_buf_is_valid(io_state.output_buf) then
io_view_running = false
return
end
@ -746,43 +716,62 @@ function M.run_io_view(test_indices_arg, debug, mode)
local ns = vim.api.nvim_create_namespace('cp_io_view_compile_error')
utils.update_buffer_content(io_state.output_buf, lines, highlights, ns)
io_view_running = false
return
end
if mode == 'combined' then
local combined = cache.get_combined_test(platform, contest_id, problem_id)
if not combined then
logger.log('No combined test found', vim.log.levels.ERROR)
io_view_running = false
logger.log('No combined test found', { level = vim.log.levels.ERROR })
return
end
run.load_test_cases()
run.run_combined_test(debug, function(result)
if gen ~= _run_gen then
return
end
if not result then
logger.log('Failed to run combined test', vim.log.levels.ERROR)
io_view_running = false
logger.log('Failed to run combined test', { level = vim.log.levels.ERROR })
return
end
if vim.api.nvim_buf_is_valid(io_state.output_buf) then
render_io_view_results(io_state, test_indices, mode, result, combined.input)
end
io_view_running = false
end)
else
run.run_all_test_cases(test_indices, debug, nil, function()
if gen ~= _run_gen then
return
end
if vim.api.nvim_buf_is_valid(io_state.output_buf) then
render_io_view_results(io_state, test_indices, mode, nil, nil)
end
io_view_running = false
end)
end
end)
end
function M.cancel_io_view()
_run_gen = _run_gen + 1
end
function M.cancel_interactive()
if state.interactive_buf and vim.api.nvim_buf_is_valid(state.interactive_buf) then
local job = vim.b[state.interactive_buf].terminal_job_id
if job then
vim.fn.jobstop(job)
end
end
if state.saved_interactive_session then
vim.fn.delete(state.saved_interactive_session)
state.saved_interactive_session = nil
end
state.set_active_panel(nil)
end
---@param panel_opts? PanelOpts
function M.toggle_panel(panel_opts)
if state.get_active_panel() == 'run' then
@ -803,7 +792,7 @@ function M.toggle_panel(panel_opts)
end
if state.get_active_panel() then
logger.log('another panel is already active', vim.log.levels.ERROR)
logger.log('another panel is already active', { level = vim.log.levels.ERROR })
return
end
@ -812,7 +801,7 @@ function M.toggle_panel(panel_opts)
if not platform or not contest_id then
logger.log(
'No platform/contest configured. Use :CP <platform> <contest> [...] first.',
vim.log.levels.ERROR
{ level = vim.log.levels.ERROR }
)
return
end
@ -821,9 +810,13 @@ function M.toggle_panel(panel_opts)
local contest_data = cache.get_contest_data(platform, contest_id)
if
contest_data
and contest_data.index_map
and contest_data.problems[contest_data.index_map[state.get_problem_id()]].interactive
then
logger.log('This is an interactive problem. Use :CP interact instead.', vim.log.levels.WARN)
logger.log(
'This is an interactive problem. Use :CP interact instead.',
{ level = vim.log.levels.WARN }
)
return
end
@ -834,10 +827,17 @@ function M.toggle_panel(panel_opts)
logger.log(('run panel: checking test cases for %s'):format(input_file or 'none'))
if not run.load_test_cases() then
logger.log('no test cases found', vim.log.levels.WARN)
logger.log('no test cases found', { level = vim.log.levels.WARN })
return
end
if panel_opts and panel_opts.test_index then
local test_state = run.get_panel_state()
if panel_opts.test_index >= 1 and panel_opts.test_index <= #test_state.test_cases then
test_state.current_index = panel_opts.test_index
end
end
local io_state = state.get_io_view_state()
if io_state then
for _, win in ipairs(vim.api.nvim_list_wins()) do
@ -949,14 +949,15 @@ function M.toggle_panel(panel_opts)
setup_keybindings_for_buffer(test_buffers.tab_buf)
if config.hooks and config.hooks.before_run then
vim.schedule_wrap(function()
config.hooks.before_run(state)
local o = config.hooks and config.hooks.on
if o and o.run then
vim.schedule(function()
o.run(state)
end)
end
if panel_opts and panel_opts.debug and config.hooks and config.hooks.before_debug then
vim.schedule_wrap(function()
config.hooks.before_debug(state)
if panel_opts and panel_opts.debug and o and o.debug then
vim.schedule(function()
o.debug(state)
end)
end

View file

@ -2,7 +2,11 @@ local M = {}
local logger = require('cp.log')
local uname = vim.loop.os_uname()
local _nix_python = nil
local _nix_submit_cmd = nil
local _nix_discovered = false
local uname = vim.uv.os_uname()
local _time_cached = false
local _time_path = nil
@ -57,7 +61,11 @@ local function find_gnu_time()
_time_cached = true
_time_path = nil
_time_reason = 'GNU time not found'
if uname and uname.sysname == 'Darwin' then
_time_reason = 'GNU time not found (install via: brew install coreutils)'
else
_time_reason = 'GNU time not found'
end
return _time_path, _time_reason
end
@ -79,7 +87,164 @@ function M.get_plugin_path()
return vim.fn.fnamemodify(plugin_path, ':h:h:h')
end
---@return boolean
function M.is_nix_build()
return _nix_python ~= nil
end
---@return string|nil
function M.get_nix_python()
return _nix_python
end
---@return boolean
function M.is_nix_discovered()
return _nix_discovered
end
---@param module string
---@param plugin_path string
---@return string[]
function M.get_python_cmd(module, plugin_path)
if _nix_python then
return { _nix_python, '-m', 'scrapers.' .. module }
end
return { 'uv', 'run', '--directory', plugin_path, '-m', 'scrapers.' .. module }
end
---@param module string
---@param plugin_path string
---@return string[]
function M.get_python_submit_cmd(module, plugin_path)
if _nix_submit_cmd then
return { _nix_submit_cmd, 'run', '--directory', plugin_path, '-m', 'scrapers.' .. module }
end
return { 'uv', 'run', '--directory', plugin_path, '-m', 'scrapers.' .. module }
end
local python_env_setup = false
local _nix_submit_attempted = false
---@return boolean
local function discover_nix_submit_cmd()
local cache_dir = vim.fn.stdpath('cache') .. '/cp-nvim'
local cache_file = cache_dir .. '/nix-submit'
local f = io.open(cache_file, 'r')
if f then
local cached = f:read('*l')
f:close()
if cached and vim.fn.executable(cached) == 1 then
_nix_submit_cmd = cached
return true
end
end
local plugin_path = M.get_plugin_path()
vim.cmd.redraw()
vim.notify('Building submit environment...', vim.log.levels.INFO)
vim.cmd.redraw()
local result = vim
.system(
{ 'nix', 'build', plugin_path .. '#submitEnv', '--no-link', '--print-out-paths' },
{ text = true }
)
:wait()
if result.code ~= 0 then
logger.log(
'nix build #submitEnv failed: ' .. (result.stderr or ''),
{ level = vim.log.levels.WARN }
)
return false
end
local store_path = result.stdout:gsub('%s+$', '')
local submit_cmd = store_path .. '/bin/cp-nvim-submit'
if vim.fn.executable(submit_cmd) ~= 1 then
logger.log('nix submit cmd not executable at ' .. submit_cmd, { level = vim.log.levels.WARN })
return false
end
vim.fn.mkdir(cache_dir, 'p')
f = io.open(cache_file, 'w')
if f then
f:write(submit_cmd)
f:close()
end
_nix_submit_cmd = submit_cmd
return true
end
---@return boolean
function M.setup_nix_submit_env()
if _nix_submit_cmd then
return true
end
if _nix_submit_attempted then
return false
end
_nix_submit_attempted = true
if vim.fn.executable('nix') == 1 then
return discover_nix_submit_cmd()
end
return false
end
---@return boolean
local function discover_nix_python()
local cache_dir = vim.fn.stdpath('cache') .. '/cp-nvim'
local cache_file = cache_dir .. '/nix-python'
local f = io.open(cache_file, 'r')
if f then
local cached = f:read('*l')
f:close()
if cached and vim.fn.executable(cached) == 1 then
_nix_python = cached
return true
end
end
local plugin_path = M.get_plugin_path()
vim.notify('[cp.nvim] Building Python environment with nix...', vim.log.levels.INFO)
vim.cmd.redraw()
local result = vim
.system(
{ 'nix', 'build', plugin_path .. '#pythonEnv', '--no-link', '--print-out-paths' },
{ text = true }
)
:wait()
if result.code ~= 0 then
logger.log(
'nix build #pythonEnv failed: ' .. (result.stderr or ''),
{ level = vim.log.levels.WARN }
)
return false
end
local store_path = result.stdout:gsub('%s+$', '')
local python_path = store_path .. '/bin/python3'
if vim.fn.executable(python_path) ~= 1 then
logger.log('nix python not executable at ' .. python_path, { level = vim.log.levels.WARN })
return false
end
vim.fn.mkdir(cache_dir, 'p')
f = io.open(cache_file, 'w')
if f then
f:write(python_path)
f:close()
end
_nix_python = python_path
_nix_discovered = true
return true
end
---@return boolean success
function M.setup_python_env()
@ -87,19 +252,20 @@ function M.setup_python_env()
return true
end
local plugin_path = M.get_plugin_path()
local venv_dir = plugin_path .. '/.venv'
if vim.fn.executable('uv') == 0 then
logger.log(
'uv is not installed. Install it to enable problem scraping: https://docs.astral.sh/uv/',
vim.log.levels.WARN
)
return false
if _nix_python then
logger.log('Python env: nix (python=' .. _nix_python .. ')')
python_env_setup = true
return true
end
if vim.fn.isdirectory(venv_dir) == 0 then
logger.log('Setting up Python environment for scrapers...')
local on_nixos = vim.fn.filereadable('/etc/NIXOS') == 1
if not on_nixos and vim.fn.executable('uv') == 1 then
local plugin_path = M.get_plugin_path()
logger.log('Python env: uv sync (dir=' .. plugin_path .. ')')
vim.notify('[cp.nvim] Setting up Python environment...', vim.log.levels.INFO)
vim.cmd.redraw()
local env = vim.fn.environ()
env.VIRTUAL_ENV = ''
env.PYTHONPATH = ''
@ -108,14 +274,33 @@ function M.setup_python_env()
.system({ 'uv', 'sync' }, { cwd = plugin_path, text = true, env = env })
:wait()
if result.code ~= 0 then
logger.log('Failed to setup Python environment: ' .. result.stderr, vim.log.levels.ERROR)
logger.log(
'Failed to setup Python environment: ' .. (result.stderr or ''),
{ level = vim.log.levels.ERROR }
)
return false
end
logger.log('Python environment setup complete.')
if result.stderr and result.stderr ~= '' then
logger.log('uv sync stderr: ' .. result.stderr:gsub('%s+$', ''))
end
python_env_setup = true
return true
end
python_env_setup = true
return true
if vim.fn.executable('nix') == 1 then
logger.log('Python env: nix discovery')
if discover_nix_python() then
python_env_setup = true
return true
end
end
logger.log(
'No Python environment available. Install uv (https://docs.astral.sh/uv/) or use nix.',
{ level = vim.log.levels.WARN }
)
return false
end
--- Configure the buffer with good defaults
@ -162,20 +347,12 @@ function M.check_required_runtime()
local time = M.time_capability()
if not time.ok then
return false, 'GNU time not found: ' .. (time.reason or '')
return false, time.reason
end
local timeout = M.timeout_capability()
if not timeout.ok then
return false, 'GNU timeout not found: ' .. (timeout.reason or '')
end
if vim.fn.executable('uv') ~= 1 then
return false, 'uv not found (https://docs.astral.sh/uv/)'
end
if not M.setup_python_env() then
return false, 'failed to set up Python virtual environment'
return false, timeout.reason
end
return true
@ -225,7 +402,11 @@ local function find_gnu_timeout()
_timeout_cached = true
_timeout_path = nil
_timeout_reason = 'GNU timeout not found'
if uname and uname.sysname == 'Darwin' then
_timeout_reason = 'GNU timeout not found (install via: brew install coreutils)'
else
_timeout_reason = 'GNU timeout not found'
end
return _timeout_path, _timeout_reason
end
@ -240,7 +421,7 @@ function M.timeout_capability()
end
function M.cwd_executables()
local uv = vim.uv or vim.loop
local uv = vim.uv
local req = uv.fs_scandir('.')
if not req then
return {}

0
new
View file

View file

@ -43,9 +43,13 @@ end, {
vim.list_extend(candidates, platforms)
table.insert(candidates, 'cache')
table.insert(candidates, 'pick')
if platform and contest_id then
vim.list_extend(candidates, actions)
vim.list_extend(
candidates,
vim.tbl_filter(function(a)
return a ~= 'pick' and a ~= 'cache'
end, actions)
)
local cache = require('cp.cache')
cache.load()
local contest_data = cache.get_contest_data(platform, contest_id)
@ -60,13 +64,14 @@ end, {
return filter_candidates(candidates)
elseif num_args == 3 then
if vim.tbl_contains(platforms, args[2]) then
local candidates = { 'login', 'logout' }
local cache = require('cp.cache')
cache.load()
local contests = cache.get_cached_contest_ids(args[2])
return filter_candidates(contests)
vim.list_extend(candidates, cache.get_cached_contest_ids(args[2]))
return filter_candidates(candidates)
elseif args[2] == 'cache' then
return filter_candidates({ 'clear', 'read' })
elseif args[2] == 'interact' then
elseif args[2] == 'stress' or args[2] == 'interact' then
local utils = require('cp.utils')
return filter_candidates(utils.cwd_executables())
elseif args[2] == 'edit' then
@ -103,6 +108,10 @@ end, {
end
end
return filter_candidates(candidates)
elseif args[2] == 'race' then
local candidates = { 'stop' }
vim.list_extend(candidates, platforms)
return filter_candidates(candidates)
elseif args[2] == 'next' or args[2] == 'prev' or args[2] == 'pick' then
return filter_candidates({ '--lang' })
else
@ -112,7 +121,15 @@ end, {
end
end
elseif num_args == 4 then
if args[2] == 'cache' and args[3] == 'clear' then
if args[2] == 'stress' then
local utils = require('cp.utils')
return filter_candidates(utils.cwd_executables())
elseif args[2] == 'race' and vim.tbl_contains(platforms, args[3]) then
local cache = require('cp.cache')
cache.load()
local contests = cache.get_cached_contest_ids(args[3])
return filter_candidates(contests)
elseif args[2] == 'cache' and args[3] == 'clear' then
local candidates = vim.list_extend({}, platforms)
table.insert(candidates, '')
return filter_candidates(candidates)
@ -134,7 +151,9 @@ end, {
return filter_candidates(candidates)
end
elseif num_args == 5 then
if args[2] == 'cache' and args[3] == 'clear' and vim.tbl_contains(platforms, args[4]) then
if args[2] == 'race' and vim.tbl_contains(platforms, args[3]) then
return filter_candidates({ '--lang' })
elseif args[2] == 'cache' and args[3] == 'clear' and vim.tbl_contains(platforms, args[4]) then
local cache = require('cp.cache')
cache.load()
local contests = cache.get_cached_contest_ids(args[4])
@ -147,10 +166,31 @@ end, {
end
end
elseif num_args == 6 then
if vim.tbl_contains(platforms, args[2]) and args[5] == '--lang' then
if args[2] == 'race' and vim.tbl_contains(platforms, args[3]) and args[5] == '--lang' then
return filter_candidates(get_enabled_languages(args[3]))
elseif vim.tbl_contains(platforms, args[2]) and args[5] == '--lang' then
return filter_candidates(get_enabled_languages(args[2]))
end
end
return {}
end,
})
local function cp_action(action)
return function()
require('cp').handle_command({ fargs = { action } })
end
end
vim.keymap.set('n', '<Plug>(cp-run)', cp_action('run'), { desc = 'CP run tests' })
vim.keymap.set('n', '<Plug>(cp-panel)', cp_action('panel'), { desc = 'CP open panel' })
vim.keymap.set('n', '<Plug>(cp-edit)', cp_action('edit'), { desc = 'CP edit test cases' })
vim.keymap.set('n', '<Plug>(cp-next)', cp_action('next'), { desc = 'CP next problem' })
vim.keymap.set('n', '<Plug>(cp-prev)', cp_action('prev'), { desc = 'CP previous problem' })
vim.keymap.set('n', '<Plug>(cp-pick)', cp_action('pick'), { desc = 'CP pick contest' })
vim.keymap.set('n', '<Plug>(cp-interact)', cp_action('interact'), { desc = 'CP interactive mode' })
vim.keymap.set('n', '<Plug>(cp-stress)', cp_action('stress'), { desc = 'CP stress test' })
vim.keymap.set('n', '<Plug>(cp-submit)', cp_action('submit'), { desc = 'CP submit solution' })
vim.keymap.set('n', '<Plug>(cp-race-stop)', function()
require('cp.race').stop()
end, { desc = 'CP stop race countdown' })

View file

@ -7,13 +7,11 @@ requires-python = ">=3.11"
dependencies = [
"backoff>=2.2.1",
"beautifulsoup4>=4.13.5",
"curl-cffi>=0.13.0",
"scrapling[fetchers]>=0.4",
"httpx>=0.28.1",
"ndjson>=0.3.1",
"pydantic>=2.11.10",
"requests>=2.32.5",
"scrapling[fetchers]>=0.3.5",
"types-requests>=2.32.4.20250913",
]
[dependency-groups]

View file

@ -2,9 +2,11 @@
import asyncio
import json
import os
import re
import sys
import subprocess
import time
from pathlib import Path
from typing import Any
import backoff
@ -14,21 +16,34 @@ from bs4 import BeautifulSoup, Tag
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from .base import BaseScraper
from .base import BaseScraper, extract_precision
from .models import (
CombinedTest,
ContestListResult,
ContestSummary,
LoginResult,
MetadataResult,
ProblemSummary,
SubmitResult,
TestCase,
TestsResult,
)
from .timeouts import (
BROWSER_ELEMENT_WAIT,
BROWSER_NAV_TIMEOUT,
BROWSER_SESSION_TIMEOUT,
BROWSER_SETTLE_DELAY,
BROWSER_SUBMIT_NAV_TIMEOUT,
BROWSER_TURNSTILE_POLL,
HTTP_TIMEOUT,
)
_LANGUAGE_ID_EXTENSION = {
"6017": "cc",
"6082": "py",
}
MIB_TO_MB = 1.048576
BASE_URL = "https://atcoder.jp"
ARCHIVE_URL = f"{BASE_URL}/contests/archive"
TIMEOUT_SECONDS = 30
HEADERS = {
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
}
@ -71,7 +86,7 @@ def _retry_after_requests(details):
on_backoff=_retry_after_requests,
)
def _fetch(url: str) -> str:
r = _session.get(url, headers=HEADERS, timeout=TIMEOUT_SECONDS)
r = _session.get(url, headers=HEADERS, timeout=HTTP_TIMEOUT)
if r.status_code in RETRY_STATUS:
raise requests.HTTPError(response=r)
r.raise_for_status()
@ -94,7 +109,7 @@ def _giveup_httpx(exc: Exception) -> bool:
giveup=_giveup_httpx,
)
async def _get_async(client: httpx.AsyncClient, url: str) -> str:
r = await client.get(url, headers=HEADERS, timeout=TIMEOUT_SECONDS)
r = await client.get(url, headers=HEADERS, timeout=HTTP_TIMEOUT)
r.raise_for_status()
return r.text
@ -121,6 +136,23 @@ def _parse_last_page(html: str) -> int:
return max(nums) if nums else 1
def _parse_start_time(tr: Tag) -> int | None:
tds = tr.select("td")
if not tds:
return None
time_el = tds[0].select_one("time.fixtime-full")
if not time_el:
return None
text = time_el.get_text(strip=True)
try:
from datetime import datetime
dt = datetime.strptime(text, "%Y-%m-%d %H:%M:%S%z")
return int(dt.timestamp())
except (ValueError, TypeError):
return None
def _parse_archive_contests(html: str) -> list[ContestSummary]:
soup = BeautifulSoup(html, "html.parser")
tbody = soup.select_one("table.table-default tbody") or soup.select_one("tbody")
@ -139,7 +171,10 @@ def _parse_archive_contests(html: str) -> list[ContestSummary]:
continue
cid = m.group(1)
name = a.get_text(strip=True)
out.append(ContestSummary(id=cid, name=name, display_name=name))
start_time = _parse_start_time(tr)
out.append(
ContestSummary(id=cid, name=name, display_name=name, start_time=start_time)
)
return out
@ -169,7 +204,7 @@ def _parse_tasks_list(html: str) -> list[dict[str, str]]:
return rows
def _extract_problem_info(html: str) -> tuple[int, float, bool]:
def _extract_problem_info(html: str) -> tuple[int, float, bool, float | None]:
soup = BeautifulSoup(html, "html.parser")
txt = soup.get_text(" ", strip=True)
timeout_ms = 0
@ -181,9 +216,10 @@ def _extract_problem_info(html: str) -> tuple[int, float, bool]:
if ms:
memory_mb = float(ms.group(1)) * MIB_TO_MB
div = soup.select_one("#problem-statement")
txt = div.get_text(" ", strip=True) if div else soup.get_text(" ", strip=True)
interactive = "This is an interactive" in txt
return timeout_ms, memory_mb, interactive
body = div.get_text(" ", strip=True) if div else soup.get_text(" ", strip=True)
interactive = "This is an interactive" in body
precision = extract_precision(body)
return timeout_ms, memory_mb, interactive, precision
def _extract_samples(html: str) -> list[TestCase]:
@ -209,6 +245,277 @@ def _extract_samples(html: str) -> list[TestCase]:
return cases
_TURNSTILE_JS = "() => { const el = document.querySelector('[name=\"cf-turnstile-response\"]'); return el && el.value.length > 0; }"
def _solve_turnstile(page) -> None:
if page.evaluate(_TURNSTILE_JS):
return
iframe_loc = page.locator('iframe[src*="challenges.cloudflare.com"]')
if not iframe_loc.count():
return
for _ in range(6):
try:
box = iframe_loc.first.bounding_box()
if box:
page.mouse.click(
box["x"] + box["width"] * 0.15,
box["y"] + box["height"] * 0.5,
)
except Exception:
pass
try:
page.wait_for_function(_TURNSTILE_JS, timeout=BROWSER_TURNSTILE_POLL)
return
except Exception:
pass
raise RuntimeError("Turnstile not solved after multiple attempts")
def _ensure_browser() -> None:
try:
from patchright._impl._driver import compute_driver_executable # type: ignore[import-untyped,unresolved-import]
node, cli = compute_driver_executable()
except Exception:
return
browser_info = subprocess.run(
[node, cli, "install", "--dry-run", "chromium"],
capture_output=True,
text=True,
)
for line in browser_info.stdout.splitlines():
if "Install location:" in line:
install_dir = line.split(":", 1)[1].strip()
if not os.path.isdir(install_dir):
print(json.dumps({"status": "installing_browser"}), flush=True)
subprocess.run([node, cli, "install", "chromium"], check=True)
break
def _login_headless(credentials: dict[str, str]) -> LoginResult:
try:
from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import]
except ImportError:
return LoginResult(
success=False,
error="scrapling is required for AtCoder login. Install it: uv add 'scrapling[fetchers]>=0.4'",
)
_ensure_browser()
cookie_cache = Path.home() / ".cache" / "cp-nvim" / "atcoder-cookies.json"
cookie_cache.parent.mkdir(parents=True, exist_ok=True)
saved_cookies: list[dict[str, Any]] = []
if cookie_cache.exists():
try:
saved_cookies = json.loads(cookie_cache.read_text())
except Exception:
pass
logged_in = False
login_error: str | None = None
def check_login(page):
nonlocal logged_in
logged_in = page.evaluate(
"() => Array.from(document.querySelectorAll('a')).some(a => a.textContent.trim() === 'Sign Out')"
)
def login_action(page):
nonlocal login_error
try:
_solve_turnstile(page)
page.fill('input[name="username"]', credentials.get("username", ""))
page.fill('input[name="password"]', credentials.get("password", ""))
page.click("#submit")
page.wait_for_url(
lambda url: "/login" not in url, timeout=BROWSER_NAV_TIMEOUT
)
except Exception as e:
login_error = str(e)
try:
with StealthySession(
headless=True,
timeout=BROWSER_SESSION_TIMEOUT,
google_search=False,
cookies=saved_cookies if saved_cookies else [],
) as session:
if saved_cookies:
print(json.dumps({"status": "checking_login"}), flush=True)
session.fetch(
f"{BASE_URL}/home", page_action=check_login, network_idle=True
)
if not logged_in:
print(json.dumps({"status": "logging_in"}), flush=True)
session.fetch(
f"{BASE_URL}/login",
page_action=login_action,
solve_cloudflare=True,
)
if login_error:
return LoginResult(
success=False, error=f"Login failed: {login_error}"
)
session.fetch(
f"{BASE_URL}/home", page_action=check_login, network_idle=True
)
if not logged_in:
return LoginResult(
success=False, error="Login failed (bad credentials?)"
)
try:
browser_cookies = session.context.cookies()
if any(c["name"] == "REVEL_SESSION" for c in browser_cookies):
cookie_cache.write_text(json.dumps(browser_cookies))
except Exception:
pass
return LoginResult(success=True, error="")
except Exception as e:
return LoginResult(success=False, error=str(e))
def _submit_headless(
contest_id: str,
problem_id: str,
file_path: str,
language_id: str,
credentials: dict[str, str],
_retried: bool = False,
) -> "SubmitResult":
try:
from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import]
except ImportError:
return SubmitResult(
success=False,
error="scrapling is required for AtCoder submit. Install it: uv add 'scrapling[fetchers]>=0.4'",
)
_ensure_browser()
cookie_cache = Path.home() / ".cache" / "cp-nvim" / "atcoder-cookies.json"
cookie_cache.parent.mkdir(parents=True, exist_ok=True)
saved_cookies: list[dict[str, Any]] = []
if cookie_cache.exists():
try:
saved_cookies = json.loads(cookie_cache.read_text())
except Exception:
pass
logged_in = cookie_cache.exists() and not _retried
login_error: str | None = None
submit_error: str | None = None
needs_relogin = False
def check_login(page):
nonlocal logged_in
logged_in = page.evaluate(
"() => Array.from(document.querySelectorAll('a')).some(a => a.textContent.trim() === 'Sign Out')"
)
def login_action(page):
nonlocal login_error
try:
_solve_turnstile(page)
page.fill('input[name="username"]', credentials.get("username", ""))
page.fill('input[name="password"]', credentials.get("password", ""))
page.click("#submit")
page.wait_for_url(
lambda url: "/login" not in url, timeout=BROWSER_NAV_TIMEOUT
)
except Exception as e:
login_error = str(e)
def submit_action(page):
nonlocal submit_error, needs_relogin
if "/login" in page.url:
needs_relogin = True
return
try:
_solve_turnstile(page)
page.select_option(
'select[name="data.TaskScreenName"]',
f"{contest_id}_{problem_id}",
)
page.locator(
f'select[name="data.LanguageId"] option[value="{language_id}"]'
).wait_for(state="attached", timeout=BROWSER_ELEMENT_WAIT)
page.select_option('select[name="data.LanguageId"]', language_id)
page.set_input_files("#input-open-file", file_path)
page.wait_for_timeout(BROWSER_SETTLE_DELAY)
page.locator('button[type="submit"]').click()
page.wait_for_url(
lambda url: "/submissions/me" in url,
timeout=BROWSER_SUBMIT_NAV_TIMEOUT["atcoder"],
)
except Exception as e:
submit_error = str(e)
try:
with StealthySession(
headless=True,
timeout=BROWSER_SESSION_TIMEOUT,
google_search=False,
cookies=saved_cookies if (cookie_cache.exists() and not _retried) else [],
) as session:
if not (cookie_cache.exists() and not _retried):
print(json.dumps({"status": "checking_login"}), flush=True)
session.fetch(
f"{BASE_URL}/home", page_action=check_login, network_idle=True
)
if not logged_in:
print(json.dumps({"status": "logging_in"}), flush=True)
session.fetch(
f"{BASE_URL}/login",
page_action=login_action,
solve_cloudflare=True,
)
if login_error:
return SubmitResult(
success=False, error=f"Login failed: {login_error}"
)
print(json.dumps({"status": "submitting"}), flush=True)
session.fetch(
f"{BASE_URL}/contests/{contest_id}/submit",
page_action=submit_action,
solve_cloudflare=True,
)
try:
browser_cookies = session.context.cookies()
if any(c["name"] == "REVEL_SESSION" for c in browser_cookies):
cookie_cache.write_text(json.dumps(browser_cookies))
except Exception:
pass
if needs_relogin and not _retried:
cookie_cache.unlink(missing_ok=True)
return _submit_headless(
contest_id,
problem_id,
file_path,
language_id,
credentials,
_retried=True,
)
if submit_error:
return SubmitResult(success=False, error=submit_error)
return SubmitResult(
success=True, error="", submission_id="", verdict="submitted"
)
except Exception as e:
return SubmitResult(success=False, error=str(e))
def _scrape_tasks_sync(contest_id: str) -> list[dict[str, str]]:
html = _fetch(f"{BASE_URL}/contests/{contest_id}/tasks")
return _parse_tasks_list(html)
@ -220,12 +527,13 @@ def _scrape_problem_page_sync(contest_id: str, slug: str) -> dict[str, Any]:
tests = _extract_samples(html)
except Exception:
tests = []
timeout_ms, memory_mb, interactive = _extract_problem_info(html)
timeout_ms, memory_mb, interactive, precision = _extract_problem_info(html)
return {
"tests": tests,
"timeout_ms": timeout_ms,
"memory_mb": memory_mb,
"interactive": interactive,
"precision": precision,
}
@ -241,14 +549,29 @@ def _to_problem_summaries(rows: list[dict[str, str]]) -> list[ProblemSummary]:
return out
async def _fetch_upcoming_contests_async(
client: httpx.AsyncClient,
) -> list[ContestSummary]:
try:
html = await _get_async(client, f"{BASE_URL}/contests/")
return _parse_archive_contests(html)
except Exception:
return []
async def _fetch_all_contests_async() -> list[ContestSummary]:
async with httpx.AsyncClient(
limits=httpx.Limits(max_connections=100, max_keepalive_connections=100),
) as client:
upcoming = await _fetch_upcoming_contests_async(client)
first_html = await _get_async(client, ARCHIVE_URL)
last = _parse_last_page(first_html)
out = _parse_archive_contests(first_html)
if last <= 1:
seen = {c.id for c in out}
for c in upcoming:
if c.id not in seen:
out.append(c)
return out
tasks = [
asyncio.create_task(_get_async(client, f"{ARCHIVE_URL}?page={p}"))
@ -257,6 +580,10 @@ async def _fetch_all_contests_async() -> list[ContestSummary]:
for coro in asyncio.as_completed(tasks):
html = await coro
out.extend(_parse_archive_contests(html))
seen = {c.id for c in out}
for c in upcoming:
if c.id not in seen:
out.append(c)
return out
@ -279,6 +606,8 @@ class AtcoderScraper(BaseScraper):
contest_id=contest_id,
problems=problems,
url=f"https://atcoder.jp/contests/{contest_id}/tasks/{contest_id}_%s",
contest_url=f"https://atcoder.jp/contests/{contest_id}",
standings_url=f"https://atcoder.jp/contests/{contest_id}/standings",
)
except Exception as e:
return self._metadata_error(str(e))
@ -319,6 +648,7 @@ class AtcoderScraper(BaseScraper):
"memory_mb": data.get("memory_mb", 0),
"interactive": bool(data.get("interactive")),
"multi_test": False,
"precision": data.get("precision"),
}
),
flush=True,
@ -326,74 +656,28 @@ class AtcoderScraper(BaseScraper):
await asyncio.gather(*(emit(r) for r in rows))
async def main_async() -> int:
if len(sys.argv) < 2:
result = MetadataResult(
success=False,
error="Usage: atcoder.py metadata <contest_id> OR atcoder.py tests <contest_id> OR atcoder.py contests",
url="",
async def submit(
self,
contest_id: str,
problem_id: str,
file_path: str,
language_id: str,
credentials: dict[str, str],
) -> SubmitResult:
return await asyncio.to_thread(
_submit_headless,
contest_id,
problem_id,
file_path,
language_id,
credentials,
)
print(result.model_dump_json())
return 1
mode: str = sys.argv[1]
scraper = AtcoderScraper()
if mode == "metadata":
if len(sys.argv) != 3:
result = MetadataResult(
success=False,
error="Usage: atcoder.py metadata <contest_id>",
url="",
)
print(result.model_dump_json())
return 1
contest_id = sys.argv[2]
result = await scraper.scrape_contest_metadata(contest_id)
print(result.model_dump_json())
return 0 if result.success else 1
if mode == "tests":
if len(sys.argv) != 3:
tests_result = TestsResult(
success=False,
error="Usage: atcoder.py tests <contest_id>",
problem_id="",
combined=CombinedTest(input="", expected=""),
tests=[],
timeout_ms=0,
memory_mb=0,
)
print(tests_result.model_dump_json())
return 1
contest_id = sys.argv[2]
await scraper.stream_tests_for_category_async(contest_id)
return 0
if mode == "contests":
if len(sys.argv) != 2:
contest_result = ContestListResult(
success=False, error="Usage: atcoder.py contests"
)
print(contest_result.model_dump_json())
return 1
contest_result = await scraper.scrape_contest_list()
print(contest_result.model_dump_json())
return 0 if contest_result.success else 1
result = MetadataResult(
success=False,
error="Unknown mode. Use 'metadata <contest_id>', 'tests <contest_id>', or 'contests'",
url="",
)
print(result.model_dump_json())
return 1
def main() -> None:
sys.exit(asyncio.run(main_async()))
async def login(self, credentials: dict[str, str]) -> LoginResult:
if not credentials.get("username") or not credentials.get("password"):
return self._login_error("Missing username or password")
return await asyncio.to_thread(_login_headless, credentials)
if __name__ == "__main__":
main()
AtcoderScraper().run_cli()

View file

@ -1,8 +1,38 @@
import asyncio
import json
import os
import re
import sys
from abc import ABC, abstractmethod
from .models import CombinedTest, ContestListResult, MetadataResult, TestsResult
from .language_ids import get_language_id
from .models import (
CombinedTest,
ContestListResult,
LoginResult,
MetadataResult,
SubmitResult,
TestsResult,
)
_PRECISION_ABS_REL_RE = re.compile(
r"(?:absolute|relative)\s+error[^.]*?10\s*[\^{]\s*\{?\s*[-\u2212]\s*(\d+)\s*\}?",
re.IGNORECASE,
)
_PRECISION_DECIMAL_RE = re.compile(
r"round(?:ed)?\s+to\s+(\d+)\s+decimal\s+place",
re.IGNORECASE,
)
def extract_precision(text: str) -> float | None:
m = _PRECISION_ABS_REL_RE.search(text)
if m:
return 10 ** -int(m.group(1))
m = _PRECISION_DECIMAL_RE.search(text)
if m:
return 10 ** -int(m.group(1))
return None
class BaseScraper(ABC):
@ -19,9 +49,22 @@ class BaseScraper(ABC):
@abstractmethod
async def stream_tests_for_category_async(self, category_id: str) -> None: ...
@abstractmethod
async def submit(
self,
contest_id: str,
problem_id: str,
file_path: str,
language_id: str,
credentials: dict[str, str],
) -> SubmitResult: ...
@abstractmethod
async def login(self, credentials: dict[str, str]) -> LoginResult: ...
def _usage(self) -> str:
name = self.platform_name
return f"Usage: {name}.py metadata <id> | tests <id> | contests"
return f"Usage: {name}.py metadata <id> | tests <id> | contests | login"
def _metadata_error(self, msg: str) -> MetadataResult:
return MetadataResult(success=False, error=msg, url="")
@ -40,6 +83,12 @@ class BaseScraper(ABC):
def _contests_error(self, msg: str) -> ContestListResult:
return ContestListResult(success=False, error=msg)
def _submit_error(self, msg: str) -> SubmitResult:
return SubmitResult(success=False, error=msg)
def _login_error(self, msg: str) -> LoginResult:
return LoginResult(success=False, error=msg)
async def _run_cli_async(self, args: list[str]) -> int:
if len(args) < 2:
print(self._metadata_error(self._usage()).model_dump_json())
@ -71,6 +120,36 @@ class BaseScraper(ABC):
print(result.model_dump_json())
return 0 if result.success else 1
case "submit":
if len(args) != 6:
print(
self._submit_error(
"Usage: <platform> submit <contest_id> <problem_id> <language_id> <file_path>"
).model_dump_json()
)
return 1
creds_raw = os.environ.get("CP_CREDENTIALS", "{}")
try:
credentials = json.loads(creds_raw)
except json.JSONDecodeError:
credentials = {}
language_id = get_language_id(self.platform_name, args[4]) or args[4]
result = await self.submit(
args[2], args[3], args[5], language_id, credentials
)
print(result.model_dump_json())
return 0 if result.success else 1
case "login":
creds_raw = os.environ.get("CP_CREDENTIALS", "{}")
try:
credentials = json.loads(creds_raw)
except json.JSONDecodeError:
credentials = {}
result = await self.login(credentials)
print(result.model_dump_json())
return 0 if result.success else 1
case _:
print(
self._metadata_error(

View file

@ -3,17 +3,21 @@
import asyncio
import json
import re
from pathlib import Path
from typing import Any
import httpx
from scrapling.fetchers import Fetcher
from curl_cffi import requests as curl_requests
from .base import BaseScraper
from .base import BaseScraper, extract_precision
from .timeouts import BROWSER_NAV_TIMEOUT, BROWSER_SESSION_TIMEOUT, HTTP_TIMEOUT
from .models import (
ContestListResult,
ContestSummary,
LoginResult,
MetadataResult,
ProblemSummary,
SubmitResult,
TestCase,
)
@ -25,15 +29,28 @@ PROBLEM_URL = "https://www.codechef.com/problems/{problem_id}"
HEADERS = {
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
}
TIMEOUT_S = 15.0
CONNECTIONS = 8
_COOKIE_PATH = Path.home() / ".cache" / "cp-nvim" / "codechef-cookies.json"
_CC_CHECK_LOGIN_JS = """() => {
const d = document.getElementById('__NEXT_DATA__');
if (d) {
try {
const p = JSON.parse(d.textContent);
if (p?.props?.pageProps?.currentUser?.username) return true;
} catch(e) {}
}
return !!document.querySelector('a[href="/logout"]') ||
!!document.querySelector('[class*="user-name"]');
}"""
MEMORY_LIMIT_RE = re.compile(
r"Memory\s+[Ll]imit.*?([0-9.]+)\s*(MB|GB)", re.IGNORECASE | re.DOTALL
)
async def fetch_json(client: httpx.AsyncClient, path: str) -> dict:
r = await client.get(BASE_URL + path, headers=HEADERS, timeout=TIMEOUT_S)
async def fetch_json(client: httpx.AsyncClient, path: str) -> dict[str, Any]:
r = await client.get(BASE_URL + path, headers=HEADERS, timeout=HTTP_TIMEOUT)
r.raise_for_status()
return r.json()
@ -50,8 +67,261 @@ def _extract_memory_limit(html: str) -> float:
def _fetch_html_sync(url: str) -> str:
response = Fetcher.get(url)
return str(response.body)
response = curl_requests.get(url, impersonate="chrome", timeout=HTTP_TIMEOUT)
response.raise_for_status()
return response.text
def _login_headless_codechef(credentials: dict[str, str]) -> LoginResult:
try:
from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import]
except ImportError:
return LoginResult(
success=False,
error="scrapling is required for CodeChef login",
)
from .atcoder import _ensure_browser
_ensure_browser()
_COOKIE_PATH.parent.mkdir(parents=True, exist_ok=True)
saved_cookies: list[dict[str, Any]] = []
if _COOKIE_PATH.exists():
try:
saved_cookies = json.loads(_COOKIE_PATH.read_text())
except Exception:
pass
logged_in = False
login_error: str | None = None
def check_login(page):
nonlocal logged_in
logged_in = page.evaluate(_CC_CHECK_LOGIN_JS)
def login_action(page):
nonlocal login_error
try:
page.locator('input[type="email"], input[name="email"]').first.fill(
credentials.get("username", "")
)
page.locator('input[type="password"], input[name="password"]').first.fill(
credentials.get("password", "")
)
page.locator('button[type="submit"]').first.click()
page.wait_for_url(
lambda url: "/login" not in url, timeout=BROWSER_NAV_TIMEOUT
)
except Exception as e:
login_error = str(e)
try:
with StealthySession(
headless=True,
timeout=BROWSER_SESSION_TIMEOUT,
google_search=False,
cookies=saved_cookies if saved_cookies else [],
) as session:
if saved_cookies:
print(json.dumps({"status": "checking_login"}), flush=True)
session.fetch(
f"{BASE_URL}/", page_action=check_login, network_idle=True
)
if not logged_in:
print(json.dumps({"status": "logging_in"}), flush=True)
session.fetch(f"{BASE_URL}/login", page_action=login_action)
if login_error:
return LoginResult(
success=False, error=f"Login failed: {login_error}"
)
session.fetch(
f"{BASE_URL}/", page_action=check_login, network_idle=True
)
if not logged_in:
return LoginResult(
success=False, error="Login failed (bad credentials?)"
)
try:
browser_cookies = session.context.cookies()
if browser_cookies:
_COOKIE_PATH.write_text(json.dumps(browser_cookies))
except Exception:
pass
return LoginResult(success=True, error="")
except Exception as e:
return LoginResult(success=False, error=str(e))
def _submit_headless_codechef(
contest_id: str,
problem_id: str,
file_path: str,
language_id: str,
credentials: dict[str, str],
_retried: bool = False,
) -> SubmitResult:
source_code = Path(file_path).read_text()
try:
from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import]
except ImportError:
return SubmitResult(
success=False,
error="scrapling is required for CodeChef submit",
)
from .atcoder import _ensure_browser
_ensure_browser()
_COOKIE_PATH.parent.mkdir(parents=True, exist_ok=True)
saved_cookies: list[dict[str, Any]] = []
if _COOKIE_PATH.exists() and not _retried:
try:
saved_cookies = json.loads(_COOKIE_PATH.read_text())
except Exception:
pass
logged_in = bool(saved_cookies) and not _retried
login_error: str | None = None
submit_error: str | None = None
needs_relogin = False
def check_login(page):
nonlocal logged_in
logged_in = page.evaluate(_CC_CHECK_LOGIN_JS)
def login_action(page):
nonlocal login_error
try:
page.locator('input[type="email"], input[name="email"]').first.fill(
credentials.get("username", "")
)
page.locator('input[type="password"], input[name="password"]').first.fill(
credentials.get("password", "")
)
page.locator('button[type="submit"]').first.click()
page.wait_for_url(
lambda url: "/login" not in url, timeout=BROWSER_NAV_TIMEOUT
)
except Exception as e:
login_error = str(e)
def submit_action(page):
nonlocal submit_error, needs_relogin
if "/login" in page.url:
needs_relogin = True
return
try:
selected = False
selects = page.locator("select")
for i in range(selects.count()):
try:
sel = selects.nth(i)
opts = sel.locator("option").all_inner_texts()
match = next(
(o for o in opts if language_id.lower() in o.lower()), None
)
if match:
sel.select_option(label=match)
selected = True
break
except Exception:
pass
if not selected:
lang_trigger = page.locator(
'[class*="language"] button, [data-testid*="language"] button'
).first
lang_trigger.click()
page.wait_for_timeout(500)
page.locator(
f'[role="option"]:has-text("{language_id}"), '
f'li:has-text("{language_id}")'
).first.click()
page.evaluate(
"""(code) => {
if (typeof monaco !== 'undefined') {
const models = monaco.editor.getModels();
if (models.length > 0) { models[0].setValue(code); return; }
}
const cm = document.querySelector('.CodeMirror');
if (cm && cm.CodeMirror) { cm.CodeMirror.setValue(code); return; }
const ta = document.querySelector('textarea');
if (ta) { ta.value = code; ta.dispatchEvent(new Event('input', {bubbles: true})); }
}""",
source_code,
)
page.locator(
'button[type="submit"]:has-text("Submit"), button:has-text("Submit Code")'
).first.click()
page.wait_for_url(
lambda url: "/submit/" not in url or "submission" in url,
timeout=BROWSER_NAV_TIMEOUT * 2,
)
except Exception as e:
submit_error = str(e)
try:
with StealthySession(
headless=True,
timeout=BROWSER_SESSION_TIMEOUT,
google_search=False,
cookies=saved_cookies if (saved_cookies and not _retried) else [],
) as session:
if not logged_in:
print(json.dumps({"status": "checking_login"}), flush=True)
session.fetch(
f"{BASE_URL}/", page_action=check_login, network_idle=True
)
if not logged_in:
print(json.dumps({"status": "logging_in"}), flush=True)
session.fetch(f"{BASE_URL}/login", page_action=login_action)
if login_error:
return SubmitResult(
success=False, error=f"Login failed: {login_error}"
)
print(json.dumps({"status": "submitting"}), flush=True)
session.fetch(
f"{BASE_URL}/{contest_id}/submit/{problem_id}",
page_action=submit_action,
)
try:
browser_cookies = session.context.cookies()
if browser_cookies and logged_in:
_COOKIE_PATH.write_text(json.dumps(browser_cookies))
except Exception:
pass
if needs_relogin and not _retried:
_COOKIE_PATH.unlink(missing_ok=True)
return _submit_headless_codechef(
contest_id,
problem_id,
file_path,
language_id,
credentials,
_retried=True,
)
if submit_error:
return SubmitResult(success=False, error=submit_error)
return SubmitResult(
success=True, error="", submission_id="", verdict="submitted"
)
except Exception as e:
return SubmitResult(success=False, error=str(e))
class CodeChefScraper(BaseScraper):
@ -218,11 +488,13 @@ class CodeChefScraper(BaseScraper):
)
memory_mb = _extract_memory_limit(html)
interactive = False
precision = extract_precision(html)
except Exception:
tests = []
timeout_ms = 1000
memory_mb = 256.0
interactive = False
precision = None
combined_input = "\n".join(t.input for t in tests) if tests else ""
combined_expected = (
"\n".join(t.expected for t in tests) if tests else ""
@ -240,6 +512,7 @@ class CodeChefScraper(BaseScraper):
"memory_mb": memory_mb,
"interactive": interactive,
"multi_test": False,
"precision": precision,
}
tasks = [run_one(problem_code) for problem_code in problems.keys()]
@ -247,6 +520,30 @@ class CodeChefScraper(BaseScraper):
payload = await coro
print(json.dumps(payload), flush=True)
async def submit(
self,
contest_id: str,
problem_id: str,
file_path: str,
language_id: str,
credentials: dict[str, str],
) -> SubmitResult:
if not credentials.get("username") or not credentials.get("password"):
return self._submit_error("Missing credentials. Use :CP codechef login")
return await asyncio.to_thread(
_submit_headless_codechef,
contest_id,
problem_id,
file_path,
language_id,
credentials,
)
async def login(self, credentials: dict[str, str]) -> LoginResult:
if not credentials.get("username") or not credentials.get("password"):
return self._login_error("Missing username or password")
return await asyncio.to_thread(_login_headless_codechef, credentials)
if __name__ == "__main__":
CodeChefScraper().run_cli()

View file

@ -2,30 +2,31 @@
import asyncio
import json
import logging
import re
from typing import Any
import requests
from bs4 import BeautifulSoup, Tag
from scrapling.fetchers import Fetcher
from .base import BaseScraper
from .base import BaseScraper, extract_precision
from .models import (
ContestListResult,
ContestSummary,
LoginResult,
MetadataResult,
ProblemSummary,
SubmitResult,
TestCase,
)
# suppress scrapling logging - https://github.com/D4Vinci/Scrapling/issues/31)
logging.getLogger("scrapling").setLevel(logging.CRITICAL)
from .timeouts import (
BROWSER_NAV_TIMEOUT,
BROWSER_SESSION_TIMEOUT,
BROWSER_SUBMIT_NAV_TIMEOUT,
HTTP_TIMEOUT,
)
BASE_URL = "https://codeforces.com"
API_CONTEST_LIST_URL = f"{BASE_URL}/api/contest.list"
TIMEOUT_SECONDS = 30
HEADERS = {
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
}
@ -83,7 +84,7 @@ def _extract_title(block: Tag) -> tuple[str, str]:
def _extract_samples(block: Tag) -> tuple[list[TestCase], bool]:
st = block.find("div", class_="sample-test")
if not st:
if not isinstance(st, Tag):
return [], False
input_pres: list[Tag] = [
@ -139,11 +140,30 @@ def _is_interactive(block: Tag) -> bool:
def _fetch_problems_html(contest_id: str) -> str:
try:
from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import]
except ImportError:
raise RuntimeError("scrapling is required for Codeforces metadata")
from .atcoder import _ensure_browser
_ensure_browser()
url = f"{BASE_URL}/contest/{contest_id}/problems"
page = Fetcher.get(
url,
)
return page.html_content
html = ""
def page_action(page):
nonlocal html
html = page.content()
with StealthySession(
headless=True,
timeout=BROWSER_SESSION_TIMEOUT,
google_search=False,
) as session:
session.fetch(url, page_action=page_action, solve_cloudflare=True)
return html
def _parse_all_blocks(html: str) -> list[dict[str, Any]]:
@ -159,6 +179,7 @@ def _parse_all_blocks(html: str) -> list[dict[str, Any]]:
raw_samples, is_grouped = _extract_samples(b)
timeout_ms, memory_mb = _extract_limits(b)
interactive = _is_interactive(b)
precision = extract_precision(b.get_text(" ", strip=True))
if is_grouped and raw_samples:
combined_input = f"{len(raw_samples)}\n" + "\n".join(
@ -185,6 +206,7 @@ def _parse_all_blocks(html: str) -> list[dict[str, Any]]:
"memory_mb": memory_mb,
"interactive": interactive,
"multi_test": is_grouped,
"precision": precision,
}
)
return out
@ -220,13 +242,15 @@ class CodeforcesScraper(BaseScraper):
contest_id=contest_id,
problems=problems,
url=f"https://codeforces.com/contest/{contest_id}/problem/%s",
contest_url=f"https://codeforces.com/contest/{contest_id}",
standings_url=f"https://codeforces.com/contest/{contest_id}/standings",
)
except Exception as e:
return self._metadata_error(str(e))
async def scrape_contest_list(self) -> ContestListResult:
try:
r = requests.get(API_CONTEST_LIST_URL, timeout=TIMEOUT_SECONDS)
r = requests.get(API_CONTEST_LIST_URL, timeout=HTTP_TIMEOUT)
r.raise_for_status()
data = r.json()
if data.get("status") != "OK":
@ -234,11 +258,20 @@ class CodeforcesScraper(BaseScraper):
contests: list[ContestSummary] = []
for c in data["result"]:
if c.get("phase") != "FINISHED":
phase = c.get("phase")
if phase not in ("FINISHED", "BEFORE", "CODING"):
continue
cid = str(c["id"])
name = c["name"]
contests.append(ContestSummary(id=cid, name=name, display_name=name))
start_time = c.get("startTimeSeconds") if phase != "FINISHED" else None
contests.append(
ContestSummary(
id=cid,
name=name,
display_name=name,
start_time=start_time,
)
)
if not contests:
return self._contests_error("No contests found")
@ -269,11 +302,301 @@ class CodeforcesScraper(BaseScraper):
"memory_mb": b.get("memory_mb", 0),
"interactive": bool(b.get("interactive")),
"multi_test": bool(b.get("multi_test", False)),
"precision": b.get("precision"),
}
),
flush=True,
)
async def submit(
self,
contest_id: str,
problem_id: str,
file_path: str,
language_id: str,
credentials: dict[str, str],
) -> SubmitResult:
return await asyncio.to_thread(
_submit_headless,
contest_id,
problem_id,
file_path,
language_id,
credentials,
)
async def login(self, credentials: dict[str, str]) -> LoginResult:
if not credentials.get("username") or not credentials.get("password"):
return self._login_error("Missing username or password")
return await asyncio.to_thread(_login_headless_cf, credentials)
def _login_headless_cf(credentials: dict[str, str]) -> LoginResult:
from pathlib import Path
try:
from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import]
except ImportError:
return LoginResult(
success=False,
error="scrapling is required for Codeforces login",
)
from .atcoder import _ensure_browser
_ensure_browser()
cookie_cache = Path.home() / ".cache" / "cp-nvim" / "codeforces-cookies.json"
cookie_cache.parent.mkdir(parents=True, exist_ok=True)
saved_cookies: list[dict[str, Any]] = []
if cookie_cache.exists():
try:
saved_cookies = json.loads(cookie_cache.read_text())
except Exception:
pass
logged_in = False
login_error: str | None = None
def check_login(page):
nonlocal logged_in
logged_in = page.evaluate(
"() => Array.from(document.querySelectorAll('a'))"
".some(a => a.textContent.includes('Logout'))"
)
def login_action(page):
nonlocal login_error
try:
page.fill(
'input[name="handleOrEmail"]',
credentials.get("username", ""),
)
page.fill(
'input[name="password"]',
credentials.get("password", ""),
)
page.locator('#enterForm input[type="submit"]').click()
page.wait_for_url(
lambda url: "/enter" not in url, timeout=BROWSER_NAV_TIMEOUT
)
except Exception as e:
login_error = str(e)
try:
with StealthySession(
headless=True,
timeout=BROWSER_SESSION_TIMEOUT,
google_search=False,
cookies=saved_cookies if saved_cookies else [],
) as session:
if saved_cookies:
print(json.dumps({"status": "checking_login"}), flush=True)
session.fetch(
f"{BASE_URL}/",
page_action=check_login,
network_idle=True,
)
if not logged_in:
print(json.dumps({"status": "logging_in"}), flush=True)
session.fetch(
f"{BASE_URL}/enter",
page_action=login_action,
solve_cloudflare=True,
)
if login_error:
return LoginResult(
success=False, error=f"Login failed: {login_error}"
)
session.fetch(
f"{BASE_URL}/",
page_action=check_login,
network_idle=True,
)
if not logged_in:
return LoginResult(
success=False, error="Login failed (bad credentials?)"
)
try:
browser_cookies = session.context.cookies()
if any(c.get("name") == "X-User-Handle" for c in browser_cookies):
cookie_cache.write_text(json.dumps(browser_cookies))
except Exception:
pass
return LoginResult(success=True, error="")
except Exception as e:
return LoginResult(success=False, error=str(e))
def _submit_headless(
contest_id: str,
problem_id: str,
file_path: str,
language_id: str,
credentials: dict[str, str],
_retried: bool = False,
) -> SubmitResult:
from pathlib import Path
source_code = Path(file_path).read_text()
try:
from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import]
except ImportError:
return SubmitResult(
success=False,
error="scrapling is required for Codeforces submit",
)
from .atcoder import _ensure_browser, _solve_turnstile
_ensure_browser()
cookie_cache = Path.home() / ".cache" / "cp-nvim" / "codeforces-cookies.json"
cookie_cache.parent.mkdir(parents=True, exist_ok=True)
saved_cookies: list[dict[str, Any]] = []
if cookie_cache.exists():
try:
saved_cookies = json.loads(cookie_cache.read_text())
except Exception:
pass
logged_in = cookie_cache.exists() and not _retried
login_error: str | None = None
submit_error: str | None = None
needs_relogin = False
def check_login(page):
nonlocal logged_in
logged_in = page.evaluate(
"() => Array.from(document.querySelectorAll('a'))"
".some(a => a.textContent.includes('Logout'))"
)
def login_action(page):
nonlocal login_error
try:
page.fill(
'input[name="handleOrEmail"]',
credentials.get("username", ""),
)
page.fill(
'input[name="password"]',
credentials.get("password", ""),
)
page.locator('#enterForm input[type="submit"]').click()
page.wait_for_url(
lambda url: "/enter" not in url, timeout=BROWSER_NAV_TIMEOUT
)
except Exception as e:
login_error = str(e)
def submit_action(page):
nonlocal submit_error, needs_relogin
if "/enter" in page.url or "/login" in page.url:
needs_relogin = True
return
_solve_turnstile(page)
try:
page.select_option(
'select[name="submittedProblemIndex"]',
problem_id.upper(),
)
page.select_option('select[name="programTypeId"]', language_id)
page.evaluate(
"""(code) => {
const cm = document.querySelector('.CodeMirror');
if (cm && cm.CodeMirror) {
cm.CodeMirror.setValue(code);
}
const ta = document.querySelector('textarea[name="source"]');
if (ta) ta.value = code;
}""",
source_code,
)
page.locator("form.submit-form input.submit").click(no_wait_after=True)
try:
page.wait_for_url(
lambda url: "/my" in url or "/status" in url,
timeout=BROWSER_SUBMIT_NAV_TIMEOUT["codeforces"],
)
except Exception:
err_el = page.query_selector("span.error")
if err_el:
submit_error = err_el.inner_text().strip()
else:
submit_error = "Submit failed: page did not navigate"
except Exception as e:
submit_error = str(e)
try:
with StealthySession(
headless=True,
timeout=BROWSER_SESSION_TIMEOUT,
google_search=False,
cookies=saved_cookies if (cookie_cache.exists() and not _retried) else [],
) as session:
if not (cookie_cache.exists() and not _retried):
print(json.dumps({"status": "checking_login"}), flush=True)
session.fetch(
f"{BASE_URL}/",
page_action=check_login,
network_idle=True,
)
if not logged_in:
print(json.dumps({"status": "logging_in"}), flush=True)
session.fetch(
f"{BASE_URL}/enter",
page_action=login_action,
solve_cloudflare=True,
)
if login_error:
return SubmitResult(
success=False, error=f"Login failed: {login_error}"
)
print(json.dumps({"status": "submitting"}), flush=True)
session.fetch(
f"{BASE_URL}/contest/{contest_id}/submit",
page_action=submit_action,
solve_cloudflare=False,
)
try:
browser_cookies = session.context.cookies()
if any(c.get("name") == "X-User-Handle" for c in browser_cookies):
cookie_cache.write_text(json.dumps(browser_cookies))
except Exception:
pass
if needs_relogin and not _retried:
cookie_cache.unlink(missing_ok=True)
return _submit_headless(
contest_id,
problem_id,
file_path,
language_id,
credentials,
_retried=True,
)
if submit_error:
return SubmitResult(success=False, error=submit_error)
return SubmitResult(
success=True,
error="",
submission_id="",
verdict="submitted",
)
except Exception as e:
return SubmitResult(success=False, error=str(e))
if __name__ == "__main__":
CodeforcesScraper().run_cli()

View file

@ -1,30 +1,45 @@
#!/usr/bin/env python3
import asyncio
import base64
import json
import re
from typing import Any
import httpx
from .base import BaseScraper
from .base import BaseScraper, extract_precision
from .timeouts import HTTP_TIMEOUT, SUBMIT_POLL_TIMEOUT
from .models import (
ContestListResult,
ContestSummary,
LoginResult,
MetadataResult,
ProblemSummary,
SubmitResult,
TestCase,
)
BASE_URL = "https://cses.fi"
API_URL = "https://cses.fi/api"
SUBMIT_SCOPE = "courses/problemset"
INDEX_PATH = "/problemset"
TASK_PATH = "/problemset/task/{id}"
HEADERS = {
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
}
TIMEOUT_S = 15.0
CONNECTIONS = 8
CSES_LANGUAGES: dict[str, dict[str, str]] = {
"C++17": {"name": "C++", "option": "C++17"},
"Python3": {"name": "Python", "option": "CPython3"},
}
EXTENSIONS: dict[str, str] = {
"C++17": "cpp",
"Python3": "py",
}
def normalize_category_name(category_name: str) -> str:
return category_name.lower().replace(" ", "_").replace("&", "and")
@ -64,7 +79,7 @@ def snake_to_title(name: str) -> str:
async def fetch_text(client: httpx.AsyncClient, path: str) -> str:
r = await client.get(BASE_URL + path, headers=HEADERS, timeout=TIMEOUT_S)
r = await client.get(BASE_URL + path, headers=HEADERS, timeout=HTTP_TIMEOUT)
r.raise_for_status()
return r.text
@ -129,17 +144,21 @@ def parse_category_problems(category_id: str, html: str) -> list[ProblemSummary]
return []
def _extract_problem_info(html: str) -> tuple[int, int, bool]:
def _extract_problem_info(html: str) -> tuple[int, int, bool, float | None]:
tm = TIME_RE.search(html)
mm = MEM_RE.search(html)
t = int(round(float(tm.group(1)) * 1000)) if tm else 0
m = int(mm.group(1)) if mm else 0
md = MD_BLOCK_RE.search(html)
interactive = False
precision = None
if md:
body = md.group(1)
interactive = "This is an interactive problem." in body
return t, m, interactive
from bs4 import BeautifulSoup
precision = extract_precision(BeautifulSoup(body, "html.parser").get_text(" "))
return t, m, interactive, precision
def parse_title(html: str) -> str:
@ -199,6 +218,8 @@ class CSESScraper(BaseScraper):
contest_id=contest_id,
problems=problems,
url="https://cses.fi/problemset/task/%s",
contest_url="https://cses.fi/problemset",
standings_url="",
)
async def scrape_contest_list(self) -> ContestListResult:
@ -211,6 +232,43 @@ class CSESScraper(BaseScraper):
)
return ContestListResult(success=True, error="", contests=cats)
async def login(self, credentials: dict[str, str]) -> LoginResult:
username = credentials.get("username", "")
password = credentials.get("password", "")
if not username or not password:
return self._login_error("Missing username or password")
async with httpx.AsyncClient(follow_redirects=True) as client:
token = credentials.get("token")
if token:
print(json.dumps({"status": "checking_login"}), flush=True)
if await self._check_token(client, token):
return LoginResult(
success=True,
error="",
credentials={
"username": username,
"password": password,
"token": token,
},
)
print(json.dumps({"status": "logging_in"}), flush=True)
token = await self._web_login(client, username, password)
if not token:
return self._login_error("Login failed (bad credentials?)")
return LoginResult(
success=True,
error="",
credentials={
"username": username,
"password": password,
"token": token,
},
)
async def stream_tests_for_category_async(self, category_id: str) -> None:
async with httpx.AsyncClient(
limits=httpx.Limits(max_connections=CONNECTIONS)
@ -227,10 +285,17 @@ class CSESScraper(BaseScraper):
try:
html = await fetch_text(client, task_path(pid))
tests = parse_tests(html)
timeout_ms, memory_mb, interactive = _extract_problem_info(html)
timeout_ms, memory_mb, interactive, precision = (
_extract_problem_info(html)
)
except Exception:
tests = []
timeout_ms, memory_mb, interactive = 0, 0, False
timeout_ms, memory_mb, interactive, precision = (
0,
0,
False,
None,
)
combined_input = "\n".join(t.input for t in tests) if tests else ""
combined_expected = (
@ -250,6 +315,7 @@ class CSESScraper(BaseScraper):
"memory_mb": memory_mb,
"interactive": interactive,
"multi_test": False,
"precision": precision,
}
tasks = [run_one(p.id) for p in problems]
@ -257,6 +323,188 @@ class CSESScraper(BaseScraper):
payload = await coro
print(json.dumps(payload), flush=True)
async def _web_login(
self,
client: httpx.AsyncClient,
username: str,
password: str,
) -> str | None:
login_page = await client.get(
f"{BASE_URL}/login", headers=HEADERS, timeout=HTTP_TIMEOUT
)
csrf_match = re.search(r'name="csrf_token" value="([^"]+)"', login_page.text)
if not csrf_match:
return None
login_resp = await client.post(
f"{BASE_URL}/login",
data={
"csrf_token": csrf_match.group(1),
"nick": username,
"pass": password,
},
headers=HEADERS,
timeout=HTTP_TIMEOUT,
)
if "Invalid username or password" in login_resp.text:
return None
api_resp = await client.post(
f"{API_URL}/login", headers=HEADERS, timeout=HTTP_TIMEOUT
)
api_data = api_resp.json()
token: str | None = api_data.get("X-Auth-Token")
auth_url: str | None = api_data.get("authentication_url")
if not token:
raise RuntimeError("CSES API login response missing 'X-Auth-Token'")
if not auth_url:
raise RuntimeError("CSES API login response missing 'authentication_url'")
auth_page = await client.get(auth_url, headers=HEADERS, timeout=HTTP_TIMEOUT)
auth_csrf = re.search(r'name="csrf_token" value="([^"]+)"', auth_page.text)
form_token = re.search(r'name="token" value="([^"]+)"', auth_page.text)
if not auth_csrf or not form_token:
return None
await client.post(
auth_url,
data={
"csrf_token": auth_csrf.group(1),
"token": form_token.group(1),
},
headers=HEADERS,
timeout=HTTP_TIMEOUT,
)
check = await client.get(
f"{API_URL}/login",
headers={"X-Auth-Token": token, **HEADERS},
timeout=HTTP_TIMEOUT,
)
if check.status_code != 200:
return None
return token
async def _check_token(self, client: httpx.AsyncClient, token: str) -> bool:
try:
r = await client.get(
f"{API_URL}/login",
headers={"X-Auth-Token": token, **HEADERS},
timeout=HTTP_TIMEOUT,
)
return r.status_code == 200
except (httpx.ConnectError, httpx.TimeoutException, httpx.NetworkError):
raise
async def submit(
self,
contest_id: str,
problem_id: str,
file_path: str,
language_id: str,
credentials: dict[str, str],
) -> SubmitResult:
from pathlib import Path
source_code = Path(file_path).read_text()
username = credentials.get("username", "")
password = credentials.get("password", "")
if not username or not password:
return self._submit_error("Missing credentials. Use :CP login cses")
async with httpx.AsyncClient(follow_redirects=True) as client:
token = credentials.get("token")
if token:
print(json.dumps({"status": "checking_login"}), flush=True)
if not await self._check_token(client, token):
token = None
if not token:
print(json.dumps({"status": "logging_in"}), flush=True)
token = await self._web_login(client, username, password)
if not token:
return self._submit_error("Login failed (bad credentials?)")
print(
json.dumps(
{
"credentials": {
"username": username,
"password": password,
"token": token,
}
}
),
flush=True,
)
print(json.dumps({"status": "submitting"}), flush=True)
ext = EXTENSIONS.get(language_id, "cpp")
lang = CSES_LANGUAGES.get(language_id, {})
content_b64 = base64.b64encode(source_code.encode()).decode()
payload: dict[str, Any] = {
"language": lang,
"filename": f"{problem_id}.{ext}",
"content": content_b64,
}
r = await client.post(
f"{API_URL}/{SUBMIT_SCOPE}/submissions",
json=payload,
params={"task": problem_id},
headers={
"X-Auth-Token": token,
"Content-Type": "application/json",
**HEADERS,
},
timeout=HTTP_TIMEOUT,
)
if r.status_code not in range(200, 300):
try:
err = r.json().get("message", r.text)
except Exception:
err = r.text
return self._submit_error(f"Submit request failed: {err}")
info = r.json()
submission_id = str(info.get("id", ""))
for _ in range(60):
await asyncio.sleep(2)
try:
r = await client.get(
f"{API_URL}/{SUBMIT_SCOPE}/submissions/{submission_id}",
params={"poll": "true"},
headers={
"X-Auth-Token": token,
**HEADERS,
},
timeout=SUBMIT_POLL_TIMEOUT,
)
if r.status_code == 200:
info = r.json()
if not info.get("pending", True):
verdict = info.get("result", "unknown")
return SubmitResult(
success=True,
error="",
submission_id=submission_id,
verdict=verdict,
)
except Exception:
pass
return SubmitResult(
success=True,
error="",
submission_id=submission_id,
verdict="submitted (poll timed out)",
)
if __name__ == "__main__":
CSESScraper().run_cli()

403
scrapers/kattis.py Normal file
View file

@ -0,0 +1,403 @@
#!/usr/bin/env python3
import asyncio
import io
import json
import re
import zipfile
from datetime import datetime
from pathlib import Path
import httpx
from .base import BaseScraper
from .timeouts import HTTP_TIMEOUT
from .models import (
ContestListResult,
ContestSummary,
LoginResult,
MetadataResult,
ProblemSummary,
SubmitResult,
TestCase,
)
BASE_URL = "https://open.kattis.com"
HEADERS = {
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
}
CONNECTIONS = 8
_COOKIE_PATH = Path.home() / ".cache" / "cp-nvim" / "kattis-cookies.json"
TIME_RE = re.compile(
r"CPU Time limit</span>\s*<span[^>]*>\s*(\d+)\s*seconds?\s*</span>",
re.DOTALL,
)
MEM_RE = re.compile(
r"Memory limit</span>\s*<span[^>]*>\s*(\d+)\s*MB\s*</span>",
re.DOTALL,
)
async def _fetch_text(client: httpx.AsyncClient, url: str) -> str:
r = await client.get(url, headers=HEADERS, timeout=HTTP_TIMEOUT)
r.raise_for_status()
return r.text
async def _fetch_bytes(client: httpx.AsyncClient, url: str) -> bytes:
r = await client.get(url, headers=HEADERS, timeout=HTTP_TIMEOUT)
r.raise_for_status()
return r.content
def _parse_limits(html: str) -> tuple[int, int]:
tm = TIME_RE.search(html)
mm = MEM_RE.search(html)
timeout_ms = int(tm.group(1)) * 1000 if tm else 1000
memory_mb = int(mm.group(1)) if mm else 1024
return timeout_ms, memory_mb
def _parse_samples_html(html: str) -> list[TestCase]:
tests: list[TestCase] = []
tables = re.finditer(r'<table\s+class="sample"[^>]*>.*?</table>', html, re.DOTALL)
for table_match in tables:
table_html = table_match.group(0)
pres = re.findall(r"<pre>(.*?)</pre>", table_html, re.DOTALL)
if len(pres) >= 2:
inp = pres[0].strip()
out = pres[1].strip()
tests.append(TestCase(input=inp, expected=out))
return tests
def _parse_samples_zip(data: bytes) -> list[TestCase]:
try:
zf = zipfile.ZipFile(io.BytesIO(data))
except zipfile.BadZipFile:
return []
inputs: dict[str, str] = {}
outputs: dict[str, str] = {}
for name in zf.namelist():
content = zf.read(name).decode("utf-8").strip()
if name.endswith(".in"):
key = name[: -len(".in")]
inputs[key] = content
elif name.endswith(".ans"):
key = name[: -len(".ans")]
outputs[key] = content
tests: list[TestCase] = []
for key in sorted(set(inputs) & set(outputs)):
tests.append(TestCase(input=inputs[key], expected=outputs[key]))
return tests
def _is_interactive(html: str) -> bool:
return "This is an interactive problem" in html
def _parse_contests_page(html: str) -> list[ContestSummary]:
results: list[ContestSummary] = []
seen: set[str] = set()
for row_m in re.finditer(r"<tr[^>]*>(.*?)</tr>", html, re.DOTALL):
row = row_m.group(1)
link_m = re.search(r'href="/contests/([a-z0-9]+)"[^>]*>([^<]+)</a>', row)
if not link_m:
continue
cid = link_m.group(1)
name = link_m.group(2).strip()
if cid in seen:
continue
seen.add(cid)
start_time: int | None = None
ts_m = re.search(r'data-timestamp="(\d+)"', row)
if ts_m:
start_time = int(ts_m.group(1))
else:
time_m = re.search(r'<time[^>]+datetime="([^"]+)"', row)
if time_m:
try:
dt = datetime.fromisoformat(time_m.group(1).replace("Z", "+00:00"))
start_time = int(dt.timestamp())
except Exception:
pass
results.append(ContestSummary(id=cid, name=name, start_time=start_time))
return results
def _parse_contest_problem_list(html: str) -> list[tuple[str, str]]:
if "The problems will become available when the contest starts" in html:
return []
results: list[tuple[str, str]] = []
seen: set[str] = set()
for row_m in re.finditer(r"<tr[^>]*>(.*?)</tr>", html, re.DOTALL):
row = row_m.group(1)
link_m = re.search(
r'href="/contests/[^/]+/problems/([^"]+)"[^>]*>([^<]+)</a>', row
)
if not link_m:
continue
slug = link_m.group(1)
name = link_m.group(2).strip()
if slug in seen:
continue
seen.add(slug)
label_m = re.search(r"<td[^>]*>\s*([A-Z])\s*</td>", row)
label = label_m.group(1) if label_m else ""
display = f"{label} - {name}" if label else name
results.append((slug, display))
return results
async def _fetch_contest_slugs(
client: httpx.AsyncClient, contest_id: str
) -> list[tuple[str, str]]:
try:
html = await _fetch_text(client, f"{BASE_URL}/contests/{contest_id}/problems")
return _parse_contest_problem_list(html)
except httpx.HTTPStatusError:
return []
except Exception:
return []
async def _stream_single_problem(client: httpx.AsyncClient, slug: str) -> None:
try:
html = await _fetch_text(client, f"{BASE_URL}/problems/{slug}")
except Exception:
return
timeout_ms, memory_mb = _parse_limits(html)
interactive = _is_interactive(html)
tests: list[TestCase] = []
try:
zip_data = await _fetch_bytes(
client,
f"{BASE_URL}/problems/{slug}/file/statement/samples.zip",
)
tests = _parse_samples_zip(zip_data)
except Exception:
tests = _parse_samples_html(html)
combined_input = "\n".join(t.input for t in tests) if tests else ""
combined_expected = "\n".join(t.expected for t in tests) if tests else ""
print(
json.dumps(
{
"problem_id": slug,
"combined": {
"input": combined_input,
"expected": combined_expected,
},
"tests": [{"input": t.input, "expected": t.expected} for t in tests],
"timeout_ms": timeout_ms,
"memory_mb": memory_mb,
"interactive": interactive,
"multi_test": False,
}
),
flush=True,
)
async def _load_kattis_cookies(client: httpx.AsyncClient) -> None:
if not _COOKIE_PATH.exists():
return
try:
for k, v in json.loads(_COOKIE_PATH.read_text()).items():
client.cookies.set(k, v)
except Exception:
pass
async def _save_kattis_cookies(client: httpx.AsyncClient) -> None:
cookies = {k: v for k, v in client.cookies.items()}
if cookies:
_COOKIE_PATH.parent.mkdir(parents=True, exist_ok=True)
_COOKIE_PATH.write_text(json.dumps(cookies))
async def _check_kattis_login(client: httpx.AsyncClient) -> bool:
try:
r = await client.get(BASE_URL + "/", headers=HEADERS, timeout=HTTP_TIMEOUT)
text = r.text.lower()
return "sign out" in text or "logout" in text or "my profile" in text
except Exception:
return False
async def _do_kattis_login(
client: httpx.AsyncClient, username: str, password: str
) -> bool:
r = await client.post(
f"{BASE_URL}/login/email",
data={"user": username, "password": password, "script": "true"},
headers=HEADERS,
timeout=HTTP_TIMEOUT,
)
return r.status_code == 200 and "login failed" not in r.text.lower()
class KattisScraper(BaseScraper):
@property
def platform_name(self) -> str:
return "kattis"
async def scrape_contest_metadata(self, contest_id: str) -> MetadataResult:
try:
async with httpx.AsyncClient() as client:
slugs = await _fetch_contest_slugs(client, contest_id)
if slugs:
return MetadataResult(
success=True,
error="",
contest_id=contest_id,
problems=[
ProblemSummary(id=slug, name=name) for slug, name in slugs
],
url=f"{BASE_URL}/problems/%s",
)
try:
html = await _fetch_text(
client, f"{BASE_URL}/problems/{contest_id}"
)
except Exception as e:
return self._metadata_error(str(e))
title_m = re.search(r"<title>([^<]+)</title>", html)
name = (
title_m.group(1).split("\u2013")[0].strip()
if title_m
else contest_id
)
return MetadataResult(
success=True,
error="",
contest_id=contest_id,
problems=[ProblemSummary(id=contest_id, name=name)],
url=f"{BASE_URL}/problems/%s",
)
except Exception as e:
return self._metadata_error(str(e))
async def scrape_contest_list(self) -> ContestListResult:
try:
async with httpx.AsyncClient() as client:
html = await _fetch_text(
client,
f"{BASE_URL}/contests?kattis_original=on&kattis_recycled=off&user_created=off",
)
contests = _parse_contests_page(html)
if not contests:
return self._contests_error("No contests found")
return ContestListResult(success=True, error="", contests=contests)
except Exception as e:
return self._contests_error(str(e))
async def stream_tests_for_category_async(self, category_id: str) -> None:
async with httpx.AsyncClient(
limits=httpx.Limits(max_connections=CONNECTIONS)
) as client:
slugs = await _fetch_contest_slugs(client, category_id)
if slugs:
sem = asyncio.Semaphore(CONNECTIONS)
async def emit_one(slug: str, _name: str) -> None:
async with sem:
await _stream_single_problem(client, slug)
await asyncio.gather(*(emit_one(s, n) for s, n in slugs))
return
await _stream_single_problem(client, category_id)
async def submit(
self,
contest_id: str,
problem_id: str,
file_path: str,
language_id: str,
credentials: dict[str, str],
) -> SubmitResult:
source = Path(file_path).read_bytes()
username = credentials.get("username", "")
password = credentials.get("password", "")
if not username or not password:
return self._submit_error("Missing credentials. Use :CP kattis login")
async with httpx.AsyncClient(follow_redirects=True) as client:
await _load_kattis_cookies(client)
print(json.dumps({"status": "checking_login"}), flush=True)
logged_in = bool(client.cookies) and await _check_kattis_login(client)
if not logged_in:
print(json.dumps({"status": "logging_in"}), flush=True)
ok = await _do_kattis_login(client, username, password)
if not ok:
return self._submit_error("Login failed (bad credentials?)")
await _save_kattis_cookies(client)
print(json.dumps({"status": "submitting"}), flush=True)
ext = "py" if "python" in language_id.lower() else "cpp"
data: dict[str, str] = {
"submit": "true",
"script": "true",
"language": language_id,
"problem": problem_id,
"mainclass": "",
"submit_ctr": "2",
}
if contest_id != problem_id:
data["contest"] = contest_id
try:
r = await client.post(
f"{BASE_URL}/submit",
data=data,
files={"sub_file[]": (f"solution.{ext}", source, "text/plain")},
headers=HEADERS,
timeout=HTTP_TIMEOUT,
)
r.raise_for_status()
except Exception as e:
return self._submit_error(f"Submit request failed: {e}")
sid_m = re.search(r"Submission ID:\s*(\d+)", r.text, re.IGNORECASE)
sid = sid_m.group(1) if sid_m else ""
return SubmitResult(
success=True, error="", submission_id=sid, verdict="submitted"
)
async def login(self, credentials: dict[str, str]) -> LoginResult:
username = credentials.get("username", "")
password = credentials.get("password", "")
if not username or not password:
return self._login_error("Missing username or password")
async with httpx.AsyncClient(follow_redirects=True) as client:
await _load_kattis_cookies(client)
if client.cookies:
print(json.dumps({"status": "checking_login"}), flush=True)
if await _check_kattis_login(client):
return LoginResult(
success=True,
error="",
credentials={"username": username, "password": password},
)
print(json.dumps({"status": "logging_in"}), flush=True)
ok = await _do_kattis_login(client, username, password)
if not ok:
return self._login_error("Login failed (bad credentials?)")
await _save_kattis_cookies(client)
return LoginResult(
success=True,
error="",
credentials={"username": username, "password": password},
)
if __name__ == "__main__":
KattisScraper().run_cli()

30
scrapers/language_ids.py Normal file
View file

@ -0,0 +1,30 @@
LANGUAGE_IDS = {
"atcoder": {
"cpp": "6017",
"python": "6082",
},
"codeforces": {
"cpp": "89",
"python": "70",
},
"cses": {
"cpp": "C++17",
"python": "Python3",
},
"usaco": {
"cpp": "cpp",
"python": "python",
},
"kattis": {
"cpp": "C++17",
"python": "Python 3",
},
"codechef": {
"cpp": "C++ 17",
"python": "Python 3",
},
}
def get_language_id(platform: str, language: str) -> str | None:
return LANGUAGE_IDS.get(platform, {}).get(language)

View file

@ -26,6 +26,7 @@ class ContestSummary(BaseModel):
id: str
name: str
display_name: str | None = None
start_time: int | None = None
model_config = ConfigDict(extra="forbid")
@ -41,6 +42,8 @@ class MetadataResult(ScrapingResult):
contest_id: str = ""
problems: list[ProblemSummary] = Field(default_factory=list)
url: str
contest_url: str = ""
standings_url: str = ""
model_config = ConfigDict(extra="forbid")
@ -63,6 +66,19 @@ class TestsResult(ScrapingResult):
model_config = ConfigDict(extra="forbid")
class LoginResult(ScrapingResult):
credentials: dict[str, str] = Field(default_factory=dict)
model_config = ConfigDict(extra="forbid")
class SubmitResult(ScrapingResult):
submission_id: str = ""
verdict: str = ""
model_config = ConfigDict(extra="forbid")
class ScraperConfig(BaseModel):
timeout_seconds: int = 30
max_retries: int = 3

15
scrapers/timeouts.py Normal file
View file

@ -0,0 +1,15 @@
from collections import defaultdict
HTTP_TIMEOUT = 15.0
BROWSER_SESSION_TIMEOUT = 15000
BROWSER_NAV_TIMEOUT = 10000
BROWSER_SUBMIT_NAV_TIMEOUT: defaultdict[str, int] = defaultdict(
lambda: BROWSER_NAV_TIMEOUT
)
BROWSER_SUBMIT_NAV_TIMEOUT["codeforces"] = BROWSER_NAV_TIMEOUT * 2
BROWSER_TURNSTILE_POLL = 5000
BROWSER_ELEMENT_WAIT = 10000
BROWSER_SETTLE_DELAY = 500
SUBMIT_POLL_TIMEOUT = 30.0

508
scrapers/usaco.py Normal file
View file

@ -0,0 +1,508 @@
#!/usr/bin/env python3
import asyncio
import json
import re
from pathlib import Path
from typing import Any, cast
import httpx
from .base import BaseScraper
from .timeouts import HTTP_TIMEOUT
from .models import (
ContestListResult,
ContestSummary,
LoginResult,
MetadataResult,
ProblemSummary,
SubmitResult,
TestCase,
)
BASE_URL = "http://www.usaco.org"
_AUTH_BASE = "https://usaco.org"
HEADERS = {
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
}
CONNECTIONS = 4
_COOKIE_PATH = Path.home() / ".cache" / "cp-nvim" / "usaco-cookies.json"
_LOGIN_PATH = "/current/tpcm/login-session.php"
_SUBMIT_PATH = "/current/tpcm/submitproblem.php"
_LANG_KEYWORDS: dict[str, list[str]] = {
"cpp": ["c++17", "c++ 17", "g++17", "c++", "cpp"],
"python": ["python3", "python 3", "python"],
"java": ["java"],
}
MONTHS = [
"dec",
"jan",
"feb",
"mar",
"open",
]
DIVISION_HEADING_RE = re.compile(
r"<h2>.*?USACO\s+(\d{4})\s+(\w+)\s+Contest,\s+(\w+)\s*</h2>",
re.IGNORECASE,
)
PROBLEM_BLOCK_RE = re.compile(
r"<b>([^<]+)</b>\s*<br\s*/?>.*?"
r"viewproblem2&cpid=(\d+)",
re.DOTALL,
)
SAMPLE_IN_RE = re.compile(r"<pre\s+class=['\"]in['\"]>(.*?)</pre>", re.DOTALL)
SAMPLE_OUT_RE = re.compile(r"<pre\s+class=['\"]out['\"]>(.*?)</pre>", re.DOTALL)
TIME_NOTE_RE = re.compile(
r"time\s+limit\s+(?:for\s+this\s+problem\s+is\s+)?(\d+)s",
re.IGNORECASE,
)
MEMORY_NOTE_RE = re.compile(
r"memory\s+limit\s+(?:for\s+this\s+problem\s+is\s+)?(\d+)\s*MB",
re.IGNORECASE,
)
RESULTS_PAGE_RE = re.compile(
r'href="index\.php\?page=([a-z]+\d{2,4}results)"',
re.IGNORECASE,
)
async def _fetch_text(client: httpx.AsyncClient, url: str) -> str:
r = await client.get(
url, headers=HEADERS, timeout=HTTP_TIMEOUT, follow_redirects=True
)
r.raise_for_status()
return r.text
def _parse_results_page(html: str) -> dict[str, list[tuple[str, str]]]:
sections: dict[str, list[tuple[str, str]]] = {}
current_div: str | None = None
parts = re.split(r"(<h2>.*?</h2>)", html, flags=re.DOTALL)
for part in parts:
heading_m = DIVISION_HEADING_RE.search(part)
if heading_m:
div = heading_m.group(3)
if div:
key = div.lower()
current_div = key
sections.setdefault(key, [])
continue
if current_div is not None:
for m in PROBLEM_BLOCK_RE.finditer(part):
name = m.group(1).strip()
cpid = m.group(2)
sections[current_div].append((cpid, name))
return sections
def _parse_contest_id(contest_id: str) -> tuple[str, str]:
parts = contest_id.rsplit("_", 1)
if len(parts) != 2:
return contest_id, ""
return parts[0], parts[1].lower()
def _results_page_slug(month_year: str) -> str:
return f"{month_year}results"
def _parse_problem_page(html: str) -> dict[str, Any]:
inputs = SAMPLE_IN_RE.findall(html)
outputs = SAMPLE_OUT_RE.findall(html)
tests: list[TestCase] = []
for inp, out in zip(inputs, outputs):
tests.append(
TestCase(
input=inp.strip().replace("\r", ""),
expected=out.strip().replace("\r", ""),
)
)
tm = TIME_NOTE_RE.search(html)
mm = MEMORY_NOTE_RE.search(html)
timeout_ms = int(tm.group(1)) * 1000 if tm else 4000
memory_mb = int(mm.group(1)) if mm else 256
interactive = "interactive problem" in html.lower()
return {
"tests": tests,
"timeout_ms": timeout_ms,
"memory_mb": memory_mb,
"interactive": interactive,
}
def _pick_lang_option(select_body: str, language_id: str) -> str | None:
keywords = _LANG_KEYWORDS.get(language_id.lower(), [language_id.lower()])
for m in re.finditer(
r'<option\b[^>]*\bvalue=["\']([^"\']*)["\'][^>]*>([^<]+)',
select_body,
re.IGNORECASE,
):
val, text = m.group(1), m.group(2).strip().lower()
for kw in keywords:
if kw in text:
return val
return None
def _parse_submit_form(
html: str, language_id: str
) -> tuple[str, dict[str, str], str | None]:
form_action = _AUTH_BASE + _SUBMIT_PATH
hidden: dict[str, str] = {}
lang_val: str | None = None
for form_m in re.finditer(
r'<form\b[^>]*action=["\']([^"\']+)["\'][^>]*>(.*?)</form>',
html,
re.DOTALL | re.IGNORECASE,
):
action, body = form_m.group(1), form_m.group(2)
if "sub_file" not in body.lower():
continue
if action.startswith("http"):
form_action = action
elif action.startswith("/"):
form_action = _AUTH_BASE + action
else:
form_action = _AUTH_BASE + "/" + action
for input_m in re.finditer(
r'<input\b[^>]*\btype=["\']hidden["\'][^>]*/?>',
body,
re.IGNORECASE,
):
tag = input_m.group(0)
name_m = re.search(r'\bname=["\']([^"\']+)["\']', tag, re.IGNORECASE)
val_m = re.search(r'\bvalue=["\']([^"\']*)["\']', tag, re.IGNORECASE)
if name_m and val_m:
hidden[name_m.group(1)] = val_m.group(2)
for sel_m in re.finditer(
r'<select\b[^>]*\bname=["\']([^"\']+)["\'][^>]*>(.*?)</select>',
body,
re.DOTALL | re.IGNORECASE,
):
name, sel_body = sel_m.group(1), sel_m.group(2)
if "lang" in name.lower():
lang_val = _pick_lang_option(sel_body, language_id)
break
break
return form_action, hidden, lang_val
async def _load_usaco_cookies(client: httpx.AsyncClient) -> None:
if not _COOKIE_PATH.exists():
return
try:
for k, v in json.loads(_COOKIE_PATH.read_text()).items():
client.cookies.set(k, v)
except Exception:
pass
async def _save_usaco_cookies(client: httpx.AsyncClient) -> None:
cookies = {k: v for k, v in client.cookies.items()}
if cookies:
_COOKIE_PATH.parent.mkdir(parents=True, exist_ok=True)
_COOKIE_PATH.write_text(json.dumps(cookies))
async def _check_usaco_login(client: httpx.AsyncClient, username: str) -> bool:
try:
r = await client.get(
f"{_AUTH_BASE}/index.php",
headers=HEADERS,
timeout=HTTP_TIMEOUT,
)
text = r.text.lower()
return username.lower() in text or "logout" in text
except Exception:
return False
async def _do_usaco_login(
client: httpx.AsyncClient, username: str, password: str
) -> bool:
r = await client.post(
f"{_AUTH_BASE}{_LOGIN_PATH}",
data={"user": username, "password": password},
headers=HEADERS,
timeout=HTTP_TIMEOUT,
)
r.raise_for_status()
try:
data = r.json()
return bool(data.get("success") or data.get("status") == "success")
except Exception:
return r.status_code == 200 and "error" not in r.text.lower()
class USACOScraper(BaseScraper):
@property
def platform_name(self) -> str:
return "usaco"
async def scrape_contest_metadata(self, contest_id: str) -> MetadataResult:
try:
month_year, division = _parse_contest_id(contest_id)
if not division:
return self._metadata_error(
f"Invalid contest ID '{contest_id}'. "
"Expected format: <monthYY>_<division> (e.g. dec24_gold)"
)
slug = _results_page_slug(month_year)
async with httpx.AsyncClient() as client:
html = await _fetch_text(client, f"{BASE_URL}/index.php?page={slug}")
sections = _parse_results_page(html)
problems_raw = sections.get(division, [])
if not problems_raw:
return self._metadata_error(
f"No problems found for {contest_id} (division: {division})"
)
problems = [
ProblemSummary(id=cpid, name=name) for cpid, name in problems_raw
]
return MetadataResult(
success=True,
error="",
contest_id=contest_id,
problems=problems,
url=f"{BASE_URL}/index.php?page=viewproblem2&cpid=%s",
)
except Exception as e:
return self._metadata_error(str(e))
async def scrape_contest_list(self) -> ContestListResult:
try:
async with httpx.AsyncClient(
limits=httpx.Limits(max_connections=CONNECTIONS)
) as client:
html = await _fetch_text(client, f"{BASE_URL}/index.php?page=contests")
page_slugs: set[str] = set()
for m in RESULTS_PAGE_RE.finditer(html):
page_slugs.add(m.group(1))
recent_patterns = []
for year in range(15, 27):
for month in MONTHS:
recent_patterns.append(f"{month}{year:02d}results")
page_slugs.update(recent_patterns)
contests: list[ContestSummary] = []
sem = asyncio.Semaphore(CONNECTIONS)
async def check_page(slug: str) -> list[ContestSummary]:
async with sem:
try:
page_html = await _fetch_text(
client, f"{BASE_URL}/index.php?page={slug}"
)
except Exception:
return []
sections = _parse_results_page(page_html)
if not sections:
return []
month_year = slug.replace("results", "")
out: list[ContestSummary] = []
for div in sections:
cid = f"{month_year}_{div}"
year_m = re.search(r"\d{2,4}", month_year)
month_m = re.search(r"[a-z]+", month_year)
year_str = year_m.group() if year_m else ""
month_str = month_m.group().capitalize() if month_m else ""
if len(year_str) == 2:
year_str = f"20{year_str}"
display = (
f"USACO {year_str} {month_str} - {div.capitalize()}"
)
out.append(
ContestSummary(id=cid, name=cid, display_name=display)
)
return out
tasks = [check_page(slug) for slug in sorted(page_slugs)]
for coro in asyncio.as_completed(tasks):
contests.extend(await coro)
if not contests:
return self._contests_error("No contests found")
return ContestListResult(success=True, error="", contests=contests)
except Exception as e:
return self._contests_error(str(e))
async def stream_tests_for_category_async(self, category_id: str) -> None:
month_year, division = _parse_contest_id(category_id)
if not division:
return
slug = _results_page_slug(month_year)
async with httpx.AsyncClient(
limits=httpx.Limits(max_connections=CONNECTIONS)
) as client:
try:
html = await _fetch_text(client, f"{BASE_URL}/index.php?page={slug}")
except Exception:
return
sections = _parse_results_page(html)
problems_raw = sections.get(division, [])
if not problems_raw:
return
sem = asyncio.Semaphore(CONNECTIONS)
async def run_one(cpid: str) -> dict[str, Any]:
async with sem:
try:
problem_html = await _fetch_text(
client,
f"{BASE_URL}/index.php?page=viewproblem2&cpid={cpid}",
)
info = _parse_problem_page(problem_html)
except Exception:
info = {
"tests": [],
"timeout_ms": 4000,
"memory_mb": 256,
"interactive": False,
}
tests = cast(list[TestCase], info["tests"])
combined_input = "\n".join(t.input for t in tests) if tests else ""
combined_expected = (
"\n".join(t.expected for t in tests) if tests else ""
)
return {
"problem_id": cpid,
"combined": {
"input": combined_input,
"expected": combined_expected,
},
"tests": [
{"input": t.input, "expected": t.expected} for t in tests
],
"timeout_ms": info["timeout_ms"],
"memory_mb": info["memory_mb"],
"interactive": info["interactive"],
"multi_test": False,
}
tasks = [run_one(cpid) for cpid, _ in problems_raw]
for coro in asyncio.as_completed(tasks):
payload = await coro
print(json.dumps(payload), flush=True)
async def submit(
self,
contest_id: str,
problem_id: str,
file_path: str,
language_id: str,
credentials: dict[str, str],
) -> SubmitResult:
source = Path(file_path).read_bytes()
username = credentials.get("username", "")
password = credentials.get("password", "")
if not username or not password:
return self._submit_error("Missing credentials. Use :CP usaco login")
async with httpx.AsyncClient(follow_redirects=True) as client:
await _load_usaco_cookies(client)
print(json.dumps({"status": "checking_login"}), flush=True)
logged_in = bool(client.cookies) and await _check_usaco_login(
client, username
)
if not logged_in:
print(json.dumps({"status": "logging_in"}), flush=True)
try:
ok = await _do_usaco_login(client, username, password)
except Exception as e:
return self._submit_error(f"Login failed: {e}")
if not ok:
return self._submit_error("Login failed (bad credentials?)")
await _save_usaco_cookies(client)
print(json.dumps({"status": "submitting"}), flush=True)
try:
page_r = await client.get(
f"{_AUTH_BASE}/index.php?page=viewproblem2&cpid={problem_id}",
headers=HEADERS,
timeout=HTTP_TIMEOUT,
)
form_url, hidden_fields, lang_val = _parse_submit_form(
page_r.text, language_id
)
except Exception:
form_url = _AUTH_BASE + _SUBMIT_PATH
hidden_fields = {}
lang_val = None
data: dict[str, str] = {"cpid": problem_id, **hidden_fields}
data["language"] = lang_val if lang_val is not None else language_id
ext = "py" if "python" in language_id.lower() else "cpp"
try:
r = await client.post(
form_url,
data=data,
files={"sub_file[]": (f"solution.{ext}", source, "text/plain")},
headers=HEADERS,
timeout=HTTP_TIMEOUT,
)
r.raise_for_status()
except Exception as e:
return self._submit_error(f"Submit request failed: {e}")
try:
resp = r.json()
sid = str(resp.get("submission_id", resp.get("id", "")))
except Exception:
sid = ""
return SubmitResult(
success=True, error="", submission_id=sid, verdict="submitted"
)
async def login(self, credentials: dict[str, str]) -> LoginResult:
username = credentials.get("username", "")
password = credentials.get("password", "")
if not username or not password:
return self._login_error("Missing username or password")
async with httpx.AsyncClient(follow_redirects=True) as client:
await _load_usaco_cookies(client)
if client.cookies:
print(json.dumps({"status": "checking_login"}), flush=True)
if await _check_usaco_login(client, username):
return LoginResult(
success=True,
error="",
credentials={"username": username, "password": password},
)
print(json.dumps({"status": "logging_in"}), flush=True)
try:
ok = await _do_usaco_login(client, username, password)
except Exception as e:
return self._login_error(f"Login request failed: {e}")
if not ok:
return self._login_error("Login failed (bad credentials?)")
await _save_usaco_cookies(client)
return LoginResult(
success=True,
error="",
credentials={"username": username, "password": password},
)
if __name__ == "__main__":
USACOScraper().run_cli()

13
scripts/ci.sh Executable file
View file

@ -0,0 +1,13 @@
#!/bin/sh
set -eu
nix develop --command stylua --check .
git ls-files '*.lua' | xargs nix develop --command selene --display-style quiet
nix develop --command prettier --check .
nix fmt
git diff --exit-code -- '*.nix'
nix develop --command lua-language-server --check . --checklevel=Warning
nix develop --command ruff format --check .
nix develop --command ruff check .
nix develop --command ty check .
nix develop --command python -m pytest tests/ -v

113
scripts/stress.py Executable file
View file

@ -0,0 +1,113 @@
#!/usr/bin/env python3
import subprocess
import sys
def main() -> None:
argv = sys.argv[1:]
max_iterations = 1000
timeout = 10
positional: list[str] = []
i = 0
while i < len(argv):
if argv[i] == "--max-iterations" and i + 1 < len(argv):
max_iterations = int(argv[i + 1])
i += 2
elif argv[i] == "--timeout" and i + 1 < len(argv):
timeout = int(argv[i + 1])
i += 2
else:
positional.append(argv[i])
i += 1
if len(positional) != 3:
print(
"Usage: stress.py <generator> <brute> <candidate> "
"[--max-iterations N] [--timeout S]",
file=sys.stderr,
)
sys.exit(1)
generator, brute, candidate = positional
for iteration in range(1, max_iterations + 1):
try:
gen_result = subprocess.run(
generator,
capture_output=True,
text=True,
shell=True,
timeout=timeout,
)
except subprocess.TimeoutExpired:
print(
f"[stress] generator timed out on iteration {iteration}",
file=sys.stderr,
)
sys.exit(1)
if gen_result.returncode != 0:
print(
f"[stress] generator failed on iteration {iteration} "
f"(exit code {gen_result.returncode})",
file=sys.stderr,
)
if gen_result.stderr:
print(gen_result.stderr, file=sys.stderr, end="")
sys.exit(1)
test_input = gen_result.stdout
try:
brute_result = subprocess.run(
brute,
input=test_input,
capture_output=True,
text=True,
shell=True,
timeout=timeout,
)
except subprocess.TimeoutExpired:
print(f"[stress] brute timed out on iteration {iteration}", file=sys.stderr)
print(f"\n--- input ---\n{test_input}", end="")
sys.exit(1)
try:
cand_result = subprocess.run(
candidate,
input=test_input,
capture_output=True,
text=True,
shell=True,
timeout=timeout,
)
except subprocess.TimeoutExpired:
print(
f"[stress] candidate timed out on iteration {iteration}",
file=sys.stderr,
)
print(f"\n--- input ---\n{test_input}", end="")
sys.exit(1)
brute_out = brute_result.stdout.strip()
cand_out = cand_result.stdout.strip()
if brute_out != cand_out:
print(f"[stress] mismatch on iteration {iteration}", file=sys.stderr)
print(f"\n--- input ---\n{test_input}", end="")
print(f"\n--- expected (brute) ---\n{brute_out}")
print(f"\n--- actual (candidate) ---\n{cand_out}")
sys.exit(1)
print(f"[stress] iteration {iteration} OK", file=sys.stderr)
print(
f"[stress] all {max_iterations} iterations passed",
file=sys.stderr,
)
sys.exit(0)
if __name__ == "__main__":
main()

View file

@ -1 +1,4 @@
std = 'vim'
[lints]
bad_string_escape = 'allow'

View file

@ -10,7 +10,6 @@ from typing import Any
import httpx
import pytest
import requests
from scrapling import fetchers
ROOT = Path(__file__).resolve().parent.parent
FIX = Path(__file__).resolve().parent / "fixtures"
@ -136,12 +135,10 @@ def run_scraper_offline(fixture_text):
case "codeforces":
class MockCodeForcesPage:
def __init__(self, html: str):
self.html_content = html
def _mock_stealthy_fetch(url: str, **kwargs):
return MockCodeForcesPage(_router_codeforces(url=url))
def _mock_fetch_problems_html(cid: str) -> str:
return _router_codeforces(
url=f"https://codeforces.com/contest/{cid}/problems"
)
def _mock_requests_get(url: str, **kwargs):
if "api/contest.list" in url:
@ -172,7 +169,7 @@ def run_scraper_offline(fixture_text):
raise AssertionError(f"Unexpected requests.get call: {url}")
return {
"Fetcher.get": _mock_stealthy_fetch,
"_fetch_problems_html": _mock_fetch_problems_html,
"requests.get": _mock_requests_get,
}
@ -212,21 +209,8 @@ def run_scraper_offline(fixture_text):
return MockResponse(data)
raise AssertionError(f"No fixture for CodeChef url={url!r}")
class MockCodeChefPage:
def __init__(self, html: str):
self.body = html
self.status = 200
def _mock_stealthy_fetch(url: str, **kwargs):
if "/problems/" in url:
problem_id = url.rstrip("/").split("/")[-1]
html = fixture_text(f"codechef/{problem_id}.html")
return MockCodeChefPage(html)
raise AssertionError(f"No fixture for CodeChef url={url!r}")
return {
"__offline_get_async": __offline_get_async,
"Fetcher.get": _mock_stealthy_fetch,
}
case _:
@ -245,7 +229,7 @@ def run_scraper_offline(fixture_text):
offline_fetches = _make_offline_fetches(scraper_name)
if scraper_name == "codeforces":
fetchers.Fetcher.get = offline_fetches["Fetcher.get"]
ns._fetch_problems_html = offline_fetches["_fetch_problems_html"]
requests.get = offline_fetches["requests.get"]
elif scraper_name == "atcoder":
ns._fetch = offline_fetches["_fetch"]
@ -254,7 +238,6 @@ def run_scraper_offline(fixture_text):
httpx.AsyncClient.get = offline_fetches["__offline_fetch_text"]
elif scraper_name == "codechef":
httpx.AsyncClient.get = offline_fetches["__offline_get_async"]
fetchers.Fetcher.get = offline_fetches["Fetcher.get"]
scraper_class = getattr(ns, scraper_classes[scraper_name])
scraper = scraper_class()

View file

@ -6,11 +6,6 @@ from scrapers.models import (
TestsResult,
)
MODEL_FOR_MODE = {
"metadata": MetadataResult,
"contests": ContestListResult,
}
MATRIX = {
"cses": {
"metadata": ("introductory_problems",),
@ -43,17 +38,16 @@ def test_scraper_offline_fixture_matrix(run_scraper_offline, scraper, mode):
assert rc in (0, 1), f"Bad exit code {rc}"
assert objs, f"No JSON output for {scraper}:{mode}"
if mode in ("metadata", "contests"):
Model = MODEL_FOR_MODE[mode]
model = Model.model_validate(objs[-1])
assert model is not None
if mode == "metadata":
model = MetadataResult.model_validate(objs[-1])
assert model.success is True
if mode == "metadata":
assert model.url
assert len(model.problems) >= 1
assert all(isinstance(p.id, str) and p.id for p in model.problems)
else:
assert len(model.contests) >= 1
assert model.url
assert len(model.problems) >= 1
assert all(isinstance(p.id, str) and p.id for p in model.problems)
elif mode == "contests":
model = ContestListResult.model_validate(objs[-1])
assert model.success is True
assert len(model.contests) >= 1
else:
assert len(objs) >= 1, "No test objects returned"
validated_any = False

1166
uv.lock generated

File diff suppressed because it is too large Load diff

View file

@ -1,30 +0,0 @@
[selene]
base = "lua51"
name = "vim"
[vim]
any = true
[jit]
any = true
[assert]
any = true
[describe]
any = true
[it]
any = true
[before_each]
any = true
[after_each]
any = true
[spy]
any = true
[stub]
any = true

26
vim.yaml Normal file
View file

@ -0,0 +1,26 @@
---
base: lua51
name: vim
lua_versions:
- luajit
globals:
vim:
any: true
jit:
any: true
assert:
any: true
describe:
any: true
it:
any: true
before_each:
any: true
after_each:
any: true
spy:
any: true
stub:
any: true
bit:
any: true