fix(login): remove cookie fast-path from login subcommand
Problem: `:CP <platform> login` short-circuited on cached cookies/tokens. If an old session was still valid, the new credentials were never tested, so the user got "login successful" even with garbage input. Solution: Always validate credentials against the platform in the login path. Remove cookie/token loading from `_login_headless` (AtCoder), `_login_headless_cf` (CF), `_login_headless_codechef` (CodeChef), and `login` (CSES). For USACO submit, replace the `_check_usaco_login` roundtrip with cookie trust + retry-on-auth-failure (the Kattis pattern). Submit paths are unchanged — cookie fast-paths remain for contest speed. Closes #331
This commit is contained in:
parent
8465e70772
commit
84343d2045
5 changed files with 110 additions and 141 deletions
|
|
@ -306,12 +306,6 @@ def _login_headless(credentials: dict[str, str]) -> LoginResult:
|
||||||
|
|
||||||
cookie_cache = Path.home() / ".cache" / "cp-nvim" / "atcoder-cookies.json"
|
cookie_cache = Path.home() / ".cache" / "cp-nvim" / "atcoder-cookies.json"
|
||||||
cookie_cache.parent.mkdir(parents=True, exist_ok=True)
|
cookie_cache.parent.mkdir(parents=True, exist_ok=True)
|
||||||
saved_cookies: list[dict[str, Any]] = []
|
|
||||||
if cookie_cache.exists():
|
|
||||||
try:
|
|
||||||
saved_cookies = json.loads(cookie_cache.read_text())
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
logged_in = False
|
logged_in = False
|
||||||
login_error: str | None = None
|
login_error: str | None = None
|
||||||
|
|
@ -340,33 +334,25 @@ def _login_headless(credentials: dict[str, str]) -> LoginResult:
|
||||||
headless=True,
|
headless=True,
|
||||||
timeout=BROWSER_SESSION_TIMEOUT,
|
timeout=BROWSER_SESSION_TIMEOUT,
|
||||||
google_search=False,
|
google_search=False,
|
||||||
cookies=saved_cookies if saved_cookies else [],
|
|
||||||
) as session:
|
) as session:
|
||||||
if saved_cookies:
|
print(json.dumps({"status": "logging_in"}), flush=True)
|
||||||
print(json.dumps({"status": "checking_login"}), flush=True)
|
session.fetch(
|
||||||
session.fetch(
|
f"{BASE_URL}/login",
|
||||||
f"{BASE_URL}/home", page_action=check_login, network_idle=True
|
page_action=login_action,
|
||||||
|
solve_cloudflare=True,
|
||||||
|
)
|
||||||
|
if login_error:
|
||||||
|
return LoginResult(
|
||||||
|
success=False, error=f"Login failed: {login_error}"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
session.fetch(
|
||||||
|
f"{BASE_URL}/home", page_action=check_login, network_idle=True
|
||||||
|
)
|
||||||
if not logged_in:
|
if not logged_in:
|
||||||
print(json.dumps({"status": "logging_in"}), flush=True)
|
return LoginResult(
|
||||||
session.fetch(
|
success=False, error="Login failed (bad credentials?)"
|
||||||
f"{BASE_URL}/login",
|
|
||||||
page_action=login_action,
|
|
||||||
solve_cloudflare=True,
|
|
||||||
)
|
)
|
||||||
if login_error:
|
|
||||||
return LoginResult(
|
|
||||||
success=False, error=f"Login failed: {login_error}"
|
|
||||||
)
|
|
||||||
|
|
||||||
session.fetch(
|
|
||||||
f"{BASE_URL}/home", page_action=check_login, network_idle=True
|
|
||||||
)
|
|
||||||
if not logged_in:
|
|
||||||
return LoginResult(
|
|
||||||
success=False, error="Login failed (bad credentials?)"
|
|
||||||
)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
browser_cookies = session.context.cookies()
|
browser_cookies = session.context.cookies()
|
||||||
|
|
|
||||||
|
|
@ -65,12 +65,6 @@ def _login_headless_codechef(credentials: dict[str, str]) -> LoginResult:
|
||||||
_ensure_browser()
|
_ensure_browser()
|
||||||
|
|
||||||
_COOKIE_PATH.parent.mkdir(parents=True, exist_ok=True)
|
_COOKIE_PATH.parent.mkdir(parents=True, exist_ok=True)
|
||||||
saved_cookies: list[dict[str, Any]] = []
|
|
||||||
if _COOKIE_PATH.exists():
|
|
||||||
try:
|
|
||||||
saved_cookies = json.loads(_COOKIE_PATH.read_text())
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
logged_in = False
|
logged_in = False
|
||||||
login_error: str | None = None
|
login_error: str | None = None
|
||||||
|
|
@ -100,29 +94,21 @@ def _login_headless_codechef(credentials: dict[str, str]) -> LoginResult:
|
||||||
headless=True,
|
headless=True,
|
||||||
timeout=BROWSER_SESSION_TIMEOUT,
|
timeout=BROWSER_SESSION_TIMEOUT,
|
||||||
google_search=False,
|
google_search=False,
|
||||||
cookies=saved_cookies if saved_cookies else [],
|
|
||||||
) as session:
|
) as session:
|
||||||
if saved_cookies:
|
print(json.dumps({"status": "logging_in"}), flush=True)
|
||||||
print(json.dumps({"status": "checking_login"}), flush=True)
|
session.fetch(f"{BASE_URL}/login", page_action=login_action)
|
||||||
session.fetch(
|
if login_error:
|
||||||
f"{BASE_URL}/", page_action=check_login, network_idle=True
|
return LoginResult(
|
||||||
|
success=False, error=f"Login failed: {login_error}"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
session.fetch(
|
||||||
|
f"{BASE_URL}/", page_action=check_login, network_idle=True
|
||||||
|
)
|
||||||
if not logged_in:
|
if not logged_in:
|
||||||
print(json.dumps({"status": "logging_in"}), flush=True)
|
return LoginResult(
|
||||||
session.fetch(f"{BASE_URL}/login", page_action=login_action)
|
success=False, error="Login failed (bad credentials?)"
|
||||||
if login_error:
|
|
||||||
return LoginResult(
|
|
||||||
success=False, error=f"Login failed: {login_error}"
|
|
||||||
)
|
|
||||||
|
|
||||||
session.fetch(
|
|
||||||
f"{BASE_URL}/", page_action=check_login, network_idle=True
|
|
||||||
)
|
)
|
||||||
if not logged_in:
|
|
||||||
return LoginResult(
|
|
||||||
success=False, error="Login failed (bad credentials?)"
|
|
||||||
)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
browser_cookies = session.context.cookies()
|
browser_cookies = session.context.cookies()
|
||||||
|
|
|
||||||
|
|
@ -348,12 +348,6 @@ def _login_headless_cf(credentials: dict[str, str]) -> LoginResult:
|
||||||
|
|
||||||
cookie_cache = Path.home() / ".cache" / "cp-nvim" / "codeforces-cookies.json"
|
cookie_cache = Path.home() / ".cache" / "cp-nvim" / "codeforces-cookies.json"
|
||||||
cookie_cache.parent.mkdir(parents=True, exist_ok=True)
|
cookie_cache.parent.mkdir(parents=True, exist_ok=True)
|
||||||
saved_cookies: list[dict[str, Any]] = []
|
|
||||||
if cookie_cache.exists():
|
|
||||||
try:
|
|
||||||
saved_cookies = json.loads(cookie_cache.read_text())
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
logged_in = False
|
logged_in = False
|
||||||
login_error: str | None = None
|
login_error: str | None = None
|
||||||
|
|
@ -388,37 +382,27 @@ def _login_headless_cf(credentials: dict[str, str]) -> LoginResult:
|
||||||
headless=True,
|
headless=True,
|
||||||
timeout=BROWSER_SESSION_TIMEOUT,
|
timeout=BROWSER_SESSION_TIMEOUT,
|
||||||
google_search=False,
|
google_search=False,
|
||||||
cookies=saved_cookies if saved_cookies else [],
|
|
||||||
) as session:
|
) as session:
|
||||||
if saved_cookies:
|
print(json.dumps({"status": "logging_in"}), flush=True)
|
||||||
print(json.dumps({"status": "checking_login"}), flush=True)
|
session.fetch(
|
||||||
session.fetch(
|
f"{BASE_URL}/enter",
|
||||||
f"{BASE_URL}/",
|
page_action=login_action,
|
||||||
page_action=check_login,
|
solve_cloudflare=True,
|
||||||
network_idle=True,
|
)
|
||||||
|
if login_error:
|
||||||
|
return LoginResult(
|
||||||
|
success=False, error=f"Login failed: {login_error}"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
session.fetch(
|
||||||
|
f"{BASE_URL}/",
|
||||||
|
page_action=check_login,
|
||||||
|
network_idle=True,
|
||||||
|
)
|
||||||
if not logged_in:
|
if not logged_in:
|
||||||
print(json.dumps({"status": "logging_in"}), flush=True)
|
return LoginResult(
|
||||||
session.fetch(
|
success=False, error="Login failed (bad credentials?)"
|
||||||
f"{BASE_URL}/enter",
|
|
||||||
page_action=login_action,
|
|
||||||
solve_cloudflare=True,
|
|
||||||
)
|
)
|
||||||
if login_error:
|
|
||||||
return LoginResult(
|
|
||||||
success=False, error=f"Login failed: {login_error}"
|
|
||||||
)
|
|
||||||
|
|
||||||
session.fetch(
|
|
||||||
f"{BASE_URL}/",
|
|
||||||
page_action=check_login,
|
|
||||||
network_idle=True,
|
|
||||||
)
|
|
||||||
if not logged_in:
|
|
||||||
return LoginResult(
|
|
||||||
success=False, error="Login failed (bad credentials?)"
|
|
||||||
)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
browser_cookies = session.context.cookies()
|
browser_cookies = session.context.cookies()
|
||||||
|
|
|
||||||
|
|
@ -239,21 +239,6 @@ class CSESScraper(BaseScraper):
|
||||||
return self._login_error("Missing username or password")
|
return self._login_error("Missing username or password")
|
||||||
|
|
||||||
async with httpx.AsyncClient(follow_redirects=True) as client:
|
async with httpx.AsyncClient(follow_redirects=True) as client:
|
||||||
token = credentials.get("token")
|
|
||||||
|
|
||||||
if token:
|
|
||||||
print(json.dumps({"status": "checking_login"}), flush=True)
|
|
||||||
if await self._check_token(client, token):
|
|
||||||
return LoginResult(
|
|
||||||
success=True,
|
|
||||||
error="",
|
|
||||||
credentials={
|
|
||||||
"username": username,
|
|
||||||
"password": password,
|
|
||||||
"token": token,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
print(json.dumps({"status": "logging_in"}), flush=True)
|
print(json.dumps({"status": "logging_in"}), flush=True)
|
||||||
token = await self._web_login(client, username, password)
|
token = await self._web_login(client, username, password)
|
||||||
if not token:
|
if not token:
|
||||||
|
|
|
||||||
|
|
@ -423,11 +423,7 @@ class USACOScraper(BaseScraper):
|
||||||
|
|
||||||
async with httpx.AsyncClient(follow_redirects=True) as client:
|
async with httpx.AsyncClient(follow_redirects=True) as client:
|
||||||
await _load_usaco_cookies(client)
|
await _load_usaco_cookies(client)
|
||||||
print(json.dumps({"status": "checking_login"}), flush=True)
|
if not client.cookies:
|
||||||
logged_in = bool(client.cookies) and await _check_usaco_login(
|
|
||||||
client, username
|
|
||||||
)
|
|
||||||
if not logged_in:
|
|
||||||
print(json.dumps({"status": "logging_in"}), flush=True)
|
print(json.dumps({"status": "logging_in"}), flush=True)
|
||||||
try:
|
try:
|
||||||
ok = await _do_usaco_login(client, username, password)
|
ok = await _do_usaco_login(client, username, password)
|
||||||
|
|
@ -437,45 +433,77 @@ class USACOScraper(BaseScraper):
|
||||||
return self._submit_error("Login failed (bad credentials?)")
|
return self._submit_error("Login failed (bad credentials?)")
|
||||||
await _save_usaco_cookies(client)
|
await _save_usaco_cookies(client)
|
||||||
|
|
||||||
print(json.dumps({"status": "submitting"}), flush=True)
|
result = await self._do_submit(
|
||||||
try:
|
client, problem_id, language_id, source
|
||||||
page_r = await client.get(
|
|
||||||
f"{_AUTH_BASE}/index.php?page=viewproblem2&cpid={problem_id}",
|
|
||||||
headers=HEADERS,
|
|
||||||
timeout=HTTP_TIMEOUT,
|
|
||||||
)
|
|
||||||
form_url, hidden_fields, lang_val = _parse_submit_form(
|
|
||||||
page_r.text, language_id
|
|
||||||
)
|
|
||||||
except Exception:
|
|
||||||
form_url = _AUTH_BASE + _SUBMIT_PATH
|
|
||||||
hidden_fields = {}
|
|
||||||
lang_val = None
|
|
||||||
|
|
||||||
data: dict[str, str] = {"cpid": problem_id, **hidden_fields}
|
|
||||||
data["language"] = lang_val if lang_val is not None else language_id
|
|
||||||
ext = "py" if "python" in language_id.lower() else "cpp"
|
|
||||||
try:
|
|
||||||
r = await client.post(
|
|
||||||
form_url,
|
|
||||||
data=data,
|
|
||||||
files={"sourcefile": (f"solution.{ext}", source, "text/plain")},
|
|
||||||
headers=HEADERS,
|
|
||||||
timeout=HTTP_TIMEOUT,
|
|
||||||
)
|
|
||||||
r.raise_for_status()
|
|
||||||
except Exception as e:
|
|
||||||
return self._submit_error(f"Submit request failed: {e}")
|
|
||||||
|
|
||||||
try:
|
|
||||||
resp = r.json()
|
|
||||||
sid = str(resp.get("submission_id", resp.get("id", "")))
|
|
||||||
except Exception:
|
|
||||||
sid = ""
|
|
||||||
return SubmitResult(
|
|
||||||
success=True, error="", submission_id=sid, verdict="submitted"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if result.success or result.error != "auth_failure":
|
||||||
|
return result
|
||||||
|
|
||||||
|
client.cookies.clear()
|
||||||
|
print(json.dumps({"status": "logging_in"}), flush=True)
|
||||||
|
try:
|
||||||
|
ok = await _do_usaco_login(client, username, password)
|
||||||
|
except Exception as e:
|
||||||
|
return self._submit_error(f"Login failed: {e}")
|
||||||
|
if not ok:
|
||||||
|
return self._submit_error("Login failed (bad credentials?)")
|
||||||
|
await _save_usaco_cookies(client)
|
||||||
|
|
||||||
|
return await self._do_submit(
|
||||||
|
client, problem_id, language_id, source
|
||||||
|
)
|
||||||
|
|
||||||
|
async def _do_submit(
|
||||||
|
self,
|
||||||
|
client: httpx.AsyncClient,
|
||||||
|
problem_id: str,
|
||||||
|
language_id: str,
|
||||||
|
source: bytes,
|
||||||
|
) -> SubmitResult:
|
||||||
|
print(json.dumps({"status": "submitting"}), flush=True)
|
||||||
|
try:
|
||||||
|
page_r = await client.get(
|
||||||
|
f"{_AUTH_BASE}/index.php?page=viewproblem2&cpid={problem_id}",
|
||||||
|
headers=HEADERS,
|
||||||
|
timeout=HTTP_TIMEOUT,
|
||||||
|
)
|
||||||
|
if "login" in page_r.url.path.lower() or "Login" in page_r.text[:2000]:
|
||||||
|
return self._submit_error("auth_failure")
|
||||||
|
form_url, hidden_fields, lang_val = _parse_submit_form(
|
||||||
|
page_r.text, language_id
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
form_url = _AUTH_BASE + _SUBMIT_PATH
|
||||||
|
hidden_fields = {}
|
||||||
|
lang_val = None
|
||||||
|
|
||||||
|
data: dict[str, str] = {"cpid": problem_id, **hidden_fields}
|
||||||
|
data["language"] = lang_val if lang_val is not None else language_id
|
||||||
|
ext = "py" if "python" in language_id.lower() else "cpp"
|
||||||
|
try:
|
||||||
|
r = await client.post(
|
||||||
|
form_url,
|
||||||
|
data=data,
|
||||||
|
files={"sourcefile": (f"solution.{ext}", source, "text/plain")},
|
||||||
|
headers=HEADERS,
|
||||||
|
timeout=HTTP_TIMEOUT,
|
||||||
|
)
|
||||||
|
r.raise_for_status()
|
||||||
|
except Exception as e:
|
||||||
|
return self._submit_error(f"Submit request failed: {e}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
resp = r.json()
|
||||||
|
if resp.get("code") == 0 and "login" in resp.get("message", "").lower():
|
||||||
|
return self._submit_error("auth_failure")
|
||||||
|
sid = str(resp.get("submission_id", resp.get("id", "")))
|
||||||
|
except Exception:
|
||||||
|
sid = ""
|
||||||
|
return SubmitResult(
|
||||||
|
success=True, error="", submission_id=sid, verdict="submitted"
|
||||||
|
)
|
||||||
|
|
||||||
async def login(self, credentials: dict[str, str]) -> LoginResult:
|
async def login(self, credentials: dict[str, str]) -> LoginResult:
|
||||||
username = credentials.get("username", "")
|
username = credentials.get("username", "")
|
||||||
password = credentials.get("password", "")
|
password = credentials.get("password", "")
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue