Compare commits

..

No commits in common. "09a03f25aab2086959f74097c391af8005e99ac6" and "eb0dea777e3ea2edf9541060fc4e349a8a79f31c" have entirely different histories.

8 changed files with 262 additions and 435 deletions

View file

@ -219,6 +219,4 @@ M.LANGUAGE_VERSIONS = {
M.DEFAULT_VERSIONS = { cpp = 'c++20', python = 'python3' } M.DEFAULT_VERSIONS = { cpp = 'c++20', python = 'python3' }
M.COOKIE_FILE = vim.fn.expand('~/.cache/cp-nvim/cookies.json')
return M return M

View file

@ -38,7 +38,6 @@ local function prompt_and_login(platform, display)
end, function(result) end, function(result)
vim.schedule(function() vim.schedule(function()
if result.success then if result.success then
cache.set_credentials(platform, credentials)
logger.log( logger.log(
display .. ' login successful', display .. ' login successful',
{ level = vim.log.levels.INFO, override = true } { level = vim.log.levels.INFO, override = true }
@ -106,14 +105,6 @@ function M.logout(platform)
local display = constants.PLATFORM_DISPLAY_NAMES[platform] or platform local display = constants.PLATFORM_DISPLAY_NAMES[platform] or platform
cache.load() cache.load()
cache.clear_credentials(platform) cache.clear_credentials(platform)
local cookie_file = constants.COOKIE_FILE
if vim.fn.filereadable(cookie_file) == 1 then
local ok, data = pcall(vim.fn.json_decode, vim.fn.readfile(cookie_file, 'b'))
if ok and type(data) == 'table' then
data[platform] = nil
vim.fn.writefile({ vim.fn.json_encode(data) }, cookie_file)
end
end
logger.log(display .. ' credentials cleared', { level = vim.log.levels.INFO, override = true }) logger.log(display .. ' credentials cleared', { level = vim.log.levels.INFO, override = true })
end end

View file

@ -6,6 +6,7 @@ import os
import re import re
import subprocess import subprocess
import time import time
from pathlib import Path
from typing import Any from typing import Any
import backoff import backoff
@ -15,13 +16,7 @@ from bs4 import BeautifulSoup, Tag
from requests.adapters import HTTPAdapter from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry from urllib3.util.retry import Retry
from .base import ( from .base import BaseScraper, extract_precision
BaseScraper,
clear_platform_cookies,
extract_precision,
load_platform_cookies,
save_platform_cookies,
)
from .models import ( from .models import (
ContestListResult, ContestListResult,
ContestSummary, ContestSummary,
@ -384,15 +379,26 @@ def _ensure_browser() -> None:
break break
def _at_check_logged_in(page) -> bool: def _login_headless(credentials: dict[str, str]) -> LoginResult:
return page.evaluate( try:
"() => Array.from(document.querySelectorAll('a')).some(a => a.textContent.trim() === 'Sign Out')" from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import]
except ImportError:
return LoginResult(
success=False,
error="scrapling is required for AtCoder login. Install it: uv add 'scrapling[fetchers]>=0.4'",
) )
_ensure_browser()
def _at_login_action(credentials: dict[str, str]): logged_in = False
login_error: str | None = None login_error: str | None = None
def check_login(page):
nonlocal logged_in
logged_in = page.evaluate(
"() => Array.from(document.querySelectorAll('a')).some(a => a.textContent.trim() === 'Sign Out')"
)
def login_action(page): def login_action(page):
nonlocal login_error nonlocal login_error
try: try:
@ -406,47 +412,6 @@ def _at_login_action(credentials: dict[str, str]):
except Exception as e: except Exception as e:
login_error = str(e) login_error = str(e)
return login_action, lambda: login_error
def _login_headless(credentials: dict[str, str]) -> LoginResult:
try:
from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import]
except ImportError:
return LoginResult(
success=False,
error="scrapling is required for AtCoder login. Install it: uv add 'scrapling[fetchers]>=0.4'",
)
_ensure_browser()
saved_cookies = load_platform_cookies("atcoder") or []
if saved_cookies:
print(json.dumps({"status": "checking_login"}), flush=True)
logged_in = False
def check_action(page):
nonlocal logged_in
logged_in = _at_check_logged_in(page)
try:
with StealthySession(
headless=True,
timeout=BROWSER_SESSION_TIMEOUT,
google_search=False,
cookies=saved_cookies,
) as session:
session.fetch(
f"{BASE_URL}/home", page_action=check_action, network_idle=True
)
if logged_in:
return LoginResult(success=True, error="")
except Exception:
pass
login_action, get_error = _at_login_action(credentials)
try: try:
with StealthySession( with StealthySession(
headless=True, headless=True,
@ -459,31 +424,17 @@ def _login_headless(credentials: dict[str, str]) -> LoginResult:
page_action=login_action, page_action=login_action,
solve_cloudflare=True, solve_cloudflare=True,
) )
login_error = get_error()
if login_error: if login_error:
return LoginResult(success=False, error=f"Login failed: {login_error}") return LoginResult(success=False, error=f"Login failed: {login_error}")
logged_in = False
def verify_action(page):
nonlocal logged_in
logged_in = _at_check_logged_in(page)
session.fetch( session.fetch(
f"{BASE_URL}/home", page_action=verify_action, network_idle=True f"{BASE_URL}/home", page_action=check_login, network_idle=True
) )
if not logged_in: if not logged_in:
return LoginResult( return LoginResult(
success=False, error="Login failed (bad credentials?)" success=False, error="Login failed (bad credentials?)"
) )
try:
browser_cookies = session.context.cookies()
if browser_cookies:
save_platform_cookies("atcoder", browser_cookies)
except Exception:
pass
return LoginResult(success=True, error="") return LoginResult(success=True, error="")
except Exception as e: except Exception as e:
return LoginResult(success=False, error=str(e)) return LoginResult(success=False, error=str(e))
@ -495,7 +446,6 @@ def _submit_headless(
file_path: str, file_path: str,
language_id: str, language_id: str,
credentials: dict[str, str], credentials: dict[str, str],
_retried: bool = False,
) -> "SubmitResult": ) -> "SubmitResult":
try: try:
from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import] from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import]
@ -507,24 +457,26 @@ def _submit_headless(
_ensure_browser() _ensure_browser()
saved_cookies: list[dict[str, Any]] = [] login_error: str | None = None
if not _retried:
saved_cookies = load_platform_cookies("atcoder") or []
logged_in = bool(saved_cookies)
submit_error: str | None = None submit_error: str | None = None
needs_relogin = False
def check_login(page): def login_action(page):
nonlocal logged_in nonlocal login_error
logged_in = _at_check_logged_in(page) try:
_solve_turnstile(page)
login_action, get_login_error = _at_login_action(credentials) page.fill('input[name="username"]', credentials.get("username", ""))
page.fill('input[name="password"]', credentials.get("password", ""))
page.click("#submit")
page.wait_for_url(
lambda url: "/login" not in url, timeout=BROWSER_NAV_TIMEOUT
)
except Exception as e:
login_error = str(e)
def submit_action(page): def submit_action(page):
nonlocal submit_error, needs_relogin nonlocal submit_error
if "/login" in page.url: if "/login" in page.url:
needs_relogin = True submit_error = "Not logged in after login step"
return return
try: try:
_solve_turnstile(page) _solve_turnstile(page)
@ -536,12 +488,18 @@ def _submit_headless(
f'select[name="data.LanguageId"] option[value="{language_id}"]' f'select[name="data.LanguageId"] option[value="{language_id}"]'
).wait_for(state="attached", timeout=BROWSER_ELEMENT_WAIT) ).wait_for(state="attached", timeout=BROWSER_ELEMENT_WAIT)
page.select_option('select[name="data.LanguageId"]', language_id) page.select_option('select[name="data.LanguageId"]', language_id)
page.set_input_files("#input-open-file", file_path) ext = _LANGUAGE_ID_EXTENSION.get(
page.wait_for_function( language_id, Path(file_path).suffix.lstrip(".") or "txt"
"() => { const ta = document.getElementById('plain-textarea'); return ta && ta.value.length > 0; }",
timeout=BROWSER_ELEMENT_WAIT,
) )
page.evaluate("document.getElementById('submit').click()") page.set_input_files(
"#input-open-file",
{
"name": f"solution.{ext}",
"mimeType": "text/plain",
"buffer": Path(file_path).read_bytes(),
},
)
page.locator('button[type="submit"]').click(no_wait_after=True)
page.wait_for_url( page.wait_for_url(
lambda url: "/submissions/me" in url, lambda url: "/submissions/me" in url,
timeout=BROWSER_SUBMIT_NAV_TIMEOUT["atcoder"], timeout=BROWSER_SUBMIT_NAV_TIMEOUT["atcoder"],
@ -554,33 +512,15 @@ def _submit_headless(
headless=True, headless=True,
timeout=BROWSER_SESSION_TIMEOUT, timeout=BROWSER_SESSION_TIMEOUT,
google_search=False, google_search=False,
cookies=saved_cookies if saved_cookies else [],
) as session: ) as session:
if not _retried and saved_cookies:
print(json.dumps({"status": "checking_login"}), flush=True)
session.fetch(
f"{BASE_URL}/home", page_action=check_login, network_idle=True
)
if not logged_in:
print(json.dumps({"status": "logging_in"}), flush=True) print(json.dumps({"status": "logging_in"}), flush=True)
session.fetch( session.fetch(
f"{BASE_URL}/login", f"{BASE_URL}/login",
page_action=login_action, page_action=login_action,
solve_cloudflare=True, solve_cloudflare=True,
) )
login_error = get_login_error()
if login_error: if login_error:
return SubmitResult( return SubmitResult(success=False, error=f"Login failed: {login_error}")
success=False, error=f"Login failed: {login_error}"
)
logged_in = True
try:
browser_cookies = session.context.cookies()
if browser_cookies:
save_platform_cookies("atcoder", browser_cookies)
except Exception:
pass
print(json.dumps({"status": "submitting"}), flush=True) print(json.dumps({"status": "submitting"}), flush=True)
session.fetch( session.fetch(
@ -589,17 +529,6 @@ def _submit_headless(
solve_cloudflare=True, solve_cloudflare=True,
) )
if needs_relogin and not _retried:
clear_platform_cookies("atcoder")
return _submit_headless(
contest_id,
problem_id,
file_path,
language_id,
credentials,
_retried=True,
)
if submit_error: if submit_error:
return SubmitResult(success=False, error=submit_error) return SubmitResult(success=False, error=submit_error)

View file

@ -4,8 +4,6 @@ import os
import re import re
import sys import sys
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any
from .language_ids import get_language_id from .language_ids import get_language_id
from .models import ( from .models import (
@ -17,36 +15,6 @@ from .models import (
TestsResult, TestsResult,
) )
_COOKIE_FILE = Path.home() / ".cache" / "cp-nvim" / "cookies.json"
def load_platform_cookies(platform: str) -> Any | None:
try:
data = json.loads(_COOKIE_FILE.read_text())
return data.get(platform)
except Exception:
return None
def save_platform_cookies(platform: str, data: Any) -> None:
_COOKIE_FILE.parent.mkdir(parents=True, exist_ok=True)
try:
existing = json.loads(_COOKIE_FILE.read_text())
except Exception:
existing = {}
existing[platform] = data
_COOKIE_FILE.write_text(json.dumps(existing))
def clear_platform_cookies(platform: str) -> None:
try:
existing = json.loads(_COOKIE_FILE.read_text())
existing.pop(platform, None)
_COOKIE_FILE.write_text(json.dumps(existing))
except Exception:
pass
_PRECISION_ABS_REL_RE = re.compile( _PRECISION_ABS_REL_RE = re.compile(
r"(?:absolute|relative)\s+error[^.]*?10\s*[\^{]\s*\{?\s*[-\u2212]\s*(\d+)\s*\}?", r"(?:absolute|relative)\s+error[^.]*?10\s*[\^{]\s*\{?\s*[-\u2212]\s*(\d+)\s*\}?",
re.IGNORECASE, re.IGNORECASE,

View file

@ -9,18 +9,8 @@ from typing import Any
import httpx import httpx
from .base import ( from .base import BaseScraper
BaseScraper, from .timeouts import BROWSER_SESSION_TIMEOUT, HTTP_TIMEOUT
clear_platform_cookies,
load_platform_cookies,
save_platform_cookies,
)
from .timeouts import (
BROWSER_ELEMENT_WAIT,
BROWSER_NAV_TIMEOUT,
BROWSER_SESSION_TIMEOUT,
HTTP_TIMEOUT,
)
from .models import ( from .models import (
ContestListResult, ContestListResult,
ContestSummary, ContestSummary,
@ -41,6 +31,7 @@ HEADERS = {
} }
CONNECTIONS = 8 CONNECTIONS = 8
_COOKIE_PATH = Path.home() / ".cache" / "cp-nvim" / "codechef-cookies.json"
_CC_CHECK_LOGIN_JS = "() => !!document.querySelector('a[href*=\"/users/\"]')" _CC_CHECK_LOGIN_JS = "() => !!document.querySelector('a[href*=\"/users/\"]')"
@ -63,29 +54,6 @@ async def fetch_json(client: httpx.AsyncClient, path: str) -> dict[str, Any]:
return r.json() return r.json()
def _cc_check_logged_in(page) -> bool:
return "dashboard" in page.url or page.evaluate(_CC_CHECK_LOGIN_JS)
def _cc_login_action(credentials: dict[str, str]):
login_error: str | None = None
def login_action(page):
nonlocal login_error
try:
page.locator('input[name="name"]').fill(credentials.get("username", ""))
page.locator('input[name="pass"]').fill(credentials.get("password", ""))
page.locator("input.cc-login-btn").click()
page.wait_for_function(
"() => !window.location.pathname.includes('/login')",
timeout=BROWSER_NAV_TIMEOUT,
)
except Exception as e:
login_error = str(e)
return login_action, lambda: login_error
def _login_headless_codechef(credentials: dict[str, str]) -> LoginResult: def _login_headless_codechef(credentials: dict[str, str]) -> LoginResult:
try: try:
from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import] from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import]
@ -99,32 +67,28 @@ def _login_headless_codechef(credentials: dict[str, str]) -> LoginResult:
_ensure_browser() _ensure_browser()
saved_cookies = load_platform_cookies("codechef") or [] _COOKIE_PATH.parent.mkdir(parents=True, exist_ok=True)
if saved_cookies:
print(json.dumps({"status": "checking_login"}), flush=True)
logged_in = False logged_in = False
login_error: str | None = None
def check_action(page): def check_login(page):
nonlocal logged_in nonlocal logged_in
logged_in = _cc_check_logged_in(page) logged_in = "dashboard" in page.url or page.evaluate(_CC_CHECK_LOGIN_JS)
def login_action(page):
nonlocal login_error
try: try:
with StealthySession( page.locator('input[name="name"]').fill(credentials.get("username", ""))
headless=True, page.locator('input[name="pass"]').fill(credentials.get("password", ""))
timeout=BROWSER_SESSION_TIMEOUT, page.locator("input.cc-login-btn").click()
google_search=False, try:
cookies=saved_cookies, page.wait_for_url(lambda url: "/login" not in url, timeout=3000)
) as session:
session.fetch(
f"{BASE_URL}/", page_action=check_action, network_idle=True
)
if logged_in:
return LoginResult(success=True, error="")
except Exception: except Exception:
pass login_error = "Login failed (bad credentials?)"
return
login_action, get_error = _cc_login_action(credentials) except Exception as e:
login_error = str(e)
try: try:
with StealthySession( with StealthySession(
@ -133,20 +97,11 @@ def _login_headless_codechef(credentials: dict[str, str]) -> LoginResult:
google_search=False, google_search=False,
) as session: ) as session:
print(json.dumps({"status": "logging_in"}), flush=True) print(json.dumps({"status": "logging_in"}), flush=True)
session.fetch( session.fetch(f"{BASE_URL}/login", page_action=login_action)
f"{BASE_URL}/login", page_action=login_action, network_idle=True
)
login_error = get_error()
if login_error: if login_error:
return LoginResult(success=False, error=f"Login failed: {login_error}") return LoginResult(success=False, error=f"Login failed: {login_error}")
logged_in = False session.fetch(f"{BASE_URL}/", page_action=check_login, network_idle=True)
def verify_action(page):
nonlocal logged_in
logged_in = _cc_check_logged_in(page)
session.fetch(f"{BASE_URL}/", page_action=verify_action, network_idle=True)
if not logged_in: if not logged_in:
return LoginResult( return LoginResult(
success=False, error="Login failed (bad credentials?)" success=False, error="Login failed (bad credentials?)"
@ -154,8 +109,8 @@ def _login_headless_codechef(credentials: dict[str, str]) -> LoginResult:
try: try:
browser_cookies = session.context.cookies() browser_cookies = session.context.cookies()
if any(c.get("name") == "userkey" for c in browser_cookies): if browser_cookies:
save_platform_cookies("codechef", browser_cookies) _COOKIE_PATH.write_text(json.dumps(browser_cookies))
except Exception: except Exception:
pass pass
@ -171,7 +126,6 @@ def _submit_headless_codechef(
language_id: str, language_id: str,
credentials: dict[str, str], credentials: dict[str, str],
_retried: bool = False, _retried: bool = False,
_practice: bool = False,
) -> SubmitResult: ) -> SubmitResult:
source_code = Path(file_path).read_text() source_code = Path(file_path).read_text()
@ -187,19 +141,36 @@ def _submit_headless_codechef(
_ensure_browser() _ensure_browser()
_COOKIE_PATH.parent.mkdir(parents=True, exist_ok=True)
saved_cookies: list[dict[str, Any]] = [] saved_cookies: list[dict[str, Any]] = []
if not _retried: if _COOKIE_PATH.exists() and not _retried:
saved_cookies = load_platform_cookies("codechef") or [] try:
saved_cookies = json.loads(_COOKIE_PATH.read_text())
except Exception:
pass
logged_in = bool(saved_cookies) logged_in = bool(saved_cookies) and not _retried
login_error: str | None = None
submit_error: str | None = None submit_error: str | None = None
needs_relogin = False needs_relogin = False
def check_login(page): def check_login(page):
nonlocal logged_in nonlocal logged_in
logged_in = _cc_check_logged_in(page) logged_in = "dashboard" in page.url or page.evaluate(_CC_CHECK_LOGIN_JS)
_login_action, _get_login_error = _cc_login_action(credentials) def login_action(page):
nonlocal login_error
try:
page.locator('input[name="name"]').fill(credentials.get("username", ""))
page.locator('input[name="pass"]').fill(credentials.get("password", ""))
page.locator("input.cc-login-btn").click()
try:
page.wait_for_url(lambda url: "/login" not in url, timeout=3000)
except Exception:
login_error = "Login failed (bad credentials?)"
return
except Exception as e:
login_error = str(e)
def submit_action(page): def submit_action(page):
nonlocal submit_error, needs_relogin nonlocal submit_error, needs_relogin
@ -207,13 +178,12 @@ def _submit_headless_codechef(
needs_relogin = True needs_relogin = True
return return
try: try:
page.wait_for_selector( page.wait_for_selector('[aria-haspopup="listbox"]', timeout=10000)
'[aria-haspopup="listbox"]', timeout=BROWSER_ELEMENT_WAIT
)
page.locator('[aria-haspopup="listbox"]').click() page.locator('[aria-haspopup="listbox"]').click()
page.wait_for_selector('[role="option"]', timeout=BROWSER_ELEMENT_WAIT) page.wait_for_selector('[role="option"]', timeout=5000)
page.locator(f'[role="option"][data-value="{language_id}"]').click() page.locator(f'[role="option"][data-value="{language_id}"]').click()
page.wait_for_timeout(250)
page.locator(".ace_editor").click() page.locator(".ace_editor").click()
page.keyboard.press("Control+a") page.keyboard.press("Control+a")
@ -228,6 +198,7 @@ def _submit_headless_codechef(
}""", }""",
source_code, source_code,
) )
page.wait_for_timeout(125)
page.evaluate( page.evaluate(
"() => document.getElementById('submit_btn').scrollIntoView({block:'center'})" "() => document.getElementById('submit_btn').scrollIntoView({block:'center'})"
@ -242,9 +213,7 @@ def _submit_headless_codechef(
const d = document.querySelector('[role="dialog"], .swal2-popup'); const d = document.querySelector('[role="dialog"], .swal2-popup');
return d ? d.textContent.trim() : null; return d ? d.textContent.trim() : null;
}""") }""")
if dialog_text and "login" in dialog_text.lower(): if dialog_text and (
needs_relogin = True
elif dialog_text and (
"not available for accepting solutions" in dialog_text "not available for accepting solutions" in dialog_text
or "not available for submission" in dialog_text or "not available for submission" in dialog_text
): ):
@ -259,9 +228,9 @@ def _submit_headless_codechef(
headless=True, headless=True,
timeout=BROWSER_SESSION_TIMEOUT, timeout=BROWSER_SESSION_TIMEOUT,
google_search=False, google_search=False,
cookies=saved_cookies if saved_cookies else [], cookies=saved_cookies if (saved_cookies and not _retried) else [],
) as session: ) as session:
if not _retried and not _practice and saved_cookies: if not logged_in:
print(json.dumps({"status": "checking_login"}), flush=True) print(json.dumps({"status": "checking_login"}), flush=True)
session.fetch( session.fetch(
f"{BASE_URL}/", page_action=check_login, network_idle=True f"{BASE_URL}/", page_action=check_login, network_idle=True
@ -269,17 +238,12 @@ def _submit_headless_codechef(
if not logged_in: if not logged_in:
print(json.dumps({"status": "logging_in"}), flush=True) print(json.dumps({"status": "logging_in"}), flush=True)
session.fetch( session.fetch(f"{BASE_URL}/login", page_action=login_action)
f"{BASE_URL}/login", page_action=_login_action, network_idle=True
)
login_error = _get_login_error()
if login_error: if login_error:
return SubmitResult( return SubmitResult(
success=False, error=f"Login failed: {login_error}" success=False, error=f"Login failed: {login_error}"
) )
logged_in = True
if not _practice:
print(json.dumps({"status": "submitting"}), flush=True) print(json.dumps({"status": "submitting"}), flush=True)
submit_url = ( submit_url = (
f"{BASE_URL}/submit/{problem_id}" f"{BASE_URL}/submit/{problem_id}"
@ -290,13 +254,13 @@ def _submit_headless_codechef(
try: try:
browser_cookies = session.context.cookies() browser_cookies = session.context.cookies()
if any(c.get("name") == "userkey" for c in browser_cookies): if browser_cookies and logged_in:
save_platform_cookies("codechef", browser_cookies) _COOKIE_PATH.write_text(json.dumps(browser_cookies))
except Exception: except Exception:
pass pass
if needs_relogin and not _retried: if needs_relogin and not _retried:
clear_platform_cookies("codechef") _COOKIE_PATH.unlink(missing_ok=True)
return _submit_headless_codechef( return _submit_headless_codechef(
contest_id, contest_id,
problem_id, problem_id,
@ -306,14 +270,14 @@ def _submit_headless_codechef(
_retried=True, _retried=True,
) )
if submit_error == "PRACTICE_FALLBACK" and not _practice: if submit_error == "PRACTICE_FALLBACK" and not _retried:
return _submit_headless_codechef( return _submit_headless_codechef(
"PRACTICE", "PRACTICE",
problem_id, problem_id,
file_path, file_path,
language_id, language_id,
credentials, credentials,
_practice=True, _retried=True,
) )
if submit_error: if submit_error:

View file

@ -8,13 +8,7 @@ from typing import Any
import requests import requests
from bs4 import BeautifulSoup, Tag from bs4 import BeautifulSoup, Tag
from .base import ( from .base import BaseScraper, extract_precision
BaseScraper,
clear_platform_cookies,
extract_precision,
load_platform_cookies,
save_platform_cookies,
)
from .models import ( from .models import (
ContestListResult, ContestListResult,
ContestSummary, ContestSummary,
@ -337,33 +331,9 @@ class CodeforcesScraper(BaseScraper):
return await asyncio.to_thread(_login_headless_cf, credentials) return await asyncio.to_thread(_login_headless_cf, credentials)
def _cf_check_logged_in(page) -> bool:
return page.evaluate(
"() => Array.from(document.querySelectorAll('a'))"
".some(a => a.textContent.includes('Logout'))"
)
def _cf_login_action(credentials: dict[str, str]):
login_error: str | None = None
def login_action(page):
nonlocal login_error
try:
page.wait_for_selector('input[name="handleOrEmail"]', timeout=60000)
page.fill('input[name="handleOrEmail"]', credentials.get("username", ""))
page.fill('input[name="password"]', credentials.get("password", ""))
page.locator('#enterForm input[type="submit"]').click()
page.wait_for_url(
lambda url: "/enter" not in url, timeout=BROWSER_NAV_TIMEOUT
)
except Exception as e:
login_error = str(e)
return login_action, lambda: login_error
def _login_headless_cf(credentials: dict[str, str]) -> LoginResult: def _login_headless_cf(credentials: dict[str, str]) -> LoginResult:
from pathlib import Path
try: try:
from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import] from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import]
except ImportError: except ImportError:
@ -376,32 +346,36 @@ def _login_headless_cf(credentials: dict[str, str]) -> LoginResult:
_ensure_browser() _ensure_browser()
saved_cookies = load_platform_cookies("codeforces") or [] cookie_cache = Path.home() / ".cache" / "cp-nvim" / "codeforces-cookies.json"
cookie_cache.parent.mkdir(parents=True, exist_ok=True)
if saved_cookies:
print(json.dumps({"status": "checking_login"}), flush=True)
logged_in = False logged_in = False
login_error: str | None = None
def check_action(page): def check_login(page):
nonlocal logged_in nonlocal logged_in
logged_in = _cf_check_logged_in(page) logged_in = page.evaluate(
"() => Array.from(document.querySelectorAll('a'))"
try: ".some(a => a.textContent.includes('Logout'))"
with StealthySession(
headless=True,
timeout=BROWSER_SESSION_TIMEOUT,
google_search=False,
cookies=saved_cookies,
) as session:
session.fetch(
f"{BASE_URL}/", page_action=check_action, solve_cloudflare=True
) )
if logged_in:
return LoginResult(success=True, error="")
except Exception:
pass
login_action, get_error = _cf_login_action(credentials) def login_action(page):
nonlocal login_error
try:
page.fill(
'input[name="handleOrEmail"]',
credentials.get("username", ""),
)
page.fill(
'input[name="password"]',
credentials.get("password", ""),
)
page.locator('#enterForm input[type="submit"]').click()
page.wait_for_url(
lambda url: "/enter" not in url, timeout=BROWSER_NAV_TIMEOUT
)
except Exception as e:
login_error = str(e)
try: try:
with StealthySession( with StealthySession(
@ -415,17 +389,14 @@ def _login_headless_cf(credentials: dict[str, str]) -> LoginResult:
page_action=login_action, page_action=login_action,
solve_cloudflare=True, solve_cloudflare=True,
) )
login_error = get_error()
if login_error: if login_error:
return LoginResult(success=False, error=f"Login failed: {login_error}") return LoginResult(success=False, error=f"Login failed: {login_error}")
logged_in = False session.fetch(
f"{BASE_URL}/",
def verify_action(page): page_action=check_login,
nonlocal logged_in network_idle=True,
logged_in = _cf_check_logged_in(page) )
session.fetch(f"{BASE_URL}/", page_action=verify_action, network_idle=True)
if not logged_in: if not logged_in:
return LoginResult( return LoginResult(
success=False, error="Login failed (bad credentials?)" success=False, error="Login failed (bad credentials?)"
@ -433,8 +404,8 @@ def _login_headless_cf(credentials: dict[str, str]) -> LoginResult:
try: try:
browser_cookies = session.context.cookies() browser_cookies = session.context.cookies()
if any(c.get("name") == "X-User-Sha1" for c in browser_cookies): if any(c.get("name") == "X-User-Handle" for c in browser_cookies):
save_platform_cookies("codeforces", browser_cookies) cookie_cache.write_text(json.dumps(browser_cookies))
except Exception: except Exception:
pass pass
@ -467,19 +438,44 @@ def _submit_headless(
_ensure_browser() _ensure_browser()
cookie_cache = Path.home() / ".cache" / "cp-nvim" / "codeforces-cookies.json"
cookie_cache.parent.mkdir(parents=True, exist_ok=True)
saved_cookies: list[dict[str, Any]] = [] saved_cookies: list[dict[str, Any]] = []
if not _retried: if cookie_cache.exists():
saved_cookies = load_platform_cookies("codeforces") or [] try:
saved_cookies = json.loads(cookie_cache.read_text())
except Exception:
pass
logged_in = bool(saved_cookies) logged_in = cookie_cache.exists() and not _retried
login_error: str | None = None
submit_error: str | None = None submit_error: str | None = None
needs_relogin = False needs_relogin = False
def check_login(page): def check_login(page):
nonlocal logged_in nonlocal logged_in
logged_in = _cf_check_logged_in(page) logged_in = page.evaluate(
"() => Array.from(document.querySelectorAll('a'))"
".some(a => a.textContent.includes('Logout'))"
)
_login_action, _get_login_error = _cf_login_action(credentials) def login_action(page):
nonlocal login_error
try:
page.fill(
'input[name="handleOrEmail"]',
credentials.get("username", ""),
)
page.fill(
'input[name="password"]',
credentials.get("password", ""),
)
page.locator('#enterForm input[type="submit"]').click()
page.wait_for_url(
lambda url: "/enter" not in url, timeout=BROWSER_NAV_TIMEOUT
)
except Exception as e:
login_error = str(e)
def submit_action(page): def submit_action(page):
nonlocal submit_error, needs_relogin nonlocal submit_error, needs_relogin
@ -524,27 +520,27 @@ def _submit_headless(
headless=True, headless=True,
timeout=BROWSER_SESSION_TIMEOUT, timeout=BROWSER_SESSION_TIMEOUT,
google_search=False, google_search=False,
cookies=saved_cookies if saved_cookies else [], cookies=saved_cookies if (cookie_cache.exists() and not _retried) else [],
) as session: ) as session:
if not _retried and saved_cookies: if not (cookie_cache.exists() and not _retried):
print(json.dumps({"status": "checking_login"}), flush=True) print(json.dumps({"status": "checking_login"}), flush=True)
session.fetch( session.fetch(
f"{BASE_URL}/", page_action=check_login, solve_cloudflare=True f"{BASE_URL}/",
page_action=check_login,
network_idle=True,
) )
if not logged_in: if not logged_in:
print(json.dumps({"status": "logging_in"}), flush=True) print(json.dumps({"status": "logging_in"}), flush=True)
session.fetch( session.fetch(
f"{BASE_URL}/enter", f"{BASE_URL}/enter",
page_action=_login_action, page_action=login_action,
solve_cloudflare=True, solve_cloudflare=True,
) )
login_error = _get_login_error()
if login_error: if login_error:
return SubmitResult( return SubmitResult(
success=False, error=f"Login failed: {login_error}" success=False, error=f"Login failed: {login_error}"
) )
logged_in = True
print(json.dumps({"status": "submitting"}), flush=True) print(json.dumps({"status": "submitting"}), flush=True)
session.fetch( session.fetch(
@ -555,13 +551,13 @@ def _submit_headless(
try: try:
browser_cookies = session.context.cookies() browser_cookies = session.context.cookies()
if any(c.get("name") == "X-User-Sha1" for c in browser_cookies): if any(c.get("name") == "X-User-Handle" for c in browser_cookies):
save_platform_cookies("codeforces", browser_cookies) cookie_cache.write_text(json.dumps(browser_cookies))
except Exception: except Exception:
pass pass
if needs_relogin and not _retried: if needs_relogin and not _retried:
clear_platform_cookies("codeforces") cookie_cache.unlink(missing_ok=True)
return _submit_headless( return _submit_headless(
contest_id, contest_id,
problem_id, problem_id,

View file

@ -10,13 +10,7 @@ from pathlib import Path
import httpx import httpx
from .base import ( from .base import BaseScraper, extract_precision
BaseScraper,
clear_platform_cookies,
extract_precision,
load_platform_cookies,
save_platform_cookies,
)
from .timeouts import HTTP_TIMEOUT from .timeouts import HTTP_TIMEOUT
from .models import ( from .models import (
ContestListResult, ContestListResult,
@ -34,6 +28,8 @@ HEADERS = {
} }
CONNECTIONS = 8 CONNECTIONS = 8
_COOKIE_PATH = Path.home() / ".cache" / "cp-nvim" / "kattis-cookies.json"
TIME_RE = re.compile( TIME_RE = re.compile(
r"CPU Time limit</span>\s*<span[^>]*>\s*(\d+)\s*seconds?\s*</span>", r"CPU Time limit</span>\s*<span[^>]*>\s*(\d+)\s*seconds?\s*</span>",
re.DOTALL, re.DOTALL,
@ -213,24 +209,20 @@ async def _stream_single_problem(client: httpx.AsyncClient, slug: str) -> None:
async def _load_kattis_cookies(client: httpx.AsyncClient) -> None: async def _load_kattis_cookies(client: httpx.AsyncClient) -> None:
data = load_platform_cookies("kattis") if not _COOKIE_PATH.exists():
if isinstance(data, dict): return
for k, v in data.items(): try:
for k, v in json.loads(_COOKIE_PATH.read_text()).items():
client.cookies.set(k, v) client.cookies.set(k, v)
except Exception:
pass
async def _save_kattis_cookies(client: httpx.AsyncClient) -> None: async def _save_kattis_cookies(client: httpx.AsyncClient) -> None:
cookies = dict(client.cookies.items()) cookies = {k: v for k, v in client.cookies.items()}
if cookies: if cookies:
save_platform_cookies("kattis", cookies) _COOKIE_PATH.parent.mkdir(parents=True, exist_ok=True)
_COOKIE_PATH.write_text(json.dumps(cookies))
async def _check_kattis_login(client: httpx.AsyncClient) -> bool:
try:
r = await client.get(BASE_URL, headers=HEADERS, timeout=HTTP_TIMEOUT)
return bool(r.headers.get("x-username"))
except Exception:
return False
async def _do_kattis_login( async def _do_kattis_login(
@ -337,10 +329,9 @@ class KattisScraper(BaseScraper):
return self._submit_error("Missing credentials. Use :CP kattis login") return self._submit_error("Missing credentials. Use :CP kattis login")
async with httpx.AsyncClient(follow_redirects=True) as client: async with httpx.AsyncClient(follow_redirects=True) as client:
await _load_kattis_cookies(client)
if client.cookies:
print(json.dumps({"status": "checking_login"}), flush=True) print(json.dumps({"status": "checking_login"}), flush=True)
else: await _load_kattis_cookies(client)
if not client.cookies:
print(json.dumps({"status": "logging_in"}), flush=True) print(json.dumps({"status": "logging_in"}), flush=True)
ok = await _do_kattis_login(client, username, password) ok = await _do_kattis_login(client, username, password)
if not ok: if not ok:
@ -377,7 +368,7 @@ class KattisScraper(BaseScraper):
return self._submit_error(f"Submit request failed: {e}") return self._submit_error(f"Submit request failed: {e}")
if r.status_code in (400, 403) or r.text == "Request validation failed": if r.status_code in (400, 403) or r.text == "Request validation failed":
clear_platform_cookies("kattis") _COOKIE_PATH.unlink(missing_ok=True)
print(json.dumps({"status": "logging_in"}), flush=True) print(json.dumps({"status": "logging_in"}), flush=True)
ok = await _do_kattis_login(client, username, password) ok = await _do_kattis_login(client, username, password)
if not ok: if not ok:
@ -408,16 +399,6 @@ class KattisScraper(BaseScraper):
return self._login_error("Missing username or password") return self._login_error("Missing username or password")
async with httpx.AsyncClient(follow_redirects=True) as client: async with httpx.AsyncClient(follow_redirects=True) as client:
await _load_kattis_cookies(client)
if client.cookies:
print(json.dumps({"status": "checking_login"}), flush=True)
if await _check_kattis_login(client):
return LoginResult(
success=True,
error="",
credentials={"username": username, "password": password},
)
print(json.dumps({"status": "logging_in"}), flush=True) print(json.dumps({"status": "logging_in"}), flush=True)
ok = await _do_kattis_login(client, username, password) ok = await _do_kattis_login(client, username, password)
if not ok: if not ok:

View file

@ -8,12 +8,7 @@ from typing import Any, cast
import httpx import httpx
from .base import ( from .base import BaseScraper, extract_precision
BaseScraper,
extract_precision,
load_platform_cookies,
save_platform_cookies,
)
from .timeouts import HTTP_TIMEOUT from .timeouts import HTTP_TIMEOUT
from .models import ( from .models import (
ContestListResult, ContestListResult,
@ -32,6 +27,7 @@ HEADERS = {
} }
CONNECTIONS = 4 CONNECTIONS = 4
_COOKIE_PATH = Path.home() / ".cache" / "cp-nvim" / "usaco-cookies.json"
_LOGIN_PATH = "/current/tpcm/login-session.php" _LOGIN_PATH = "/current/tpcm/login-session.php"
_SUBMIT_PATH = "/current/tpcm/submit-solution.php" _SUBMIT_PATH = "/current/tpcm/submit-solution.php"
@ -206,16 +202,20 @@ def _parse_submit_form(
async def _load_usaco_cookies(client: httpx.AsyncClient) -> None: async def _load_usaco_cookies(client: httpx.AsyncClient) -> None:
data = load_platform_cookies("usaco") if not _COOKIE_PATH.exists():
if isinstance(data, dict): return
for k, v in data.items(): try:
for k, v in json.loads(_COOKIE_PATH.read_text()).items():
client.cookies.set(k, v) client.cookies.set(k, v)
except Exception:
pass
async def _save_usaco_cookies(client: httpx.AsyncClient) -> None: async def _save_usaco_cookies(client: httpx.AsyncClient) -> None:
cookies = dict(client.cookies.items()) cookies = {k: v for k, v in client.cookies.items()}
if cookies: if cookies:
save_platform_cookies("usaco", cookies) _COOKIE_PATH.parent.mkdir(parents=True, exist_ok=True)
_COOKIE_PATH.write_text(json.dumps(cookies))
async def _check_usaco_login(client: httpx.AsyncClient, username: str) -> bool: async def _check_usaco_login(client: httpx.AsyncClient, username: str) -> bool: