Compare commits
6 commits
eb0dea777e
...
09a03f25aa
| Author | SHA1 | Date | |
|---|---|---|---|
| 09a03f25aa | |||
| 900d96f7ab | |||
| 13438beca7 | |||
| f5992fa9b5 | |||
| 30dc2363da | |||
| cb58062464 |
8 changed files with 389 additions and 216 deletions
|
|
@ -219,4 +219,6 @@ M.LANGUAGE_VERSIONS = {
|
||||||
|
|
||||||
M.DEFAULT_VERSIONS = { cpp = 'c++20', python = 'python3' }
|
M.DEFAULT_VERSIONS = { cpp = 'c++20', python = 'python3' }
|
||||||
|
|
||||||
|
M.COOKIE_FILE = vim.fn.expand('~/.cache/cp-nvim/cookies.json')
|
||||||
|
|
||||||
return M
|
return M
|
||||||
|
|
|
||||||
|
|
@ -38,6 +38,7 @@ local function prompt_and_login(platform, display)
|
||||||
end, function(result)
|
end, function(result)
|
||||||
vim.schedule(function()
|
vim.schedule(function()
|
||||||
if result.success then
|
if result.success then
|
||||||
|
cache.set_credentials(platform, credentials)
|
||||||
logger.log(
|
logger.log(
|
||||||
display .. ' login successful',
|
display .. ' login successful',
|
||||||
{ level = vim.log.levels.INFO, override = true }
|
{ level = vim.log.levels.INFO, override = true }
|
||||||
|
|
@ -105,6 +106,14 @@ function M.logout(platform)
|
||||||
local display = constants.PLATFORM_DISPLAY_NAMES[platform] or platform
|
local display = constants.PLATFORM_DISPLAY_NAMES[platform] or platform
|
||||||
cache.load()
|
cache.load()
|
||||||
cache.clear_credentials(platform)
|
cache.clear_credentials(platform)
|
||||||
|
local cookie_file = constants.COOKIE_FILE
|
||||||
|
if vim.fn.filereadable(cookie_file) == 1 then
|
||||||
|
local ok, data = pcall(vim.fn.json_decode, vim.fn.readfile(cookie_file, 'b'))
|
||||||
|
if ok and type(data) == 'table' then
|
||||||
|
data[platform] = nil
|
||||||
|
vim.fn.writefile({ vim.fn.json_encode(data) }, cookie_file)
|
||||||
|
end
|
||||||
|
end
|
||||||
logger.log(display .. ' credentials cleared', { level = vim.log.levels.INFO, override = true })
|
logger.log(display .. ' credentials cleared', { level = vim.log.levels.INFO, override = true })
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,6 @@ import os
|
||||||
import re
|
import re
|
||||||
import subprocess
|
import subprocess
|
||||||
import time
|
import time
|
||||||
from pathlib import Path
|
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
import backoff
|
import backoff
|
||||||
|
|
@ -16,7 +15,13 @@ from bs4 import BeautifulSoup, Tag
|
||||||
from requests.adapters import HTTPAdapter
|
from requests.adapters import HTTPAdapter
|
||||||
from urllib3.util.retry import Retry
|
from urllib3.util.retry import Retry
|
||||||
|
|
||||||
from .base import BaseScraper, extract_precision
|
from .base import (
|
||||||
|
BaseScraper,
|
||||||
|
clear_platform_cookies,
|
||||||
|
extract_precision,
|
||||||
|
load_platform_cookies,
|
||||||
|
save_platform_cookies,
|
||||||
|
)
|
||||||
from .models import (
|
from .models import (
|
||||||
ContestListResult,
|
ContestListResult,
|
||||||
ContestSummary,
|
ContestSummary,
|
||||||
|
|
@ -379,26 +384,15 @@ def _ensure_browser() -> None:
|
||||||
break
|
break
|
||||||
|
|
||||||
|
|
||||||
def _login_headless(credentials: dict[str, str]) -> LoginResult:
|
def _at_check_logged_in(page) -> bool:
|
||||||
try:
|
return page.evaluate(
|
||||||
from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import]
|
|
||||||
except ImportError:
|
|
||||||
return LoginResult(
|
|
||||||
success=False,
|
|
||||||
error="scrapling is required for AtCoder login. Install it: uv add 'scrapling[fetchers]>=0.4'",
|
|
||||||
)
|
|
||||||
|
|
||||||
_ensure_browser()
|
|
||||||
|
|
||||||
logged_in = False
|
|
||||||
login_error: str | None = None
|
|
||||||
|
|
||||||
def check_login(page):
|
|
||||||
nonlocal logged_in
|
|
||||||
logged_in = page.evaluate(
|
|
||||||
"() => Array.from(document.querySelectorAll('a')).some(a => a.textContent.trim() === 'Sign Out')"
|
"() => Array.from(document.querySelectorAll('a')).some(a => a.textContent.trim() === 'Sign Out')"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _at_login_action(credentials: dict[str, str]):
|
||||||
|
login_error: str | None = None
|
||||||
|
|
||||||
def login_action(page):
|
def login_action(page):
|
||||||
nonlocal login_error
|
nonlocal login_error
|
||||||
try:
|
try:
|
||||||
|
|
@ -412,6 +406,47 @@ def _login_headless(credentials: dict[str, str]) -> LoginResult:
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
login_error = str(e)
|
login_error = str(e)
|
||||||
|
|
||||||
|
return login_action, lambda: login_error
|
||||||
|
|
||||||
|
|
||||||
|
def _login_headless(credentials: dict[str, str]) -> LoginResult:
|
||||||
|
try:
|
||||||
|
from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import]
|
||||||
|
except ImportError:
|
||||||
|
return LoginResult(
|
||||||
|
success=False,
|
||||||
|
error="scrapling is required for AtCoder login. Install it: uv add 'scrapling[fetchers]>=0.4'",
|
||||||
|
)
|
||||||
|
|
||||||
|
_ensure_browser()
|
||||||
|
|
||||||
|
saved_cookies = load_platform_cookies("atcoder") or []
|
||||||
|
|
||||||
|
if saved_cookies:
|
||||||
|
print(json.dumps({"status": "checking_login"}), flush=True)
|
||||||
|
logged_in = False
|
||||||
|
|
||||||
|
def check_action(page):
|
||||||
|
nonlocal logged_in
|
||||||
|
logged_in = _at_check_logged_in(page)
|
||||||
|
|
||||||
|
try:
|
||||||
|
with StealthySession(
|
||||||
|
headless=True,
|
||||||
|
timeout=BROWSER_SESSION_TIMEOUT,
|
||||||
|
google_search=False,
|
||||||
|
cookies=saved_cookies,
|
||||||
|
) as session:
|
||||||
|
session.fetch(
|
||||||
|
f"{BASE_URL}/home", page_action=check_action, network_idle=True
|
||||||
|
)
|
||||||
|
if logged_in:
|
||||||
|
return LoginResult(success=True, error="")
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
login_action, get_error = _at_login_action(credentials)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
with StealthySession(
|
with StealthySession(
|
||||||
headless=True,
|
headless=True,
|
||||||
|
|
@ -424,17 +459,31 @@ def _login_headless(credentials: dict[str, str]) -> LoginResult:
|
||||||
page_action=login_action,
|
page_action=login_action,
|
||||||
solve_cloudflare=True,
|
solve_cloudflare=True,
|
||||||
)
|
)
|
||||||
|
login_error = get_error()
|
||||||
if login_error:
|
if login_error:
|
||||||
return LoginResult(success=False, error=f"Login failed: {login_error}")
|
return LoginResult(success=False, error=f"Login failed: {login_error}")
|
||||||
|
|
||||||
|
logged_in = False
|
||||||
|
|
||||||
|
def verify_action(page):
|
||||||
|
nonlocal logged_in
|
||||||
|
logged_in = _at_check_logged_in(page)
|
||||||
|
|
||||||
session.fetch(
|
session.fetch(
|
||||||
f"{BASE_URL}/home", page_action=check_login, network_idle=True
|
f"{BASE_URL}/home", page_action=verify_action, network_idle=True
|
||||||
)
|
)
|
||||||
if not logged_in:
|
if not logged_in:
|
||||||
return LoginResult(
|
return LoginResult(
|
||||||
success=False, error="Login failed (bad credentials?)"
|
success=False, error="Login failed (bad credentials?)"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
browser_cookies = session.context.cookies()
|
||||||
|
if browser_cookies:
|
||||||
|
save_platform_cookies("atcoder", browser_cookies)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
return LoginResult(success=True, error="")
|
return LoginResult(success=True, error="")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
return LoginResult(success=False, error=str(e))
|
return LoginResult(success=False, error=str(e))
|
||||||
|
|
@ -446,6 +495,7 @@ def _submit_headless(
|
||||||
file_path: str,
|
file_path: str,
|
||||||
language_id: str,
|
language_id: str,
|
||||||
credentials: dict[str, str],
|
credentials: dict[str, str],
|
||||||
|
_retried: bool = False,
|
||||||
) -> "SubmitResult":
|
) -> "SubmitResult":
|
||||||
try:
|
try:
|
||||||
from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import]
|
from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import]
|
||||||
|
|
@ -457,26 +507,24 @@ def _submit_headless(
|
||||||
|
|
||||||
_ensure_browser()
|
_ensure_browser()
|
||||||
|
|
||||||
login_error: str | None = None
|
saved_cookies: list[dict[str, Any]] = []
|
||||||
submit_error: str | None = None
|
if not _retried:
|
||||||
|
saved_cookies = load_platform_cookies("atcoder") or []
|
||||||
|
|
||||||
def login_action(page):
|
logged_in = bool(saved_cookies)
|
||||||
nonlocal login_error
|
submit_error: str | None = None
|
||||||
try:
|
needs_relogin = False
|
||||||
_solve_turnstile(page)
|
|
||||||
page.fill('input[name="username"]', credentials.get("username", ""))
|
def check_login(page):
|
||||||
page.fill('input[name="password"]', credentials.get("password", ""))
|
nonlocal logged_in
|
||||||
page.click("#submit")
|
logged_in = _at_check_logged_in(page)
|
||||||
page.wait_for_url(
|
|
||||||
lambda url: "/login" not in url, timeout=BROWSER_NAV_TIMEOUT
|
login_action, get_login_error = _at_login_action(credentials)
|
||||||
)
|
|
||||||
except Exception as e:
|
|
||||||
login_error = str(e)
|
|
||||||
|
|
||||||
def submit_action(page):
|
def submit_action(page):
|
||||||
nonlocal submit_error
|
nonlocal submit_error, needs_relogin
|
||||||
if "/login" in page.url:
|
if "/login" in page.url:
|
||||||
submit_error = "Not logged in after login step"
|
needs_relogin = True
|
||||||
return
|
return
|
||||||
try:
|
try:
|
||||||
_solve_turnstile(page)
|
_solve_turnstile(page)
|
||||||
|
|
@ -488,18 +536,12 @@ def _submit_headless(
|
||||||
f'select[name="data.LanguageId"] option[value="{language_id}"]'
|
f'select[name="data.LanguageId"] option[value="{language_id}"]'
|
||||||
).wait_for(state="attached", timeout=BROWSER_ELEMENT_WAIT)
|
).wait_for(state="attached", timeout=BROWSER_ELEMENT_WAIT)
|
||||||
page.select_option('select[name="data.LanguageId"]', language_id)
|
page.select_option('select[name="data.LanguageId"]', language_id)
|
||||||
ext = _LANGUAGE_ID_EXTENSION.get(
|
page.set_input_files("#input-open-file", file_path)
|
||||||
language_id, Path(file_path).suffix.lstrip(".") or "txt"
|
page.wait_for_function(
|
||||||
|
"() => { const ta = document.getElementById('plain-textarea'); return ta && ta.value.length > 0; }",
|
||||||
|
timeout=BROWSER_ELEMENT_WAIT,
|
||||||
)
|
)
|
||||||
page.set_input_files(
|
page.evaluate("document.getElementById('submit').click()")
|
||||||
"#input-open-file",
|
|
||||||
{
|
|
||||||
"name": f"solution.{ext}",
|
|
||||||
"mimeType": "text/plain",
|
|
||||||
"buffer": Path(file_path).read_bytes(),
|
|
||||||
},
|
|
||||||
)
|
|
||||||
page.locator('button[type="submit"]').click(no_wait_after=True)
|
|
||||||
page.wait_for_url(
|
page.wait_for_url(
|
||||||
lambda url: "/submissions/me" in url,
|
lambda url: "/submissions/me" in url,
|
||||||
timeout=BROWSER_SUBMIT_NAV_TIMEOUT["atcoder"],
|
timeout=BROWSER_SUBMIT_NAV_TIMEOUT["atcoder"],
|
||||||
|
|
@ -512,15 +554,33 @@ def _submit_headless(
|
||||||
headless=True,
|
headless=True,
|
||||||
timeout=BROWSER_SESSION_TIMEOUT,
|
timeout=BROWSER_SESSION_TIMEOUT,
|
||||||
google_search=False,
|
google_search=False,
|
||||||
|
cookies=saved_cookies if saved_cookies else [],
|
||||||
) as session:
|
) as session:
|
||||||
|
if not _retried and saved_cookies:
|
||||||
|
print(json.dumps({"status": "checking_login"}), flush=True)
|
||||||
|
session.fetch(
|
||||||
|
f"{BASE_URL}/home", page_action=check_login, network_idle=True
|
||||||
|
)
|
||||||
|
|
||||||
|
if not logged_in:
|
||||||
print(json.dumps({"status": "logging_in"}), flush=True)
|
print(json.dumps({"status": "logging_in"}), flush=True)
|
||||||
session.fetch(
|
session.fetch(
|
||||||
f"{BASE_URL}/login",
|
f"{BASE_URL}/login",
|
||||||
page_action=login_action,
|
page_action=login_action,
|
||||||
solve_cloudflare=True,
|
solve_cloudflare=True,
|
||||||
)
|
)
|
||||||
|
login_error = get_login_error()
|
||||||
if login_error:
|
if login_error:
|
||||||
return SubmitResult(success=False, error=f"Login failed: {login_error}")
|
return SubmitResult(
|
||||||
|
success=False, error=f"Login failed: {login_error}"
|
||||||
|
)
|
||||||
|
logged_in = True
|
||||||
|
try:
|
||||||
|
browser_cookies = session.context.cookies()
|
||||||
|
if browser_cookies:
|
||||||
|
save_platform_cookies("atcoder", browser_cookies)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
print(json.dumps({"status": "submitting"}), flush=True)
|
print(json.dumps({"status": "submitting"}), flush=True)
|
||||||
session.fetch(
|
session.fetch(
|
||||||
|
|
@ -529,6 +589,17 @@ def _submit_headless(
|
||||||
solve_cloudflare=True,
|
solve_cloudflare=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if needs_relogin and not _retried:
|
||||||
|
clear_platform_cookies("atcoder")
|
||||||
|
return _submit_headless(
|
||||||
|
contest_id,
|
||||||
|
problem_id,
|
||||||
|
file_path,
|
||||||
|
language_id,
|
||||||
|
credentials,
|
||||||
|
_retried=True,
|
||||||
|
)
|
||||||
|
|
||||||
if submit_error:
|
if submit_error:
|
||||||
return SubmitResult(success=False, error=submit_error)
|
return SubmitResult(success=False, error=submit_error)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,8 @@ import os
|
||||||
import re
|
import re
|
||||||
import sys
|
import sys
|
||||||
from abc import ABC, abstractmethod
|
from abc import ABC, abstractmethod
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
from .language_ids import get_language_id
|
from .language_ids import get_language_id
|
||||||
from .models import (
|
from .models import (
|
||||||
|
|
@ -15,6 +17,36 @@ from .models import (
|
||||||
TestsResult,
|
TestsResult,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
_COOKIE_FILE = Path.home() / ".cache" / "cp-nvim" / "cookies.json"
|
||||||
|
|
||||||
|
|
||||||
|
def load_platform_cookies(platform: str) -> Any | None:
|
||||||
|
try:
|
||||||
|
data = json.loads(_COOKIE_FILE.read_text())
|
||||||
|
return data.get(platform)
|
||||||
|
except Exception:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def save_platform_cookies(platform: str, data: Any) -> None:
|
||||||
|
_COOKIE_FILE.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
try:
|
||||||
|
existing = json.loads(_COOKIE_FILE.read_text())
|
||||||
|
except Exception:
|
||||||
|
existing = {}
|
||||||
|
existing[platform] = data
|
||||||
|
_COOKIE_FILE.write_text(json.dumps(existing))
|
||||||
|
|
||||||
|
|
||||||
|
def clear_platform_cookies(platform: str) -> None:
|
||||||
|
try:
|
||||||
|
existing = json.loads(_COOKIE_FILE.read_text())
|
||||||
|
existing.pop(platform, None)
|
||||||
|
_COOKIE_FILE.write_text(json.dumps(existing))
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
_PRECISION_ABS_REL_RE = re.compile(
|
_PRECISION_ABS_REL_RE = re.compile(
|
||||||
r"(?:absolute|relative)\s+error[^.]*?10\s*[\^{]\s*\{?\s*[-\u2212]\s*(\d+)\s*\}?",
|
r"(?:absolute|relative)\s+error[^.]*?10\s*[\^{]\s*\{?\s*[-\u2212]\s*(\d+)\s*\}?",
|
||||||
re.IGNORECASE,
|
re.IGNORECASE,
|
||||||
|
|
|
||||||
|
|
@ -9,8 +9,18 @@ from typing import Any
|
||||||
|
|
||||||
import httpx
|
import httpx
|
||||||
|
|
||||||
from .base import BaseScraper
|
from .base import (
|
||||||
from .timeouts import BROWSER_SESSION_TIMEOUT, HTTP_TIMEOUT
|
BaseScraper,
|
||||||
|
clear_platform_cookies,
|
||||||
|
load_platform_cookies,
|
||||||
|
save_platform_cookies,
|
||||||
|
)
|
||||||
|
from .timeouts import (
|
||||||
|
BROWSER_ELEMENT_WAIT,
|
||||||
|
BROWSER_NAV_TIMEOUT,
|
||||||
|
BROWSER_SESSION_TIMEOUT,
|
||||||
|
HTTP_TIMEOUT,
|
||||||
|
)
|
||||||
from .models import (
|
from .models import (
|
||||||
ContestListResult,
|
ContestListResult,
|
||||||
ContestSummary,
|
ContestSummary,
|
||||||
|
|
@ -31,7 +41,6 @@ HEADERS = {
|
||||||
}
|
}
|
||||||
CONNECTIONS = 8
|
CONNECTIONS = 8
|
||||||
|
|
||||||
_COOKIE_PATH = Path.home() / ".cache" / "cp-nvim" / "codechef-cookies.json"
|
|
||||||
|
|
||||||
_CC_CHECK_LOGIN_JS = "() => !!document.querySelector('a[href*=\"/users/\"]')"
|
_CC_CHECK_LOGIN_JS = "() => !!document.querySelector('a[href*=\"/users/\"]')"
|
||||||
|
|
||||||
|
|
@ -54,6 +63,29 @@ async def fetch_json(client: httpx.AsyncClient, path: str) -> dict[str, Any]:
|
||||||
return r.json()
|
return r.json()
|
||||||
|
|
||||||
|
|
||||||
|
def _cc_check_logged_in(page) -> bool:
|
||||||
|
return "dashboard" in page.url or page.evaluate(_CC_CHECK_LOGIN_JS)
|
||||||
|
|
||||||
|
|
||||||
|
def _cc_login_action(credentials: dict[str, str]):
|
||||||
|
login_error: str | None = None
|
||||||
|
|
||||||
|
def login_action(page):
|
||||||
|
nonlocal login_error
|
||||||
|
try:
|
||||||
|
page.locator('input[name="name"]').fill(credentials.get("username", ""))
|
||||||
|
page.locator('input[name="pass"]').fill(credentials.get("password", ""))
|
||||||
|
page.locator("input.cc-login-btn").click()
|
||||||
|
page.wait_for_function(
|
||||||
|
"() => !window.location.pathname.includes('/login')",
|
||||||
|
timeout=BROWSER_NAV_TIMEOUT,
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
login_error = str(e)
|
||||||
|
|
||||||
|
return login_action, lambda: login_error
|
||||||
|
|
||||||
|
|
||||||
def _login_headless_codechef(credentials: dict[str, str]) -> LoginResult:
|
def _login_headless_codechef(credentials: dict[str, str]) -> LoginResult:
|
||||||
try:
|
try:
|
||||||
from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import]
|
from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import]
|
||||||
|
|
@ -67,28 +99,32 @@ def _login_headless_codechef(credentials: dict[str, str]) -> LoginResult:
|
||||||
|
|
||||||
_ensure_browser()
|
_ensure_browser()
|
||||||
|
|
||||||
_COOKIE_PATH.parent.mkdir(parents=True, exist_ok=True)
|
saved_cookies = load_platform_cookies("codechef") or []
|
||||||
|
|
||||||
|
if saved_cookies:
|
||||||
|
print(json.dumps({"status": "checking_login"}), flush=True)
|
||||||
logged_in = False
|
logged_in = False
|
||||||
login_error: str | None = None
|
|
||||||
|
|
||||||
def check_login(page):
|
def check_action(page):
|
||||||
nonlocal logged_in
|
nonlocal logged_in
|
||||||
logged_in = "dashboard" in page.url or page.evaluate(_CC_CHECK_LOGIN_JS)
|
logged_in = _cc_check_logged_in(page)
|
||||||
|
|
||||||
def login_action(page):
|
|
||||||
nonlocal login_error
|
|
||||||
try:
|
try:
|
||||||
page.locator('input[name="name"]').fill(credentials.get("username", ""))
|
with StealthySession(
|
||||||
page.locator('input[name="pass"]').fill(credentials.get("password", ""))
|
headless=True,
|
||||||
page.locator("input.cc-login-btn").click()
|
timeout=BROWSER_SESSION_TIMEOUT,
|
||||||
try:
|
google_search=False,
|
||||||
page.wait_for_url(lambda url: "/login" not in url, timeout=3000)
|
cookies=saved_cookies,
|
||||||
|
) as session:
|
||||||
|
session.fetch(
|
||||||
|
f"{BASE_URL}/", page_action=check_action, network_idle=True
|
||||||
|
)
|
||||||
|
if logged_in:
|
||||||
|
return LoginResult(success=True, error="")
|
||||||
except Exception:
|
except Exception:
|
||||||
login_error = "Login failed (bad credentials?)"
|
pass
|
||||||
return
|
|
||||||
except Exception as e:
|
login_action, get_error = _cc_login_action(credentials)
|
||||||
login_error = str(e)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
with StealthySession(
|
with StealthySession(
|
||||||
|
|
@ -97,11 +133,20 @@ def _login_headless_codechef(credentials: dict[str, str]) -> LoginResult:
|
||||||
google_search=False,
|
google_search=False,
|
||||||
) as session:
|
) as session:
|
||||||
print(json.dumps({"status": "logging_in"}), flush=True)
|
print(json.dumps({"status": "logging_in"}), flush=True)
|
||||||
session.fetch(f"{BASE_URL}/login", page_action=login_action)
|
session.fetch(
|
||||||
|
f"{BASE_URL}/login", page_action=login_action, network_idle=True
|
||||||
|
)
|
||||||
|
login_error = get_error()
|
||||||
if login_error:
|
if login_error:
|
||||||
return LoginResult(success=False, error=f"Login failed: {login_error}")
|
return LoginResult(success=False, error=f"Login failed: {login_error}")
|
||||||
|
|
||||||
session.fetch(f"{BASE_URL}/", page_action=check_login, network_idle=True)
|
logged_in = False
|
||||||
|
|
||||||
|
def verify_action(page):
|
||||||
|
nonlocal logged_in
|
||||||
|
logged_in = _cc_check_logged_in(page)
|
||||||
|
|
||||||
|
session.fetch(f"{BASE_URL}/", page_action=verify_action, network_idle=True)
|
||||||
if not logged_in:
|
if not logged_in:
|
||||||
return LoginResult(
|
return LoginResult(
|
||||||
success=False, error="Login failed (bad credentials?)"
|
success=False, error="Login failed (bad credentials?)"
|
||||||
|
|
@ -109,8 +154,8 @@ def _login_headless_codechef(credentials: dict[str, str]) -> LoginResult:
|
||||||
|
|
||||||
try:
|
try:
|
||||||
browser_cookies = session.context.cookies()
|
browser_cookies = session.context.cookies()
|
||||||
if browser_cookies:
|
if any(c.get("name") == "userkey" for c in browser_cookies):
|
||||||
_COOKIE_PATH.write_text(json.dumps(browser_cookies))
|
save_platform_cookies("codechef", browser_cookies)
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
@ -126,6 +171,7 @@ def _submit_headless_codechef(
|
||||||
language_id: str,
|
language_id: str,
|
||||||
credentials: dict[str, str],
|
credentials: dict[str, str],
|
||||||
_retried: bool = False,
|
_retried: bool = False,
|
||||||
|
_practice: bool = False,
|
||||||
) -> SubmitResult:
|
) -> SubmitResult:
|
||||||
source_code = Path(file_path).read_text()
|
source_code = Path(file_path).read_text()
|
||||||
|
|
||||||
|
|
@ -141,36 +187,19 @@ def _submit_headless_codechef(
|
||||||
|
|
||||||
_ensure_browser()
|
_ensure_browser()
|
||||||
|
|
||||||
_COOKIE_PATH.parent.mkdir(parents=True, exist_ok=True)
|
|
||||||
saved_cookies: list[dict[str, Any]] = []
|
saved_cookies: list[dict[str, Any]] = []
|
||||||
if _COOKIE_PATH.exists() and not _retried:
|
if not _retried:
|
||||||
try:
|
saved_cookies = load_platform_cookies("codechef") or []
|
||||||
saved_cookies = json.loads(_COOKIE_PATH.read_text())
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
logged_in = bool(saved_cookies) and not _retried
|
logged_in = bool(saved_cookies)
|
||||||
login_error: str | None = None
|
|
||||||
submit_error: str | None = None
|
submit_error: str | None = None
|
||||||
needs_relogin = False
|
needs_relogin = False
|
||||||
|
|
||||||
def check_login(page):
|
def check_login(page):
|
||||||
nonlocal logged_in
|
nonlocal logged_in
|
||||||
logged_in = "dashboard" in page.url or page.evaluate(_CC_CHECK_LOGIN_JS)
|
logged_in = _cc_check_logged_in(page)
|
||||||
|
|
||||||
def login_action(page):
|
_login_action, _get_login_error = _cc_login_action(credentials)
|
||||||
nonlocal login_error
|
|
||||||
try:
|
|
||||||
page.locator('input[name="name"]').fill(credentials.get("username", ""))
|
|
||||||
page.locator('input[name="pass"]').fill(credentials.get("password", ""))
|
|
||||||
page.locator("input.cc-login-btn").click()
|
|
||||||
try:
|
|
||||||
page.wait_for_url(lambda url: "/login" not in url, timeout=3000)
|
|
||||||
except Exception:
|
|
||||||
login_error = "Login failed (bad credentials?)"
|
|
||||||
return
|
|
||||||
except Exception as e:
|
|
||||||
login_error = str(e)
|
|
||||||
|
|
||||||
def submit_action(page):
|
def submit_action(page):
|
||||||
nonlocal submit_error, needs_relogin
|
nonlocal submit_error, needs_relogin
|
||||||
|
|
@ -178,12 +207,13 @@ def _submit_headless_codechef(
|
||||||
needs_relogin = True
|
needs_relogin = True
|
||||||
return
|
return
|
||||||
try:
|
try:
|
||||||
page.wait_for_selector('[aria-haspopup="listbox"]', timeout=10000)
|
page.wait_for_selector(
|
||||||
|
'[aria-haspopup="listbox"]', timeout=BROWSER_ELEMENT_WAIT
|
||||||
|
)
|
||||||
|
|
||||||
page.locator('[aria-haspopup="listbox"]').click()
|
page.locator('[aria-haspopup="listbox"]').click()
|
||||||
page.wait_for_selector('[role="option"]', timeout=5000)
|
page.wait_for_selector('[role="option"]', timeout=BROWSER_ELEMENT_WAIT)
|
||||||
page.locator(f'[role="option"][data-value="{language_id}"]').click()
|
page.locator(f'[role="option"][data-value="{language_id}"]').click()
|
||||||
page.wait_for_timeout(250)
|
|
||||||
|
|
||||||
page.locator(".ace_editor").click()
|
page.locator(".ace_editor").click()
|
||||||
page.keyboard.press("Control+a")
|
page.keyboard.press("Control+a")
|
||||||
|
|
@ -198,7 +228,6 @@ def _submit_headless_codechef(
|
||||||
}""",
|
}""",
|
||||||
source_code,
|
source_code,
|
||||||
)
|
)
|
||||||
page.wait_for_timeout(125)
|
|
||||||
|
|
||||||
page.evaluate(
|
page.evaluate(
|
||||||
"() => document.getElementById('submit_btn').scrollIntoView({block:'center'})"
|
"() => document.getElementById('submit_btn').scrollIntoView({block:'center'})"
|
||||||
|
|
@ -213,7 +242,9 @@ def _submit_headless_codechef(
|
||||||
const d = document.querySelector('[role="dialog"], .swal2-popup');
|
const d = document.querySelector('[role="dialog"], .swal2-popup');
|
||||||
return d ? d.textContent.trim() : null;
|
return d ? d.textContent.trim() : null;
|
||||||
}""")
|
}""")
|
||||||
if dialog_text and (
|
if dialog_text and "login" in dialog_text.lower():
|
||||||
|
needs_relogin = True
|
||||||
|
elif dialog_text and (
|
||||||
"not available for accepting solutions" in dialog_text
|
"not available for accepting solutions" in dialog_text
|
||||||
or "not available for submission" in dialog_text
|
or "not available for submission" in dialog_text
|
||||||
):
|
):
|
||||||
|
|
@ -228,9 +259,9 @@ def _submit_headless_codechef(
|
||||||
headless=True,
|
headless=True,
|
||||||
timeout=BROWSER_SESSION_TIMEOUT,
|
timeout=BROWSER_SESSION_TIMEOUT,
|
||||||
google_search=False,
|
google_search=False,
|
||||||
cookies=saved_cookies if (saved_cookies and not _retried) else [],
|
cookies=saved_cookies if saved_cookies else [],
|
||||||
) as session:
|
) as session:
|
||||||
if not logged_in:
|
if not _retried and not _practice and saved_cookies:
|
||||||
print(json.dumps({"status": "checking_login"}), flush=True)
|
print(json.dumps({"status": "checking_login"}), flush=True)
|
||||||
session.fetch(
|
session.fetch(
|
||||||
f"{BASE_URL}/", page_action=check_login, network_idle=True
|
f"{BASE_URL}/", page_action=check_login, network_idle=True
|
||||||
|
|
@ -238,12 +269,17 @@ def _submit_headless_codechef(
|
||||||
|
|
||||||
if not logged_in:
|
if not logged_in:
|
||||||
print(json.dumps({"status": "logging_in"}), flush=True)
|
print(json.dumps({"status": "logging_in"}), flush=True)
|
||||||
session.fetch(f"{BASE_URL}/login", page_action=login_action)
|
session.fetch(
|
||||||
|
f"{BASE_URL}/login", page_action=_login_action, network_idle=True
|
||||||
|
)
|
||||||
|
login_error = _get_login_error()
|
||||||
if login_error:
|
if login_error:
|
||||||
return SubmitResult(
|
return SubmitResult(
|
||||||
success=False, error=f"Login failed: {login_error}"
|
success=False, error=f"Login failed: {login_error}"
|
||||||
)
|
)
|
||||||
|
logged_in = True
|
||||||
|
|
||||||
|
if not _practice:
|
||||||
print(json.dumps({"status": "submitting"}), flush=True)
|
print(json.dumps({"status": "submitting"}), flush=True)
|
||||||
submit_url = (
|
submit_url = (
|
||||||
f"{BASE_URL}/submit/{problem_id}"
|
f"{BASE_URL}/submit/{problem_id}"
|
||||||
|
|
@ -254,13 +290,13 @@ def _submit_headless_codechef(
|
||||||
|
|
||||||
try:
|
try:
|
||||||
browser_cookies = session.context.cookies()
|
browser_cookies = session.context.cookies()
|
||||||
if browser_cookies and logged_in:
|
if any(c.get("name") == "userkey" for c in browser_cookies):
|
||||||
_COOKIE_PATH.write_text(json.dumps(browser_cookies))
|
save_platform_cookies("codechef", browser_cookies)
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
if needs_relogin and not _retried:
|
if needs_relogin and not _retried:
|
||||||
_COOKIE_PATH.unlink(missing_ok=True)
|
clear_platform_cookies("codechef")
|
||||||
return _submit_headless_codechef(
|
return _submit_headless_codechef(
|
||||||
contest_id,
|
contest_id,
|
||||||
problem_id,
|
problem_id,
|
||||||
|
|
@ -270,14 +306,14 @@ def _submit_headless_codechef(
|
||||||
_retried=True,
|
_retried=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
if submit_error == "PRACTICE_FALLBACK" and not _retried:
|
if submit_error == "PRACTICE_FALLBACK" and not _practice:
|
||||||
return _submit_headless_codechef(
|
return _submit_headless_codechef(
|
||||||
"PRACTICE",
|
"PRACTICE",
|
||||||
problem_id,
|
problem_id,
|
||||||
file_path,
|
file_path,
|
||||||
language_id,
|
language_id,
|
||||||
credentials,
|
credentials,
|
||||||
_retried=True,
|
_practice=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
if submit_error:
|
if submit_error:
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,13 @@ from typing import Any
|
||||||
import requests
|
import requests
|
||||||
from bs4 import BeautifulSoup, Tag
|
from bs4 import BeautifulSoup, Tag
|
||||||
|
|
||||||
from .base import BaseScraper, extract_precision
|
from .base import (
|
||||||
|
BaseScraper,
|
||||||
|
clear_platform_cookies,
|
||||||
|
extract_precision,
|
||||||
|
load_platform_cookies,
|
||||||
|
save_platform_cookies,
|
||||||
|
)
|
||||||
from .models import (
|
from .models import (
|
||||||
ContestListResult,
|
ContestListResult,
|
||||||
ContestSummary,
|
ContestSummary,
|
||||||
|
|
@ -331,9 +337,33 @@ class CodeforcesScraper(BaseScraper):
|
||||||
return await asyncio.to_thread(_login_headless_cf, credentials)
|
return await asyncio.to_thread(_login_headless_cf, credentials)
|
||||||
|
|
||||||
|
|
||||||
def _login_headless_cf(credentials: dict[str, str]) -> LoginResult:
|
def _cf_check_logged_in(page) -> bool:
|
||||||
from pathlib import Path
|
return page.evaluate(
|
||||||
|
"() => Array.from(document.querySelectorAll('a'))"
|
||||||
|
".some(a => a.textContent.includes('Logout'))"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _cf_login_action(credentials: dict[str, str]):
|
||||||
|
login_error: str | None = None
|
||||||
|
|
||||||
|
def login_action(page):
|
||||||
|
nonlocal login_error
|
||||||
|
try:
|
||||||
|
page.wait_for_selector('input[name="handleOrEmail"]', timeout=60000)
|
||||||
|
page.fill('input[name="handleOrEmail"]', credentials.get("username", ""))
|
||||||
|
page.fill('input[name="password"]', credentials.get("password", ""))
|
||||||
|
page.locator('#enterForm input[type="submit"]').click()
|
||||||
|
page.wait_for_url(
|
||||||
|
lambda url: "/enter" not in url, timeout=BROWSER_NAV_TIMEOUT
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
login_error = str(e)
|
||||||
|
|
||||||
|
return login_action, lambda: login_error
|
||||||
|
|
||||||
|
|
||||||
|
def _login_headless_cf(credentials: dict[str, str]) -> LoginResult:
|
||||||
try:
|
try:
|
||||||
from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import]
|
from scrapling.fetchers import StealthySession # type: ignore[import-untyped,unresolved-import]
|
||||||
except ImportError:
|
except ImportError:
|
||||||
|
|
@ -346,36 +376,32 @@ def _login_headless_cf(credentials: dict[str, str]) -> LoginResult:
|
||||||
|
|
||||||
_ensure_browser()
|
_ensure_browser()
|
||||||
|
|
||||||
cookie_cache = Path.home() / ".cache" / "cp-nvim" / "codeforces-cookies.json"
|
saved_cookies = load_platform_cookies("codeforces") or []
|
||||||
cookie_cache.parent.mkdir(parents=True, exist_ok=True)
|
|
||||||
|
|
||||||
|
if saved_cookies:
|
||||||
|
print(json.dumps({"status": "checking_login"}), flush=True)
|
||||||
logged_in = False
|
logged_in = False
|
||||||
login_error: str | None = None
|
|
||||||
|
|
||||||
def check_login(page):
|
def check_action(page):
|
||||||
nonlocal logged_in
|
nonlocal logged_in
|
||||||
logged_in = page.evaluate(
|
logged_in = _cf_check_logged_in(page)
|
||||||
"() => Array.from(document.querySelectorAll('a'))"
|
|
||||||
".some(a => a.textContent.includes('Logout'))"
|
|
||||||
)
|
|
||||||
|
|
||||||
def login_action(page):
|
|
||||||
nonlocal login_error
|
|
||||||
try:
|
try:
|
||||||
page.fill(
|
with StealthySession(
|
||||||
'input[name="handleOrEmail"]',
|
headless=True,
|
||||||
credentials.get("username", ""),
|
timeout=BROWSER_SESSION_TIMEOUT,
|
||||||
|
google_search=False,
|
||||||
|
cookies=saved_cookies,
|
||||||
|
) as session:
|
||||||
|
session.fetch(
|
||||||
|
f"{BASE_URL}/", page_action=check_action, solve_cloudflare=True
|
||||||
)
|
)
|
||||||
page.fill(
|
if logged_in:
|
||||||
'input[name="password"]',
|
return LoginResult(success=True, error="")
|
||||||
credentials.get("password", ""),
|
except Exception:
|
||||||
)
|
pass
|
||||||
page.locator('#enterForm input[type="submit"]').click()
|
|
||||||
page.wait_for_url(
|
login_action, get_error = _cf_login_action(credentials)
|
||||||
lambda url: "/enter" not in url, timeout=BROWSER_NAV_TIMEOUT
|
|
||||||
)
|
|
||||||
except Exception as e:
|
|
||||||
login_error = str(e)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
with StealthySession(
|
with StealthySession(
|
||||||
|
|
@ -389,14 +415,17 @@ def _login_headless_cf(credentials: dict[str, str]) -> LoginResult:
|
||||||
page_action=login_action,
|
page_action=login_action,
|
||||||
solve_cloudflare=True,
|
solve_cloudflare=True,
|
||||||
)
|
)
|
||||||
|
login_error = get_error()
|
||||||
if login_error:
|
if login_error:
|
||||||
return LoginResult(success=False, error=f"Login failed: {login_error}")
|
return LoginResult(success=False, error=f"Login failed: {login_error}")
|
||||||
|
|
||||||
session.fetch(
|
logged_in = False
|
||||||
f"{BASE_URL}/",
|
|
||||||
page_action=check_login,
|
def verify_action(page):
|
||||||
network_idle=True,
|
nonlocal logged_in
|
||||||
)
|
logged_in = _cf_check_logged_in(page)
|
||||||
|
|
||||||
|
session.fetch(f"{BASE_URL}/", page_action=verify_action, network_idle=True)
|
||||||
if not logged_in:
|
if not logged_in:
|
||||||
return LoginResult(
|
return LoginResult(
|
||||||
success=False, error="Login failed (bad credentials?)"
|
success=False, error="Login failed (bad credentials?)"
|
||||||
|
|
@ -404,8 +433,8 @@ def _login_headless_cf(credentials: dict[str, str]) -> LoginResult:
|
||||||
|
|
||||||
try:
|
try:
|
||||||
browser_cookies = session.context.cookies()
|
browser_cookies = session.context.cookies()
|
||||||
if any(c.get("name") == "X-User-Handle" for c in browser_cookies):
|
if any(c.get("name") == "X-User-Sha1" for c in browser_cookies):
|
||||||
cookie_cache.write_text(json.dumps(browser_cookies))
|
save_platform_cookies("codeforces", browser_cookies)
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
@ -438,44 +467,19 @@ def _submit_headless(
|
||||||
|
|
||||||
_ensure_browser()
|
_ensure_browser()
|
||||||
|
|
||||||
cookie_cache = Path.home() / ".cache" / "cp-nvim" / "codeforces-cookies.json"
|
|
||||||
cookie_cache.parent.mkdir(parents=True, exist_ok=True)
|
|
||||||
saved_cookies: list[dict[str, Any]] = []
|
saved_cookies: list[dict[str, Any]] = []
|
||||||
if cookie_cache.exists():
|
if not _retried:
|
||||||
try:
|
saved_cookies = load_platform_cookies("codeforces") or []
|
||||||
saved_cookies = json.loads(cookie_cache.read_text())
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
logged_in = cookie_cache.exists() and not _retried
|
logged_in = bool(saved_cookies)
|
||||||
login_error: str | None = None
|
|
||||||
submit_error: str | None = None
|
submit_error: str | None = None
|
||||||
needs_relogin = False
|
needs_relogin = False
|
||||||
|
|
||||||
def check_login(page):
|
def check_login(page):
|
||||||
nonlocal logged_in
|
nonlocal logged_in
|
||||||
logged_in = page.evaluate(
|
logged_in = _cf_check_logged_in(page)
|
||||||
"() => Array.from(document.querySelectorAll('a'))"
|
|
||||||
".some(a => a.textContent.includes('Logout'))"
|
|
||||||
)
|
|
||||||
|
|
||||||
def login_action(page):
|
_login_action, _get_login_error = _cf_login_action(credentials)
|
||||||
nonlocal login_error
|
|
||||||
try:
|
|
||||||
page.fill(
|
|
||||||
'input[name="handleOrEmail"]',
|
|
||||||
credentials.get("username", ""),
|
|
||||||
)
|
|
||||||
page.fill(
|
|
||||||
'input[name="password"]',
|
|
||||||
credentials.get("password", ""),
|
|
||||||
)
|
|
||||||
page.locator('#enterForm input[type="submit"]').click()
|
|
||||||
page.wait_for_url(
|
|
||||||
lambda url: "/enter" not in url, timeout=BROWSER_NAV_TIMEOUT
|
|
||||||
)
|
|
||||||
except Exception as e:
|
|
||||||
login_error = str(e)
|
|
||||||
|
|
||||||
def submit_action(page):
|
def submit_action(page):
|
||||||
nonlocal submit_error, needs_relogin
|
nonlocal submit_error, needs_relogin
|
||||||
|
|
@ -520,27 +524,27 @@ def _submit_headless(
|
||||||
headless=True,
|
headless=True,
|
||||||
timeout=BROWSER_SESSION_TIMEOUT,
|
timeout=BROWSER_SESSION_TIMEOUT,
|
||||||
google_search=False,
|
google_search=False,
|
||||||
cookies=saved_cookies if (cookie_cache.exists() and not _retried) else [],
|
cookies=saved_cookies if saved_cookies else [],
|
||||||
) as session:
|
) as session:
|
||||||
if not (cookie_cache.exists() and not _retried):
|
if not _retried and saved_cookies:
|
||||||
print(json.dumps({"status": "checking_login"}), flush=True)
|
print(json.dumps({"status": "checking_login"}), flush=True)
|
||||||
session.fetch(
|
session.fetch(
|
||||||
f"{BASE_URL}/",
|
f"{BASE_URL}/", page_action=check_login, solve_cloudflare=True
|
||||||
page_action=check_login,
|
|
||||||
network_idle=True,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
if not logged_in:
|
if not logged_in:
|
||||||
print(json.dumps({"status": "logging_in"}), flush=True)
|
print(json.dumps({"status": "logging_in"}), flush=True)
|
||||||
session.fetch(
|
session.fetch(
|
||||||
f"{BASE_URL}/enter",
|
f"{BASE_URL}/enter",
|
||||||
page_action=login_action,
|
page_action=_login_action,
|
||||||
solve_cloudflare=True,
|
solve_cloudflare=True,
|
||||||
)
|
)
|
||||||
|
login_error = _get_login_error()
|
||||||
if login_error:
|
if login_error:
|
||||||
return SubmitResult(
|
return SubmitResult(
|
||||||
success=False, error=f"Login failed: {login_error}"
|
success=False, error=f"Login failed: {login_error}"
|
||||||
)
|
)
|
||||||
|
logged_in = True
|
||||||
|
|
||||||
print(json.dumps({"status": "submitting"}), flush=True)
|
print(json.dumps({"status": "submitting"}), flush=True)
|
||||||
session.fetch(
|
session.fetch(
|
||||||
|
|
@ -551,13 +555,13 @@ def _submit_headless(
|
||||||
|
|
||||||
try:
|
try:
|
||||||
browser_cookies = session.context.cookies()
|
browser_cookies = session.context.cookies()
|
||||||
if any(c.get("name") == "X-User-Handle" for c in browser_cookies):
|
if any(c.get("name") == "X-User-Sha1" for c in browser_cookies):
|
||||||
cookie_cache.write_text(json.dumps(browser_cookies))
|
save_platform_cookies("codeforces", browser_cookies)
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
if needs_relogin and not _retried:
|
if needs_relogin and not _retried:
|
||||||
cookie_cache.unlink(missing_ok=True)
|
clear_platform_cookies("codeforces")
|
||||||
return _submit_headless(
|
return _submit_headless(
|
||||||
contest_id,
|
contest_id,
|
||||||
problem_id,
|
problem_id,
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,13 @@ from pathlib import Path
|
||||||
|
|
||||||
import httpx
|
import httpx
|
||||||
|
|
||||||
from .base import BaseScraper, extract_precision
|
from .base import (
|
||||||
|
BaseScraper,
|
||||||
|
clear_platform_cookies,
|
||||||
|
extract_precision,
|
||||||
|
load_platform_cookies,
|
||||||
|
save_platform_cookies,
|
||||||
|
)
|
||||||
from .timeouts import HTTP_TIMEOUT
|
from .timeouts import HTTP_TIMEOUT
|
||||||
from .models import (
|
from .models import (
|
||||||
ContestListResult,
|
ContestListResult,
|
||||||
|
|
@ -28,8 +34,6 @@ HEADERS = {
|
||||||
}
|
}
|
||||||
CONNECTIONS = 8
|
CONNECTIONS = 8
|
||||||
|
|
||||||
_COOKIE_PATH = Path.home() / ".cache" / "cp-nvim" / "kattis-cookies.json"
|
|
||||||
|
|
||||||
TIME_RE = re.compile(
|
TIME_RE = re.compile(
|
||||||
r"CPU Time limit</span>\s*<span[^>]*>\s*(\d+)\s*seconds?\s*</span>",
|
r"CPU Time limit</span>\s*<span[^>]*>\s*(\d+)\s*seconds?\s*</span>",
|
||||||
re.DOTALL,
|
re.DOTALL,
|
||||||
|
|
@ -209,20 +213,24 @@ async def _stream_single_problem(client: httpx.AsyncClient, slug: str) -> None:
|
||||||
|
|
||||||
|
|
||||||
async def _load_kattis_cookies(client: httpx.AsyncClient) -> None:
|
async def _load_kattis_cookies(client: httpx.AsyncClient) -> None:
|
||||||
if not _COOKIE_PATH.exists():
|
data = load_platform_cookies("kattis")
|
||||||
return
|
if isinstance(data, dict):
|
||||||
try:
|
for k, v in data.items():
|
||||||
for k, v in json.loads(_COOKIE_PATH.read_text()).items():
|
|
||||||
client.cookies.set(k, v)
|
client.cookies.set(k, v)
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
async def _save_kattis_cookies(client: httpx.AsyncClient) -> None:
|
async def _save_kattis_cookies(client: httpx.AsyncClient) -> None:
|
||||||
cookies = {k: v for k, v in client.cookies.items()}
|
cookies = dict(client.cookies.items())
|
||||||
if cookies:
|
if cookies:
|
||||||
_COOKIE_PATH.parent.mkdir(parents=True, exist_ok=True)
|
save_platform_cookies("kattis", cookies)
|
||||||
_COOKIE_PATH.write_text(json.dumps(cookies))
|
|
||||||
|
|
||||||
|
async def _check_kattis_login(client: httpx.AsyncClient) -> bool:
|
||||||
|
try:
|
||||||
|
r = await client.get(BASE_URL, headers=HEADERS, timeout=HTTP_TIMEOUT)
|
||||||
|
return bool(r.headers.get("x-username"))
|
||||||
|
except Exception:
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
async def _do_kattis_login(
|
async def _do_kattis_login(
|
||||||
|
|
@ -329,9 +337,10 @@ class KattisScraper(BaseScraper):
|
||||||
return self._submit_error("Missing credentials. Use :CP kattis login")
|
return self._submit_error("Missing credentials. Use :CP kattis login")
|
||||||
|
|
||||||
async with httpx.AsyncClient(follow_redirects=True) as client:
|
async with httpx.AsyncClient(follow_redirects=True) as client:
|
||||||
print(json.dumps({"status": "checking_login"}), flush=True)
|
|
||||||
await _load_kattis_cookies(client)
|
await _load_kattis_cookies(client)
|
||||||
if not client.cookies:
|
if client.cookies:
|
||||||
|
print(json.dumps({"status": "checking_login"}), flush=True)
|
||||||
|
else:
|
||||||
print(json.dumps({"status": "logging_in"}), flush=True)
|
print(json.dumps({"status": "logging_in"}), flush=True)
|
||||||
ok = await _do_kattis_login(client, username, password)
|
ok = await _do_kattis_login(client, username, password)
|
||||||
if not ok:
|
if not ok:
|
||||||
|
|
@ -368,7 +377,7 @@ class KattisScraper(BaseScraper):
|
||||||
return self._submit_error(f"Submit request failed: {e}")
|
return self._submit_error(f"Submit request failed: {e}")
|
||||||
|
|
||||||
if r.status_code in (400, 403) or r.text == "Request validation failed":
|
if r.status_code in (400, 403) or r.text == "Request validation failed":
|
||||||
_COOKIE_PATH.unlink(missing_ok=True)
|
clear_platform_cookies("kattis")
|
||||||
print(json.dumps({"status": "logging_in"}), flush=True)
|
print(json.dumps({"status": "logging_in"}), flush=True)
|
||||||
ok = await _do_kattis_login(client, username, password)
|
ok = await _do_kattis_login(client, username, password)
|
||||||
if not ok:
|
if not ok:
|
||||||
|
|
@ -399,6 +408,16 @@ class KattisScraper(BaseScraper):
|
||||||
return self._login_error("Missing username or password")
|
return self._login_error("Missing username or password")
|
||||||
|
|
||||||
async with httpx.AsyncClient(follow_redirects=True) as client:
|
async with httpx.AsyncClient(follow_redirects=True) as client:
|
||||||
|
await _load_kattis_cookies(client)
|
||||||
|
if client.cookies:
|
||||||
|
print(json.dumps({"status": "checking_login"}), flush=True)
|
||||||
|
if await _check_kattis_login(client):
|
||||||
|
return LoginResult(
|
||||||
|
success=True,
|
||||||
|
error="",
|
||||||
|
credentials={"username": username, "password": password},
|
||||||
|
)
|
||||||
|
|
||||||
print(json.dumps({"status": "logging_in"}), flush=True)
|
print(json.dumps({"status": "logging_in"}), flush=True)
|
||||||
ok = await _do_kattis_login(client, username, password)
|
ok = await _do_kattis_login(client, username, password)
|
||||||
if not ok:
|
if not ok:
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,12 @@ from typing import Any, cast
|
||||||
|
|
||||||
import httpx
|
import httpx
|
||||||
|
|
||||||
from .base import BaseScraper, extract_precision
|
from .base import (
|
||||||
|
BaseScraper,
|
||||||
|
extract_precision,
|
||||||
|
load_platform_cookies,
|
||||||
|
save_platform_cookies,
|
||||||
|
)
|
||||||
from .timeouts import HTTP_TIMEOUT
|
from .timeouts import HTTP_TIMEOUT
|
||||||
from .models import (
|
from .models import (
|
||||||
ContestListResult,
|
ContestListResult,
|
||||||
|
|
@ -27,7 +32,6 @@ HEADERS = {
|
||||||
}
|
}
|
||||||
CONNECTIONS = 4
|
CONNECTIONS = 4
|
||||||
|
|
||||||
_COOKIE_PATH = Path.home() / ".cache" / "cp-nvim" / "usaco-cookies.json"
|
|
||||||
_LOGIN_PATH = "/current/tpcm/login-session.php"
|
_LOGIN_PATH = "/current/tpcm/login-session.php"
|
||||||
_SUBMIT_PATH = "/current/tpcm/submit-solution.php"
|
_SUBMIT_PATH = "/current/tpcm/submit-solution.php"
|
||||||
|
|
||||||
|
|
@ -202,20 +206,16 @@ def _parse_submit_form(
|
||||||
|
|
||||||
|
|
||||||
async def _load_usaco_cookies(client: httpx.AsyncClient) -> None:
|
async def _load_usaco_cookies(client: httpx.AsyncClient) -> None:
|
||||||
if not _COOKIE_PATH.exists():
|
data = load_platform_cookies("usaco")
|
||||||
return
|
if isinstance(data, dict):
|
||||||
try:
|
for k, v in data.items():
|
||||||
for k, v in json.loads(_COOKIE_PATH.read_text()).items():
|
|
||||||
client.cookies.set(k, v)
|
client.cookies.set(k, v)
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
async def _save_usaco_cookies(client: httpx.AsyncClient) -> None:
|
async def _save_usaco_cookies(client: httpx.AsyncClient) -> None:
|
||||||
cookies = {k: v for k, v in client.cookies.items()}
|
cookies = dict(client.cookies.items())
|
||||||
if cookies:
|
if cookies:
|
||||||
_COOKIE_PATH.parent.mkdir(parents=True, exist_ok=True)
|
save_platform_cookies("usaco", cookies)
|
||||||
_COOKIE_PATH.write_text(json.dumps(cookies))
|
|
||||||
|
|
||||||
|
|
||||||
async def _check_usaco_login(client: httpx.AsyncClient, username: str) -> bool:
|
async def _check_usaco_login(client: httpx.AsyncClient, username: str) -> bool:
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue