feat: validate credentials on :CP <platform> login (#310)

## Problem

`:CP <platform> login` blindly caches username/password without
server-side
validation. Bad credentials are only discovered at submit time, which is
confusing and wastes a browser session.

## Solution

Wire `:CP <platform> login` through the scraper pipeline so each
platform
actually authenticates before persisting credentials. On failure, the
user
sees an error and nothing is cached.

- CSES: reuses `_check_token` (fast path) and `_web_login`; returns API
token
  in `LoginResult.credentials` so subsequent submits skip re-auth.
- AtCoder/Codeforces: new `_login_headless` functions open a
StealthySession,
solve Turnstile/Cloudflare, fill the login form, and validate success by
  checking for the logout link. Cookies only persist on confirmed login.
- CodeChef/Kattis/USACO: return "not yet implemented" errors.
- `scraper.lua`: generalizes submit-only guards (`needs_browser` flag)
to
  cover both `submit` and `login` subcommands.
- `credentials.lua`: prompts for username/password, passes cached token
for
CSES fast path, shows ndjson status notifications, only caches on
success.
This commit is contained in:
Barrett Ruth 2026-03-05 15:12:09 -05:00 committed by GitHub
parent a202725cc5
commit 2c119774df
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 356 additions and 108 deletions

View file

@ -5,8 +5,8 @@ import json
import os
import re
import subprocess
import sys
import time
from pathlib import Path
from typing import Any
import backoff
@ -17,16 +17,14 @@ from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from .base import BaseScraper, extract_precision
from .language_ids import get_language_id
from .models import (
CombinedTest,
ContestListResult,
ContestSummary,
LoginResult,
MetadataResult,
ProblemSummary,
SubmitResult,
TestCase,
TestsResult,
)
from .timeouts import (
BROWSER_ELEMENT_WAIT,
@ -295,6 +293,93 @@ def _ensure_browser() -> None:
break
def _login_headless(credentials: dict[str, str]) -> LoginResult:
try:
from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import]
except ImportError:
return LoginResult(
success=False,
error="scrapling is required for AtCoder login. Install it: uv add 'scrapling[fetchers]>=0.4'",
)
_ensure_browser()
cookie_cache = Path.home() / ".cache" / "cp-nvim" / "atcoder-cookies.json"
cookie_cache.parent.mkdir(parents=True, exist_ok=True)
saved_cookies: list[dict[str, Any]] = []
if cookie_cache.exists():
try:
saved_cookies = json.loads(cookie_cache.read_text())
except Exception:
pass
logged_in = False
login_error: str | None = None
def check_login(page):
nonlocal logged_in
logged_in = page.evaluate(
"() => Array.from(document.querySelectorAll('a')).some(a => a.textContent.trim() === 'Sign Out')"
)
def login_action(page):
nonlocal login_error
try:
_solve_turnstile(page)
page.fill('input[name="username"]', credentials.get("username", ""))
page.fill('input[name="password"]', credentials.get("password", ""))
page.click("#submit")
page.wait_for_url(
lambda url: "/login" not in url, timeout=BROWSER_NAV_TIMEOUT
)
except Exception as e:
login_error = str(e)
try:
with StealthySession(
headless=True,
timeout=BROWSER_SESSION_TIMEOUT,
google_search=False,
cookies=saved_cookies if saved_cookies else [],
) as session:
if saved_cookies:
print(json.dumps({"status": "checking_login"}), flush=True)
session.fetch(
f"{BASE_URL}/home", page_action=check_login, network_idle=True
)
if not logged_in:
print(json.dumps({"status": "logging_in"}), flush=True)
session.fetch(
f"{BASE_URL}/login",
page_action=login_action,
solve_cloudflare=True,
)
if login_error:
return LoginResult(
success=False, error=f"Login failed: {login_error}"
)
session.fetch(
f"{BASE_URL}/home", page_action=check_login, network_idle=True
)
if not logged_in:
return LoginResult(
success=False, error="Login failed (bad credentials?)"
)
try:
browser_cookies = session.context.cookies()
if any(c["name"] == "REVEL_SESSION" for c in browser_cookies):
cookie_cache.write_text(json.dumps(browser_cookies))
except Exception:
pass
return LoginResult(success=True, error="")
except Exception as e:
return LoginResult(success=False, error=str(e))
def _submit_headless(
contest_id: str,
problem_id: str,
@ -303,8 +388,6 @@ def _submit_headless(
credentials: dict[str, str],
_retried: bool = False,
) -> "SubmitResult":
from pathlib import Path
try:
from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import]
except ImportError:
@ -588,95 +671,11 @@ class AtcoderScraper(BaseScraper):
credentials,
)
async def main_async() -> int:
if len(sys.argv) < 2:
result = MetadataResult(
success=False,
error="Usage: atcoder.py metadata <contest_id> OR atcoder.py tests <contest_id> OR atcoder.py contests",
url="",
)
print(result.model_dump_json())
return 1
mode: str = sys.argv[1]
scraper = AtcoderScraper()
if mode == "metadata":
if len(sys.argv) != 3:
result = MetadataResult(
success=False,
error="Usage: atcoder.py metadata <contest_id>",
url="",
)
print(result.model_dump_json())
return 1
contest_id = sys.argv[2]
result = await scraper.scrape_contest_metadata(contest_id)
print(result.model_dump_json())
return 0 if result.success else 1
if mode == "tests":
if len(sys.argv) != 3:
tests_result = TestsResult(
success=False,
error="Usage: atcoder.py tests <contest_id>",
problem_id="",
combined=CombinedTest(input="", expected=""),
tests=[],
timeout_ms=0,
memory_mb=0,
)
print(tests_result.model_dump_json())
return 1
contest_id = sys.argv[2]
await scraper.stream_tests_for_category_async(contest_id)
return 0
if mode == "contests":
if len(sys.argv) != 2:
contest_result = ContestListResult(
success=False, error="Usage: atcoder.py contests"
)
print(contest_result.model_dump_json())
return 1
contest_result = await scraper.scrape_contest_list()
print(contest_result.model_dump_json())
return 0 if contest_result.success else 1
if mode == "submit":
if len(sys.argv) != 6:
print(
SubmitResult(
success=False,
error="Usage: atcoder.py submit <contest_id> <problem_id> <language> <file_path>",
).model_dump_json()
)
return 1
creds_raw = os.environ.get("CP_CREDENTIALS", "{}")
try:
credentials = json.loads(creds_raw)
except json.JSONDecodeError:
credentials = {}
language_id = get_language_id("atcoder", sys.argv[4]) or sys.argv[4]
submit_result = await scraper.submit(
sys.argv[2], sys.argv[3], sys.argv[5], language_id, credentials
)
print(submit_result.model_dump_json())
return 0 if submit_result.success else 1
result = MetadataResult(
success=False,
error="Unknown mode. Use 'metadata <contest_id>', 'tests <contest_id>', 'contests', or 'submit <contest_id> <problem_id> <language>'",
url="",
)
print(result.model_dump_json())
return 1
def main() -> None:
sys.exit(asyncio.run(main_async()))
async def login(self, credentials: dict[str, str]) -> LoginResult:
if not credentials.get("username") or not credentials.get("password"):
return self._login_error("Missing username or password")
return await asyncio.to_thread(_login_headless, credentials)
if __name__ == "__main__":
main()
AtcoderScraper().run_cli()

View file

@ -9,6 +9,7 @@ from .language_ids import get_language_id
from .models import (
CombinedTest,
ContestListResult,
LoginResult,
MetadataResult,
SubmitResult,
TestsResult,
@ -58,9 +59,12 @@ class BaseScraper(ABC):
credentials: dict[str, str],
) -> SubmitResult: ...
@abstractmethod
async def login(self, credentials: dict[str, str]) -> LoginResult: ...
def _usage(self) -> str:
name = self.platform_name
return f"Usage: {name}.py metadata <id> | tests <id> | contests"
return f"Usage: {name}.py metadata <id> | tests <id> | contests | login"
def _metadata_error(self, msg: str) -> MetadataResult:
return MetadataResult(success=False, error=msg, url="")
@ -82,6 +86,9 @@ class BaseScraper(ABC):
def _submit_error(self, msg: str) -> SubmitResult:
return SubmitResult(success=False, error=msg)
def _login_error(self, msg: str) -> LoginResult:
return LoginResult(success=False, error=msg)
async def _run_cli_async(self, args: list[str]) -> int:
if len(args) < 2:
print(self._metadata_error(self._usage()).model_dump_json())
@ -133,6 +140,16 @@ class BaseScraper(ABC):
print(result.model_dump_json())
return 0 if result.success else 1
case "login":
creds_raw = os.environ.get("CP_CREDENTIALS", "{}")
try:
credentials = json.loads(creds_raw)
except json.JSONDecodeError:
credentials = {}
result = await self.login(credentials)
print(result.model_dump_json())
return 0 if result.success else 1
case _:
print(
self._metadata_error(

View file

@ -13,6 +13,7 @@ from .timeouts import HTTP_TIMEOUT
from .models import (
ContestListResult,
ContestSummary,
LoginResult,
MetadataResult,
ProblemSummary,
SubmitResult,
@ -267,6 +268,9 @@ class CodeChefScraper(BaseScraper):
verdict="",
)
async def login(self, credentials: dict[str, str]) -> LoginResult:
return self._login_error("CodeChef login not yet implemented")
if __name__ == "__main__":
CodeChefScraper().run_cli()

View file

@ -13,6 +13,7 @@ from .base import BaseScraper, extract_precision
from .models import (
ContestListResult,
ContestSummary,
LoginResult,
MetadataResult,
ProblemSummary,
SubmitResult,
@ -303,6 +304,111 @@ class CodeforcesScraper(BaseScraper):
credentials,
)
async def login(self, credentials: dict[str, str]) -> LoginResult:
if not credentials.get("username") or not credentials.get("password"):
return self._login_error("Missing username or password")
return await asyncio.to_thread(_login_headless_cf, credentials)
def _login_headless_cf(credentials: dict[str, str]) -> LoginResult:
from pathlib import Path
try:
from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import]
except ImportError:
return LoginResult(
success=False,
error="scrapling is required for Codeforces login",
)
from .atcoder import _ensure_browser
_ensure_browser()
cookie_cache = Path.home() / ".cache" / "cp-nvim" / "codeforces-cookies.json"
cookie_cache.parent.mkdir(parents=True, exist_ok=True)
saved_cookies: list[dict[str, Any]] = []
if cookie_cache.exists():
try:
saved_cookies = json.loads(cookie_cache.read_text())
except Exception:
pass
logged_in = False
login_error: str | None = None
def check_login(page):
nonlocal logged_in
logged_in = page.evaluate(
"() => Array.from(document.querySelectorAll('a'))"
".some(a => a.textContent.includes('Logout'))"
)
def login_action(page):
nonlocal login_error
try:
page.fill(
'input[name="handleOrEmail"]',
credentials.get("username", ""),
)
page.fill(
'input[name="password"]',
credentials.get("password", ""),
)
page.locator('#enterForm input[type="submit"]').click()
page.wait_for_url(
lambda url: "/enter" not in url, timeout=BROWSER_NAV_TIMEOUT
)
except Exception as e:
login_error = str(e)
try:
with StealthySession(
headless=True,
timeout=BROWSER_SESSION_TIMEOUT,
google_search=False,
cookies=saved_cookies if saved_cookies else [],
) as session:
if saved_cookies:
print(json.dumps({"status": "checking_login"}), flush=True)
session.fetch(
f"{BASE_URL}/",
page_action=check_login,
network_idle=True,
)
if not logged_in:
print(json.dumps({"status": "logging_in"}), flush=True)
session.fetch(
f"{BASE_URL}/enter",
page_action=login_action,
solve_cloudflare=True,
)
if login_error:
return LoginResult(
success=False, error=f"Login failed: {login_error}"
)
session.fetch(
f"{BASE_URL}/",
page_action=check_login,
network_idle=True,
)
if not logged_in:
return LoginResult(
success=False, error="Login failed (bad credentials?)"
)
try:
browser_cookies = session.context.cookies()
cookie_cache.write_text(json.dumps(browser_cookies))
except Exception:
pass
return LoginResult(success=True, error="")
except Exception as e:
return LoginResult(success=False, error=str(e))
def _submit_headless(
contest_id: str,

View file

@ -13,6 +13,7 @@ from .timeouts import HTTP_TIMEOUT, SUBMIT_POLL_TIMEOUT
from .models import (
ContestListResult,
ContestSummary,
LoginResult,
MetadataResult,
ProblemSummary,
SubmitResult,
@ -229,6 +230,43 @@ class CSESScraper(BaseScraper):
)
return ContestListResult(success=True, error="", contests=cats)
async def login(self, credentials: dict[str, str]) -> LoginResult:
username = credentials.get("username", "")
password = credentials.get("password", "")
if not username or not password:
return self._login_error("Missing username or password")
async with httpx.AsyncClient(follow_redirects=True) as client:
token = credentials.get("token")
if token:
print(json.dumps({"status": "checking_login"}), flush=True)
if await self._check_token(client, token):
return LoginResult(
success=True,
error="",
credentials={
"username": username,
"password": password,
"token": token,
},
)
print(json.dumps({"status": "logging_in"}), flush=True)
token = await self._web_login(client, username, password)
if not token:
return self._login_error("Login failed (bad credentials?)")
return LoginResult(
success=True,
error="",
credentials={
"username": username,
"password": password,
"token": token,
},
)
async def stream_tests_for_category_async(self, category_id: str) -> None:
async with httpx.AsyncClient(
limits=httpx.Limits(max_connections=CONNECTIONS)

View file

@ -14,6 +14,7 @@ from .timeouts import HTTP_TIMEOUT
from .models import (
ContestListResult,
ContestSummary,
LoginResult,
MetadataResult,
ProblemSummary,
SubmitResult,
@ -284,6 +285,9 @@ class KattisScraper(BaseScraper):
verdict="",
)
async def login(self, credentials: dict[str, str]) -> LoginResult:
return self._login_error("Kattis login not yet implemented")
if __name__ == "__main__":
KattisScraper().run_cli()

View file

@ -64,6 +64,12 @@ class TestsResult(ScrapingResult):
model_config = ConfigDict(extra="forbid")
class LoginResult(ScrapingResult):
credentials: dict[str, str] = Field(default_factory=dict)
model_config = ConfigDict(extra="forbid")
class SubmitResult(ScrapingResult):
submission_id: str = ""
verdict: str = ""

View file

@ -12,6 +12,7 @@ from .timeouts import HTTP_TIMEOUT
from .models import (
ContestListResult,
ContestSummary,
LoginResult,
MetadataResult,
ProblemSummary,
SubmitResult,
@ -299,6 +300,9 @@ class USACOScraper(BaseScraper):
verdict="",
)
async def login(self, credentials: dict[str, str]) -> LoginResult:
return self._login_error("USACO login not yet implemented")
if __name__ == "__main__":
USACOScraper().run_cli()