#!/usr/bin/env python3
"""Command-line scraper for AtCoder contest data.

Modes (selected by the first command-line argument):

* ``metadata <contest_id>`` - list the problems of a contest.
* ``tests <contest_id> <problem_letter>`` - sample tests and limits of a problem.
* ``contests`` - list contests from the AtCoder archive pages.

Every mode prints exactly one JSON object to stdout and exits with status 1
on failure.  Diagnostics go to stderr.
"""

import concurrent.futures
import json
import re
import sys
from dataclasses import asdict
from typing import NoReturn

import backoff
import requests
from bs4 import BeautifulSoup, Tag

from .models import (
    ContestListResult,
    ContestSummary,
    MetadataResult,
    ProblemSummary,
    TestCase,
    TestsResult,
)

# Browser-like User-Agent sent with every request.
_HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    )
}
_REQUEST_TIMEOUT_S = 10
_ARCHIVE_URL = "https://atcoder.jp/contests/archive"
# Number of archive pages assumed when the pagination widget cannot be read.
_DEFAULT_MAX_PAGES = 15

# Maps full-width ASCII variants (U+FF01..U+FF5E) onto their ASCII
# counterparts and the ideographic space (U+3000) onto a plain space.
_FULLWIDTH_TO_ASCII = {cp: cp - 0xFEE0 for cp in range(0xFF01, 0xFF5F)}
_FULLWIDTH_TO_ASCII[0x3000] = 0x20


def _fetch_soup(url: str) -> BeautifulSoup:
    """Download *url* and return the parsed HTML.

    Raises:
        requests.exceptions.RequestException: on network failure or a
            4xx/5xx response (via ``raise_for_status``).
    """
    response = requests.get(url, headers=_HEADERS, timeout=_REQUEST_TIMEOUT_S)
    response.raise_for_status()
    return BeautifulSoup(response.text, "html.parser")


def extract_problem_limits(soup: BeautifulSoup) -> tuple[int, float]:
    """Return ``(timeout_ms, memory_mb)`` parsed from a problem page.

    Only the first ``<p>`` containing both "Time Limit:" and "Memory Limit:"
    is inspected.  The time limit may be fractional (e.g. "2.5 sec") and is
    rounded to whole milliseconds; the memory limit is converted from MiB to
    MB and rounded to two decimals.

    Raises:
        ValueError: if either limit cannot be found.
    """
    timeout_ms: int | None = None
    memory_mb: float | None = None

    for paragraph in soup.find_all("p"):
        text = paragraph.get_text()
        if "Time Limit:" not in text or "Memory Limit:" not in text:
            continue

        # Accept decimals: an integer-only pattern cannot match "2.5 sec".
        time_match = re.search(r"Time Limit:\s*(\d+(?:\.\d+)?)\s*sec", text)
        if time_match:
            timeout_ms = round(float(time_match.group(1)) * 1000)

        memory_match = re.search(r"Memory Limit:\s*(\d+)\s*MiB", text)
        if memory_match:
            memory_mb = round(int(memory_match.group(1)) * 1.048576, 2)
        break

    if timeout_ms is None:
        raise ValueError("Could not find valid timeout in problem constraints")
    if memory_mb is None:
        raise ValueError("Could not find valid memory limit in problem constraints")
    return timeout_ms, memory_mb


def parse_problem_url(contest_id: str, problem_letter: str) -> str:
    """Build the task URL for *problem_letter* of *contest_id*."""
    task_id: str = f"{contest_id}_{problem_letter}"
    return f"https://atcoder.jp/contests/{contest_id}/tasks/{task_id}"


def extract_problem_from_row(row, contest_id: str) -> ProblemSummary | None:
    """Convert one row of the tasks table into a ``ProblemSummary``.

    The link in the second cell supplies both the task name and, via the
    last path segment of its href, the task id.  Returns ``None`` when the
    row has fewer than two cells, has no usable link, or the task id does
    not start with ``"<contest_id>_"``.
    """
    cells = row.find_all("td")
    if len(cells) < 2:
        return None

    task_link = cells[1].find("a")
    if not task_link:
        return None

    task_name = task_link.get_text(strip=True)
    task_href = task_link.get("href", "")
    if not task_href:
        return None

    task_id = task_href.split("/")[-1]
    if not task_id.startswith(contest_id + "_"):
        return None

    problem_letter = task_id[len(contest_id) + 1 :]
    if not problem_letter or not task_name:
        return None

    return ProblemSummary(id=problem_letter.lower(), name=task_name)


def scrape_contest_problems(contest_id: str) -> list[ProblemSummary]:
    """Return the problems listed on the contest's tasks page.

    Best-effort: any failure is reported on stderr and yields ``[]``.
    """
    try:
        soup = _fetch_soup(f"https://atcoder.jp/contests/{contest_id}/tasks")
        task_table = soup.find("table", class_="table")
        if not task_table or not isinstance(task_table, Tag):
            return []

        # The first row is skipped as the header row.
        rows = task_table.find_all("tr")[1:]
        candidates = (extract_problem_from_row(row, contest_id) for row in rows)
        return [problem for problem in candidates if problem]
    except Exception as e:
        print(f"Failed to scrape AtCoder contest problems: {e}", file=sys.stderr)
        return []


def extract_test_case_from_headers(sample_headers, i: int) -> tuple[str, str] | None:
    """Return ``(input, output)`` for the header pair starting at index *i*.

    ``sample_headers[i]`` must mention "input" and ``sample_headers[i + 1]``
    must mention "output"; each text is read from the first ``<pre>`` that
    follows its header, stripped, with carriage returns removed.  Returns
    ``None`` if the pair is incomplete or either text is empty.
    """
    if i >= len(sample_headers):
        return None

    header = sample_headers[i]
    if "input" not in header.get_text().lower():
        return None

    input_pre = header.find_next("pre")
    if not input_pre or i + 1 >= len(sample_headers):
        return None

    next_header = sample_headers[i + 1]
    if "output" not in next_header.get_text().lower():
        return None

    output_pre = next_header.find_next("pre")
    if not output_pre:
        return None

    input_text = input_pre.get_text().strip().replace("\r", "")
    output_text = output_pre.get_text().strip().replace("\r", "")
    if not input_text or not output_text:
        return None

    return (input_text, output_text)


def _extract_tests(soup: BeautifulSoup) -> list[TestCase]:
    """Collect sample tests from the ``<h3>`` headers mentioning "sample"."""
    sample_headers = soup.find_all(
        "h3", string=lambda text: bool(text) and "sample" in text.lower()
    )

    tests: list[TestCase] = []
    i = 0
    while i < len(sample_headers):
        test_case = extract_test_case_from_headers(sample_headers, i)
        if test_case:
            input_text, output_text = test_case
            tests.append(TestCase(input=input_text, expected=output_text))
            i += 2  # consumed both the input and the output header
        else:
            i += 1
    return tests


def scrape(url: str) -> list[TestCase]:
    """Download the problem page at *url* and return its sample tests.

    Best-effort: any failure is reported on stderr and yields ``[]``.
    """
    try:
        return _extract_tests(_fetch_soup(url))
    except Exception as e:
        print(f"Error scraping AtCoder: {e}", file=sys.stderr)
        return []


def _normalize_contest_name(name: str) -> str:
    """Decode literal backslash escapes and fold full-width characters to ASCII."""
    # "unicode_escape" reads raw bytes as Latin-1, so applying it to the
    # UTF-8 encoding of non-ASCII text (Japanese or full-width names) yields
    # mojibake.  Only pure-ASCII text containing a backslash is decoded.
    if "\\" in name and name.isascii():
        try:
            name = name.encode("ascii").decode("unicode_escape")
        except UnicodeDecodeError:
            pass  # malformed escape sequence: keep the text as scraped
    return name.translate(_FULLWIDTH_TO_ASCII)


def _get_max_pages() -> int:
    """Return the archive page count, or ``_DEFAULT_MAX_PAGES`` if unknown.

    The count is read from the text of the last ``<li>`` of the pagination
    list; any failure falls back to the default.
    """
    try:
        soup = _fetch_soup(_ARCHIVE_URL)
        pagination = soup.find("ul", class_="pagination")
        if not pagination or not isinstance(pagination, Tag):
            return _DEFAULT_MAX_PAGES

        items = pagination.find_all("li")
        if items and isinstance(items[-1], Tag):
            return int(items[-1].get_text().strip())
        return _DEFAULT_MAX_PAGES
    except Exception:
        # Covers network errors as well as a non-numeric last item.
        return _DEFAULT_MAX_PAGES


def _scrape_archive_page(page: int) -> list[ContestSummary]:
    """Return the contests listed on archive page *page* (``[]`` on failure)."""

    # HTTPError is a subclass of RequestException, so one entry covers both.
    # Because make_request() calls raise_for_status(), a 429 response
    # surfaces as an exception and is retried by on_exception; the
    # on_predicate wrapper remains as a secondary guard.
    @backoff.on_exception(
        backoff.expo,
        requests.exceptions.RequestException,
        max_tries=4,
        jitter=backoff.random_jitter,
        on_backoff=lambda details: print(
            f"Request failed on page {page} (attempt {details['tries']}), retrying in {details['wait']:.1f}s: {details['exception']}",
            file=sys.stderr,
        ),
    )
    @backoff.on_predicate(
        backoff.expo,
        lambda response: response.status_code == 429,
        max_tries=4,
        jitter=backoff.random_jitter,
        on_backoff=lambda details: print(
            f"Rate limited on page {page}, retrying in {details['wait']:.1f}s",
            file=sys.stderr,
        ),
    )
    def make_request() -> requests.Response:
        url = f"{_ARCHIVE_URL}?page={page}"
        response = requests.get(url, headers=_HEADERS, timeout=_REQUEST_TIMEOUT_S)
        response.raise_for_status()
        return response

    try:
        response = make_request()
    except Exception:
        return []

    soup = BeautifulSoup(response.text, "html.parser")
    table = soup.find("table", class_="table")
    if not table or not isinstance(table, Tag):
        return []

    tbody = table.find("tbody")
    if not tbody or not isinstance(tbody, Tag):
        return []

    contests: list[ContestSummary] = []
    for row in tbody.find_all("tr"):
        cells = row.find_all("td")
        if len(cells) < 2:
            continue

        link = cells[1].find("a")
        if not link:
            continue
        href = link.get("href")
        if not href or not isinstance(href, str):
            continue

        contest_id = href.split("/")[-1]
        name = _normalize_contest_name(link.get_text().strip())
        contests.append(ContestSummary(id=contest_id, name=name, display_name=name))
    return contests


def scrape_contests() -> list[ContestSummary]:
    """Return all contests from the archive, ordered by page number.

    Pages are fetched concurrently by five worker threads.
    """
    max_pages = _get_max_pages()
    all_contests: list[ContestSummary] = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # map() yields results in submission order, i.e. by page number.
        for page_contests in executor.map(
            _scrape_archive_page, range(1, max_pages + 1)
        ):
            all_contests.extend(page_contests)
    return all_contests


def _emit(result) -> None:
    """Print a result dataclass to stdout as a single JSON object."""
    print(json.dumps(asdict(result)))


def _fail(result) -> NoReturn:
    """Print *result* as JSON and terminate with exit status 1."""
    _emit(result)
    sys.exit(1)


def _tests_failure(
    error: str,
    *,
    problem_id: str = "",
    url: str = "",
    timeout_ms: int = 0,
    memory_mb: float = 0,
) -> TestsResult:
    """Build a failed ``TestsResult`` with an empty test list."""
    return TestsResult(
        success=False,
        error=error,
        problem_id=problem_id,
        url=url,
        tests=[],
        timeout_ms=timeout_ms,
        memory_mb=memory_mb,
    )


def _run_metadata(argv: list[str]) -> None:
    """Handle ``metadata <contest_id>``."""
    if len(argv) != 3:
        _fail(
            MetadataResult(
                success=False,
                error="Usage: atcoder.py metadata <contest_id>",
            )
        )

    contest_id: str = argv[2]
    problems: list[ProblemSummary] = scrape_contest_problems(contest_id)
    if not problems:
        _fail(
            MetadataResult(
                success=False,
                error=f"No problems found for contest {contest_id}",
            )
        )

    _emit(
        MetadataResult(
            success=True,
            error="",
            contest_id=contest_id,
            problems=problems,
        )
    )


def _run_tests(argv: list[str]) -> None:
    """Handle ``tests <contest_id> <problem_letter>``."""
    if len(argv) != 4:
        _fail(
            _tests_failure("Usage: atcoder.py tests <contest_id> <problem_letter>")
        )

    contest_id: str = argv[2]
    problem_letter: str = argv[3]
    problem_id: str = f"{contest_id}_{problem_letter.lower()}"
    url: str = parse_problem_url(contest_id, problem_letter)

    # Fetch the page once and reuse it for both the limits and the samples.
    try:
        soup = _fetch_soup(url)
        timeout_ms, memory_mb = extract_problem_limits(soup)
    except Exception as e:
        _fail(
            _tests_failure(
                f"Failed to extract constraints: {e}",
                problem_id=problem_id,
                url=url,
            )
        )

    try:
        tests: list[TestCase] = _extract_tests(soup)
    except Exception as e:
        print(f"Error scraping AtCoder: {e}", file=sys.stderr)
        tests = []

    if not tests:
        _fail(
            _tests_failure(
                f"No tests found for {contest_id} {problem_letter}",
                problem_id=problem_id,
                url=url,
                timeout_ms=timeout_ms,
                memory_mb=memory_mb,
            )
        )

    _emit(
        TestsResult(
            success=True,
            error="",
            problem_id=problem_id,
            url=url,
            tests=tests,
            timeout_ms=timeout_ms,
            memory_mb=memory_mb,
        )
    )


def _run_contests(argv: list[str]) -> None:
    """Handle ``contests`` (takes no further arguments)."""
    if len(argv) != 2:
        _fail(ContestListResult(success=False, error="Usage: atcoder.py contests"))

    contests = scrape_contests()
    if not contests:
        _fail(ContestListResult(success=False, error="No contests found"))

    _emit(ContestListResult(success=True, error="", contests=contests))


def main() -> None:
    """Dispatch on ``sys.argv[1]``; print one JSON result, exit 1 on failure."""
    argv = sys.argv
    if len(argv) < 2:
        _fail(
            MetadataResult(
                success=False,
                error=(
                    "Usage: atcoder.py metadata <contest_id> OR "
                    "atcoder.py tests <contest_id> <problem_letter> OR "
                    "atcoder.py contests"
                ),
            )
        )

    mode: str = argv[1]
    if mode == "metadata":
        _run_metadata(argv)
    elif mode == "tests":
        _run_tests(argv)
    elif mode == "contests":
        _run_contests(argv)
    else:
        _fail(
            MetadataResult(
                success=False,
                error=f"Unknown mode: {mode}. Use 'metadata', 'tests', or 'contests'",
            )
        )


if __name__ == "__main__":
    main()