cp.nvim/scrapers/codeforces.py
2025-09-21 15:11:10 -04:00

374 lines
12 KiB
Python

#!/usr/bin/env python3
import json
import re
import sys
from dataclasses import asdict
import cloudscraper
from bs4 import BeautifulSoup, Tag
from .models import (
ContestListResult,
ContestSummary,
MetadataResult,
ProblemSummary,
TestCase,
TestsResult,
)
def scrape(url: str) -> list[TestCase]:
try:
scraper = cloudscraper.create_scraper()
response = scraper.get(url, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
input_sections = soup.find_all("div", class_="input")
output_sections = soup.find_all("div", class_="output")
individual_inputs: dict[str, list[str]] = {}
individual_outputs: dict[str, list[str]] = {}
for inp_section in input_sections:
inp_pre = inp_section.find("pre")
if not inp_pre or not isinstance(inp_pre, Tag):
continue
test_line_divs = inp_pre.find_all(
"div", class_=lambda x: x and "test-example-line-" in x
)
if not test_line_divs:
continue
for div in test_line_divs:
classes = div.get("class", [])
class_name = next(
(
cls
for cls in classes
if "test-example-line-" in cls and cls.split("-")[-1].isdigit()
),
None,
)
if not class_name:
continue
test_num = class_name.replace("test-example-line-", "")
if test_num not in individual_inputs:
individual_inputs[test_num] = []
individual_inputs[test_num].append(div.get_text().strip())
for out_section in output_sections:
out_pre = out_section.find("pre")
if not out_pre or not isinstance(out_pre, Tag):
continue
test_line_divs = out_pre.find_all(
"div", class_=lambda x: x and "test-example-line-" in x
)
if not test_line_divs:
continue
for div in test_line_divs:
classes = div.get("class", [])
class_name = next(
(
cls
for cls in classes
if "test-example-line-" in cls and cls.split("-")[-1].isdigit()
),
None,
)
if not class_name:
continue
test_num = class_name.replace("test-example-line-", "")
if test_num not in individual_outputs:
individual_outputs[test_num] = []
individual_outputs[test_num].append(div.get_text().strip())
if individual_inputs and individual_outputs:
common_tests = set(individual_inputs.keys()) & set(
individual_outputs.keys()
)
if common_tests:
tests = []
for test_num in sorted(common_tests):
input_text = "\n".join(individual_inputs[test_num])
output_text = "\n".join(individual_outputs[test_num])
prefixed_input = "1\n" + input_text
tests.append(TestCase(input=prefixed_input, expected=output_text))
return tests
all_inputs = []
all_outputs = []
for inp_section in input_sections:
inp_pre = inp_section.find("pre")
if not inp_pre or not isinstance(inp_pre, Tag):
continue
divs = inp_pre.find_all("div")
if divs:
lines = [div.get_text().strip() for div in divs if isinstance(div, Tag)]
text = "\n".join(lines)
else:
text = inp_pre.get_text().replace("\r", "").strip()
all_inputs.append(text)
for out_section in output_sections:
out_pre = out_section.find("pre")
if not out_pre or not isinstance(out_pre, Tag):
continue
divs = out_pre.find_all("div")
if divs:
lines = [div.get_text().strip() for div in divs if isinstance(div, Tag)]
text = "\n".join(lines)
else:
text = out_pre.get_text().replace("\r", "").strip()
all_outputs.append(text)
if not all_inputs or not all_outputs:
return []
combined_input = "\n".join(all_inputs)
combined_output = "\n".join(all_outputs)
return [TestCase(input=combined_input, expected=combined_output)]
except Exception as e:
print(f"CloudScraper failed: {e}", file=sys.stderr)
return []
def parse_problem_url(contest_id: str, problem_letter: str) -> str:
return (
f"https://codeforces.com/contest/{contest_id}/problem/{problem_letter.upper()}"
)
def extract_problem_limits(soup: BeautifulSoup) -> tuple[int, float]:
timeout_ms = None
memory_mb = None
time_limit_div = soup.find("div", class_="time-limit")
if time_limit_div:
text = time_limit_div.get_text().strip()
match = re.search(r"(\d+) seconds?", text)
if match:
seconds = int(match.group(1))
timeout_ms = seconds * 1000
if timeout_ms is None:
raise ValueError("Could not find valid timeout in time-limit section")
memory_limit_div = soup.find("div", class_="memory-limit")
if memory_limit_div:
text = memory_limit_div.get_text().strip()
match = re.search(r"(\d+) megabytes", text)
if match:
memory_mb = float(match.group(1))
if memory_mb is None:
raise ValueError("Could not find valid memory limit in memory-limit section")
return timeout_ms, memory_mb
def scrape_contest_problems(contest_id: str) -> list[ProblemSummary]:
try:
contest_url: str = f"https://codeforces.com/contest/{contest_id}"
scraper = cloudscraper.create_scraper()
response = scraper.get(contest_url, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
problems: list[ProblemSummary] = []
problem_links = soup.find_all(
"a", href=lambda x: x and f"/contest/{contest_id}/problem/" in x
)
for link in problem_links:
if not isinstance(link, Tag):
continue
href: str = str(link.get("href", ""))
if f"/contest/{contest_id}/problem/" in href:
problem_letter: str = href.split("/")[-1].lower()
problem_name: str = link.get_text(strip=True)
if problem_letter and problem_name:
problems.append(
ProblemSummary(id=problem_letter, name=problem_name)
)
seen: set[str] = set()
unique_problems: list[ProblemSummary] = []
for p in problems:
if p.id not in seen:
seen.add(p.id)
unique_problems.append(p)
return unique_problems
except Exception as e:
print(f"Failed to scrape contest problems: {e}", file=sys.stderr)
return []
def scrape_sample_tests(url: str) -> list[TestCase]:
print(f"Scraping: {url}", file=sys.stderr)
return scrape(url)
def scrape_contests() -> list[ContestSummary]:
try:
scraper = cloudscraper.create_scraper()
response = scraper.get("https://codeforces.com/api/contest.list", timeout=10)
response.raise_for_status()
data = response.json()
if data["status"] != "OK":
return []
contests = []
for contest in data["result"]:
contest_id = str(contest["id"])
name = contest["name"]
contests.append(ContestSummary(id=contest_id, name=name, display_name=name))
return contests
except Exception as e:
print(f"Failed to fetch contests: {e}", file=sys.stderr)
return []
def main() -> None:
if len(sys.argv) < 2:
result = MetadataResult(
success=False,
error="Usage: codeforces.py metadata <contest_id> OR codeforces.py tests <contest_id> <problem_letter> OR codeforces.py contests",
)
print(json.dumps(asdict(result)))
sys.exit(1)
mode: str = sys.argv[1]
if mode == "metadata":
if len(sys.argv) != 3:
result = MetadataResult(
success=False, error="Usage: codeforces.py metadata <contest_id>"
)
print(json.dumps(asdict(result)))
sys.exit(1)
contest_id: str = sys.argv[2]
problems: list[ProblemSummary] = scrape_contest_problems(contest_id)
if not problems:
result = MetadataResult(
success=False, error=f"No problems found for contest {contest_id}"
)
print(json.dumps(asdict(result)))
sys.exit(1)
result = MetadataResult(
success=True, error="", contest_id=contest_id, problems=problems
)
print(json.dumps(asdict(result)))
elif mode == "tests":
if len(sys.argv) != 4:
tests_result = TestsResult(
success=False,
error="Usage: codeforces.py tests <contest_id> <problem_letter>",
problem_id="",
url="",
tests=[],
timeout_ms=0,
memory_mb=0,
)
print(json.dumps(asdict(tests_result)))
sys.exit(1)
tests_contest_id: str = sys.argv[2]
problem_letter: str = sys.argv[3]
problem_id: str = tests_contest_id + problem_letter.lower()
url: str = parse_problem_url(tests_contest_id, problem_letter)
tests: list[TestCase] = scrape_sample_tests(url)
try:
scraper = cloudscraper.create_scraper()
response = scraper.get(url, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
timeout_ms, memory_mb = extract_problem_limits(soup)
except Exception as e:
tests_result = TestsResult(
success=False,
error=f"Failed to extract constraints: {e}",
problem_id=problem_id,
url=url,
tests=[],
timeout_ms=0,
memory_mb=0,
)
print(json.dumps(asdict(tests_result)))
sys.exit(1)
if not tests:
tests_result = TestsResult(
success=False,
error=f"No tests found for {tests_contest_id} {problem_letter}",
problem_id=problem_id,
url=url,
tests=[],
timeout_ms=timeout_ms,
memory_mb=memory_mb,
)
print(json.dumps(asdict(tests_result)))
sys.exit(1)
tests_result = TestsResult(
success=True,
error="",
problem_id=problem_id,
url=url,
tests=tests,
timeout_ms=timeout_ms,
memory_mb=memory_mb,
)
print(json.dumps(asdict(tests_result)))
elif mode == "contests":
if len(sys.argv) != 2:
contest_result = ContestListResult(
success=False, error="Usage: codeforces.py contests"
)
print(json.dumps(asdict(contest_result)))
sys.exit(1)
contests = scrape_contests()
if not contests:
contest_result = ContestListResult(success=False, error="No contests found")
print(json.dumps(asdict(contest_result)))
sys.exit(1)
contest_result = ContestListResult(success=True, error="", contests=contests)
print(json.dumps(asdict(contest_result)))
else:
result = MetadataResult(
success=False,
error=f"Unknown mode: {mode}. Use 'metadata', 'tests', or 'contests'",
)
print(json.dumps(asdict(result)))
sys.exit(1)
if __name__ == "__main__":
main()