cp.nvim/scrapers/cses.py
2025-09-24 21:35:57 -04:00

475 lines
17 KiB
Python

#!/usr/bin/env python3
import json
import re
import sys
from dataclasses import asdict
import backoff
import requests
from bs4 import BeautifulSoup, Tag
from .base import BaseScraper
from .models import (
ContestListResult,
ContestSummary,
MetadataResult,
ProblemSummary,
TestCase,
TestsResult,
)
def normalize_category_name(category_name: str) -> str:
    """Derive a category id from a human-readable category heading.

    The heading is lowercased, every space becomes an underscore and every
    ampersand is spelled out as ``and``.
    """
    lowered = category_name.lower()
    underscored = lowered.replace(" ", "_")
    return underscored.replace("&", "and")
def snake_to_title(name: str) -> str:
    """Convert a snake_case identifier into a title-cased phrase.

    The identifier is split on underscores and re-joined with single spaces.
    Each word is lowercased and then capitalized, except that short
    connecting words (articles, conjunctions, prepositions) stay lowercase
    unless they are the first or the last word.
    """
    minor_words = frozenset(
        (
            "a",
            "an",
            "the",
            "and",
            "but",
            "or",
            "nor",
            "for",
            "so",
            "yet",
            "at",
            "by",
            "in",
            "of",
            "on",
            "per",
            "to",
            "vs",
            "via",
        )
    )
    parts: list[str] = name.split("_")
    last_index = len(parts) - 1

    titled: list[str] = []
    for index, part in enumerate(parts):
        lowered = part.lower()
        # Only interior minor words are left in lowercase.
        keep_lower = lowered in minor_words and index not in (0, last_index)
        titled.append(lowered if keep_lower else lowered.capitalize())
    return " ".join(titled)
def _log_request_retry(details) -> None:
    """Report on stderr that a failed request is about to be retried."""
    print(
        f"Request failed (attempt {details['tries']}), retrying in {details['wait']:.1f}s: {details['exception']}",
        file=sys.stderr,
    )


def _log_rate_limit_retry(details) -> None:
    """Report on stderr that a rate-limited request is about to be retried."""
    print(f"Rate limited, retrying in {details['wait']:.1f}s", file=sys.stderr)


def _is_rate_limited(response) -> bool:
    """Return True when the response carries HTTP status 429."""
    return response.status_code == 429


@backoff.on_exception(
    backoff.expo,
    (requests.exceptions.RequestException, requests.exceptions.HTTPError),
    max_tries=4,
    jitter=backoff.random_jitter,
    on_backoff=_log_request_retry,
)
@backoff.on_predicate(
    backoff.expo,
    _is_rate_limited,
    max_tries=4,
    jitter=backoff.random_jitter,
    on_backoff=_log_rate_limit_retry,
)
def make_request(url: str, headers: dict) -> requests.Response:
    """GET ``url`` with the given headers and a 10 second timeout.

    The call is wrapped in two ``backoff`` decorators configured with
    exponential waits, random jitter and ``max_tries=4``: one keyed on
    ``requests`` exceptions, one keyed on a returned 429 status.

    NOTE(review): ``raise_for_status()`` presumably raises for a 429 before
    the response is ever returned, in which case the 429 predicate never
    fires and rate limiting is retried through the exception path instead.
    Confirm against the requests/backoff documentation.

    Returns:
        The response, after ``raise_for_status()`` has accepted it.
    """
    reply = requests.get(url, headers=headers, timeout=10)
    reply.raise_for_status()
    return reply
def scrape_category_problems(category_id: str) -> list[ProblemSummary]:
    """Collect the problems listed under one category of the CSES problem set.

    The problem set page is walked in document order over its ``h1``/``h2``
    headings and ``ul`` lists. Once the heading matching ``category_id`` has
    been seen, task links from the following lists are collected until a
    different heading appears.

    Headings are matched by comparing ``normalize_category_name(heading)``
    with the normalized ``category_id``. This is the same transformation
    ``scrape_categories`` uses to build ids, so any id it produced maps back
    to its heading, including headings that contain ``&``.

    Args:
        category_id: Category identifier, e.g. ``"sorting_and_searching"``.

    Returns:
        The problems found, in page order. An empty list is returned when the
        category is not found or when any error occurs (the error is printed
        to stderr).
    """
    wanted = normalize_category_name(category_id)
    try:
        problemset_url = "https://cses.fi/problemset/"
        headers = {
            "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
        }
        response = make_request(problemset_url, headers)
        soup = BeautifulSoup(response.text, "html.parser")

        current_category = None
        problems: list[ProblemSummary] = []
        target_found = False

        for element in soup.find_all(["h1", "h2", "ul"]):
            if not isinstance(element, Tag):
                continue

            if element.name in ("h1", "h2"):
                text = element.get_text(strip=True)
                # Skip empty headings and the site/page titles ("CSES ...").
                if not text or text.startswith("CSES"):
                    continue
                # The first different heading after the target ends its section.
                if target_found and current_category != text:
                    break
                current_category = text
                if normalize_category_name(text) == wanted:
                    target_found = True
            elif element.name == "ul" and current_category and target_found:
                problem_links = element.find_all(
                    "a", href=lambda x: x and "/problemset/task/" in x
                )
                for link in problem_links:
                    href = link.get("href", "")
                    if not href:
                        continue
                    # Tolerate a trailing slash so the id is still the last segment.
                    problem_id = href.rstrip("/").split("/")[-1]
                    problem_name = link.get_text(strip=True)
                    if not problem_id.isdigit() or not problem_name:
                        continue
                    problems.append(ProblemSummary(id=problem_id, name=problem_name))

        return problems
    except Exception as e:
        # Best-effort scraper: report the failure and signal "nothing found".
        print(f"Failed to scrape CSES category {category_id}: {e}", file=sys.stderr)
        return []
def parse_problem_url(problem_input: str) -> str | None:
if problem_input.startswith("https://cses.fi/problemset/task/"):
return problem_input.rstrip("/")
elif problem_input.isdigit():
return f"https://cses.fi/problemset/task/{problem_input}"
return None
def extract_problem_limits(soup: BeautifulSoup) -> tuple[int, float]:
    """Read the time and memory limits from a parsed CSES task page.

    The limits are taken from the ``<li>`` items of the
    ``<ul class="task-constraints">`` element, using the ``Time limit: N s``
    and ``Memory limit: N MB`` patterns.

    Args:
        soup: Parsed task page.

    Returns:
        ``(timeout_ms, memory_mb)``: the time limit in milliseconds as an int
        and the memory limit in megabytes as a float.

    Raises:
        ValueError: If the constraints list, the time limit or the memory
            limit cannot be found.
    """
    timeout_ms = None
    memory_mb = None

    constraints_ul = soup.find("ul", class_="task-constraints")
    if not constraints_ul or not isinstance(constraints_ul, Tag):
        raise ValueError("Could not find task-constraints section")

    for li in constraints_ul.find_all("li"):
        text = li.get_text()

        if "Time limit:" in text:
            match = re.search(r"Time limit:\s*(\d+(?:\.\d+)?)\s*s", text)
            if match:
                seconds = float(match.group(1))
                # Round instead of truncating: the float product can land a
                # hair below the intended integer, and int() would then drop
                # a whole millisecond.
                timeout_ms = round(seconds * 1000)

        if "Memory limit:" in text:
            match = re.search(r"Memory limit:\s*(\d+)\s*MB", text)
            if match:
                memory_mb = float(match.group(1))

    if timeout_ms is None:
        raise ValueError("Could not find valid timeout in task-constraints section")
    if memory_mb is None:
        raise ValueError(
            "Could not find valid memory limit in task-constraints section"
        )

    return timeout_ms, memory_mb
def scrape_categories() -> list[ContestSummary]:
    """List the categories shown on the CSES problem set page.

    Every ``h2`` heading except ``"General"`` becomes one ``ContestSummary``
    whose id is the normalized heading and whose name and display name are
    the heading text itself.

    Returns:
        The categories in page order, or an empty list if anything fails
        (the error is printed to stderr).
    """
    try:
        headers = {
            "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
        }
        page = make_request("https://cses.fi/problemset/", headers)
        document = BeautifulSoup(page.text, "html.parser")

        titles = (heading.get_text().strip() for heading in document.find_all("h2"))
        return [
            ContestSummary(
                id=normalize_category_name(title), name=title, display_name=title
            )
            for title in titles
            if title != "General"
        ]
    except Exception as e:
        print(f"Failed to scrape CSES categories: {e}", file=sys.stderr)
        return []
def process_problem_element(
    element,
    current_category: str | None,
    all_categories: dict[str, list[ProblemSummary]],
) -> str | None:
    """Fold one page element into the category-to-problems mapping.

    An ``h1`` element starts a category: its text is registered as a key of
    ``all_categories`` (if not already present) and returned. An ``a``
    element whose href contains ``/problemset/task/`` is appended to the
    current category when its last path segment is numeric, it has link text
    and a current category is set. Every other element is ignored.

    Args:
        element: Element exposing ``name``, ``get`` and ``get_text``.
        current_category: Category the walk is currently inside, if any.
        all_categories: Mapping that is updated in place.

    Returns:
        The category that is current after processing this element.
    """
    if element.name == "h1":
        heading = element.get_text().strip()
        all_categories.setdefault(heading, [])
        return heading

    if element.name != "a":
        return current_category

    href = element.get("href", "")
    if "/problemset/task/" not in href:
        return current_category

    problem_id = href.split("/")[-1]
    problem_name = element.get_text(strip=True)
    if problem_id.isdigit() and problem_name and current_category:
        all_categories[current_category].append(
            ProblemSummary(id=problem_id, name=problem_name)
        )
    return current_category
def scrape_all_problems() -> dict[str, list[ProblemSummary]]:
    """Scrape every category and its problems from the CSES problem set page.

    The page is walked in document order over ``h1``/``h2`` headings and
    ``ul`` lists. Each heading that is non-empty and does not start with
    ``"CSES"`` opens a category; task links inside the following lists are
    added to the most recently opened category.

    The page is fetched through ``make_request`` so this function gets the
    same retry/backoff handling as the other scrapers in this module.

    Returns:
        Mapping of category heading to its problems. An empty dict is
        returned if anything fails (the error is printed to stderr).
    """
    try:
        problemset_url = "https://cses.fi/problemset/"
        headers = {
            "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
        }
        response = make_request(problemset_url, headers)
        soup = BeautifulSoup(response.text, "html.parser")

        all_categories: dict[str, list[ProblemSummary]] = {}
        current_category = None

        for element in soup.find_all(["h1", "h2", "ul"]):
            if not isinstance(element, Tag):
                continue

            if element.name in ("h1", "h2"):
                text = element.get_text(strip=True)
                # Headings starting with "CSES" are site/page titles, not categories.
                if text and not text.startswith("CSES"):
                    current_category = text
                    if current_category not in all_categories:
                        all_categories[current_category] = []
                        print(f"Found category: {current_category}", file=sys.stderr)
            elif element.name == "ul" and current_category:
                problem_links = element.find_all(
                    "a", href=lambda x: x and "/problemset/task/" in x
                )
                for link in problem_links:
                    href = link.get("href", "")
                    if not href:
                        continue
                    problem_id = href.split("/")[-1]
                    problem_name = link.get_text(strip=True)
                    if problem_id.isdigit() and problem_name:
                        problem = ProblemSummary(id=problem_id, name=problem_name)
                        all_categories[current_category].append(problem)

        print(
            f"Found {len(all_categories)} categories with {sum(len(probs) for probs in all_categories.values())} problems",
            file=sys.stderr,
        )
        return all_categories
    except Exception as e:
        # Best-effort scraper: report the failure and signal "nothing found".
        print(f"Failed to scrape CSES problems: {e}", file=sys.stderr)
        return {}
def _collect_section_after(header: Tag) -> list[Tag]:
    """Gather the sibling tags that follow ``header`` up to the next heading.

    Siblings are visited via ``find_next_sibling()``; the walk stops at the
    first ``h1``/``h2``/``h3`` tag or when there is no further sibling.
    Only ``Tag`` instances are kept.
    """
    collected: list[Tag] = []
    node = header.find_next_sibling()
    while node:
        is_tag = isinstance(node, Tag)
        if is_tag and node.name in ("h1", "h2", "h3"):
            break
        if is_tag:
            collected.append(node)
        node = node.find_next_sibling()
    return collected
def _labeled_pre_text(section: list[Tag], label: str) -> str | None:
    """Return the stripped text of the ``<pre>`` that follows a label node.

    A label node is a ``p``/``h4``/``h5``/``h6`` tag in ``section`` whose
    lowercased text, with trailing colons removed, equals ``label``. The
    first such node that has a following ``<pre>`` sibling wins.
    """
    for node in section:
        if not isinstance(node, Tag) or node.name not in ("p", "h4", "h5", "h6"):
            continue
        if node.get_text(strip=True).lower().rstrip(":") != label:
            continue
        pre = node.find_next_sibling("pre")
        if pre:
            return pre.get_text().strip()
    return None


def extract_example_test_cases(soup: BeautifulSoup) -> list[tuple[str, str]]:
    """Extract ``(input, output)`` example pairs from a parsed task page.

    Every ``h1``/``h2``/``h3`` heading whose text starts with "example"
    (case-insensitive) opens an example section. Within a section the input
    and output are looked up by their "input"/"output" labels; whichever is
    still missing falls back to the first/second ``<pre>`` of the section
    when at least two exist. Sections lacking either value are skipped.
    """

    def is_example_heading(tag):
        return (
            isinstance(tag, Tag)
            and tag.name in ("h1", "h2", "h3")
            and tag.get_text(strip=True).lower().startswith("example")
        )

    cases: list[tuple[str, str]] = []
    for heading in soup.find_all(is_example_heading):
        section = _collect_section_after(heading)
        inp = _labeled_pre_text(section, "input")
        out = _labeled_pre_text(section, "output")

        if not (inp and out):
            # Positional fallback: first <pre> is input, second is output.
            pres = [n for n in section if isinstance(n, Tag) and n.name == "pre"]
            if len(pres) >= 2:
                inp = inp or pres[0].get_text().strip()
                out = out or pres[1].get_text().strip()

        if inp and out:
            cases.append((inp, out))
    return cases
def scrape(url: str) -> list[TestCase]:
    """Download a task page and return its example test cases.

    Args:
        url: URL of the task page.

    Returns:
        One ``TestCase`` per extracted example pair, or an empty list if
        anything fails (the error is printed to stderr).
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    }
    try:
        page = make_request(url, headers)
        document = BeautifulSoup(page.text, "html.parser")
        return [
            TestCase(input=given, expected=wanted)
            for given, wanted in extract_example_test_cases(document)
        ]
    except Exception as e:
        print(f"Error scraping CSES: {e}", file=sys.stderr)
        return []
class CSESScraper(BaseScraper):
    """Scraper for the CSES problem set.

    CSES categories play the role of contests: the contest list is the list
    of categories and a contest's metadata is the list of problems in one
    category. Each public method runs its implementation through
    ``_safe_execute`` so that exceptions become failed result objects.
    """

    @property
    def platform_name(self) -> str:
        """Identifier of this platform, used as the prefix of error messages."""
        return "cses"

    def scrape_contest_metadata(self, contest_id: str) -> MetadataResult:
        """Return the problems of the category identified by ``contest_id``."""
        return self._safe_execute("metadata", self._scrape_metadata_impl, contest_id)

    def scrape_problem_tests(self, contest_id: str, problem_id: str) -> TestsResult:
        """Return the example tests and limits of one problem."""
        return self._safe_execute(
            "tests", self._scrape_tests_impl, contest_id, problem_id
        )

    def scrape_contest_list(self) -> ContestListResult:
        """Return all categories of the problem set."""
        return self._safe_execute("contests", self._scrape_contests_impl)

    def _safe_execute(self, operation: str, func, *args):
        """Call ``func(*args)`` and convert any exception into a failed result.

        Args:
            operation: ``"metadata"``, ``"tests"`` or ``"contests"``; selects
                which result type is built on failure. Any other value falls
                through and yields ``None`` on failure.
            func: Implementation to run.
            *args: Positional arguments forwarded to ``func``.
        """
        try:
            return func(*args)
        except Exception as e:
            error_msg = f"{self.platform_name}: {str(e)}"
            if operation == "metadata":
                return MetadataResult(success=False, error=error_msg)
            elif operation == "tests":
                return TestsResult(
                    success=False,
                    error=error_msg,
                    problem_id="",
                    url="",
                    tests=[],
                    timeout_ms=0,
                    memory_mb=0,
                )
            elif operation == "contests":
                return ContestListResult(success=False, error=error_msg)

    def _scrape_metadata_impl(self, category_id: str) -> MetadataResult:
        """Build the metadata result for one category; fails when it is empty."""
        problems = scrape_category_problems(category_id)
        if not problems:
            return MetadataResult(
                success=False,
                error=f"{self.platform_name}: No problems found for category: {category_id}",
            )
        return MetadataResult(
            success=True, error="", contest_id=category_id, problems=problems
        )

    def _scrape_tests_impl(self, category: str, problem_id: str) -> TestsResult:
        """Fetch one task page and extract its example tests and limits.

        The page is downloaded once through ``make_request`` (so it gets the
        shared retry/backoff handling) and both the examples and the limits
        are parsed from that single document.

        Args:
            category: Category of the problem; not used by the lookup.
            problem_id: Numeric problem id or full task URL.

        Returns:
            A successful result with tests and limits, or a failed result
            when the input is invalid or no examples were found.

        Raises:
            Exception: Network errors and ``ValueError`` from limit parsing
                propagate to ``_safe_execute``.
        """
        url = parse_problem_url(problem_id)
        if not url:
            return TestsResult(
                success=False,
                error=f"{self.platform_name}: Invalid problem input: {problem_id}. Use either problem ID (e.g., 1068) or full URL",
                problem_id=problem_id if problem_id.isdigit() else "",
                url="",
                tests=[],
                timeout_ms=0,
                memory_mb=0,
            )

        # For URL input the id is recovered from the URL path.
        m = re.search(r"/task/(\d+)", url)
        actual_problem_id = (
            problem_id if problem_id.isdigit() else (m.group(1) if m else "")
        )

        headers = {
            "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
        }
        response = make_request(url, headers)
        soup = BeautifulSoup(response.text, "html.parser")

        tests = [
            TestCase(input=inp, expected=out)
            for inp, out in extract_example_test_cases(soup)
        ]
        timeout_ms, memory_mb = extract_problem_limits(soup)

        if not tests:
            return TestsResult(
                success=False,
                error=f"{self.platform_name}: No tests found for {problem_id}",
                problem_id=actual_problem_id,
                url=url,
                tests=[],
                timeout_ms=timeout_ms,
                memory_mb=memory_mb,
            )
        return TestsResult(
            success=True,
            error="",
            problem_id=actual_problem_id,
            url=url,
            tests=tests,
            timeout_ms=timeout_ms,
            memory_mb=memory_mb,
        )

    def _scrape_contests_impl(self) -> ContestListResult:
        """Build the contest-list result; fails when no category was found."""
        categories = scrape_categories()
        if not categories:
            return ContestListResult(
                success=False, error=f"{self.platform_name}: No contests found"
            )
        return ContestListResult(success=True, error="", contests=categories)
def main() -> None:
    """Command-line entry point.

    Supported invocations::

        cses.py metadata <category_id>
        cses.py tests <category> <problem_id>
        cses.py contests

    The result object is printed to stdout as one JSON document. The process
    exits with status 1 on a usage error or when the result reports failure.
    """

    def emit(result) -> None:
        print(json.dumps(asdict(result)))

    def fail(result) -> None:
        # Usage errors: print the failure result and stop.
        emit(result)
        sys.exit(1)

    def finish(result) -> None:
        # Scrape results: always print, exit non-zero only on failure.
        emit(result)
        if not result.success:
            sys.exit(1)

    argv = sys.argv
    if len(argv) < 2:
        fail(
            MetadataResult(
                success=False,
                error="Usage: cses.py metadata <category_id> OR cses.py tests <category> <problem_id> OR cses.py contests",
            )
        )

    mode: str = argv[1]
    scraper = CSESScraper()

    if mode == "metadata":
        if len(argv) != 3:
            fail(
                MetadataResult(
                    success=False,
                    error="Usage: cses.py metadata <category_id>",
                )
            )
        category_id = argv[2]
        finish(scraper.scrape_contest_metadata(category_id))
    elif mode == "tests":
        if len(argv) != 4:
            fail(
                TestsResult(
                    success=False,
                    error="Usage: cses.py tests <category> <problem_id>",
                    problem_id="",
                    url="",
                    tests=[],
                    timeout_ms=0,
                    memory_mb=0,
                )
            )
        category = argv[2]
        problem_id = argv[3]
        finish(scraper.scrape_problem_tests(category, problem_id))
    elif mode == "contests":
        if len(argv) != 2:
            fail(ContestListResult(success=False, error="Usage: cses.py contests"))
        finish(scraper.scrape_contest_list())
    else:
        fail(
            MetadataResult(
                success=False,
                error=f"Unknown mode: {mode}. Use 'metadata <category>', 'tests <category> <problem_id>', or 'contests'",
            )
        )
# Run the command-line interface only when this file is executed as a script.
if __name__ == "__main__":
    main()