From fe9678310e3bbda1717e9a32d7cd83415dc8f46e Mon Sep 17 00:00:00 2001
From: Barrett Ruth
Date: Wed, 17 Sep 2025 23:54:37 -0400
Subject: [PATCH] feat: refactor scrapers

---
 scrapers/atcoder.py    | 115 ++++++++++++++++++------------
 scrapers/codeforces.py | 155 ++++++++++++++++++++++++++++++++++-------
 scrapers/cses.py       | 106 +++++++++++++++++-----------
 3 files changed, 266 insertions(+), 110 deletions(-)

diff --git a/scrapers/atcoder.py b/scrapers/atcoder.py
index 46d673d..7983cc7 100644
--- a/scrapers/atcoder.py
+++ b/scrapers/atcoder.py
@@ -12,10 +12,36 @@ def parse_problem_url(contest_id: str, problem_letter: str) -> str:
     return f"https://atcoder.jp/contests/{contest_id}/tasks/{task_id}"
 
 
+def extract_problem_from_row(row, contest_id: str) -> dict[str, str] | None:
+    cells = row.find_all("td")
+    if len(cells) < 2:
+        return None
+
+    task_link = cells[1].find("a")
+    if not task_link:
+        return None
+
+    task_name = task_link.get_text(strip=True)
+    task_href = task_link.get("href", "")
+
+    if not task_href:
+        return None
+
+    task_id = task_href.split("/")[-1]
+    if not task_id.startswith(contest_id + "_"):
+        return None
+
+    problem_letter = task_id[len(contest_id) + 1 :]
+    if not problem_letter or not task_name:
+        return None
+
+    return {"id": problem_letter.lower(), "name": task_name}
+
+
 def scrape_contest_problems(contest_id: str) -> list[dict[str, str]]:
     try:
-        contest_url: str = f"https://atcoder.jp/contests/{contest_id}/tasks"
-        headers: dict[str, str] = {
+        contest_url = f"https://atcoder.jp/contests/{contest_id}/tasks"
+        headers = {
             "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
         }
 
@@ -23,31 +49,18 @@ def scrape_contest_problems(contest_id: str) -> list[dict[str, str]]:
         response.raise_for_status()
 
         soup = BeautifulSoup(response.text, "html.parser")
-        problems: list[dict[str, str]] = []
-
         task_table = soup.find("table", class_="table")
+
         if not task_table:
             return []
 
-        rows = task_table.find_all("tr")[1:]  # Skip header row
+        rows = task_table.find_all("tr")[1:]
 
+        problems = []
         for row in rows:
-            cells = row.find_all("td")
-            if len(cells) >= 2:
-                task_link = cells[1].find("a")
-                if task_link:
-                    task_name: str = task_link.get_text(strip=True)
-                    task_href: str = task_link.get("href", "")
-
-                    # Extract problem letter from task name or URL
-                    task_id: str = task_href.split("/")[-1] if task_href else ""
-                    if task_id.startswith(contest_id + "_"):
-                        problem_letter: str = task_id[len(contest_id) + 1 :]
-
-                        if problem_letter and task_name:
-                            problems.append(
-                                {"id": problem_letter.lower(), "name": task_name}
-                            )
+            problem = extract_problem_from_row(row, contest_id)
+            if problem:
+                problems.append(problem)
 
         problems.sort(key=lambda x: x["id"])
         return problems
@@ -57,9 +70,38 @@ def scrape_contest_problems(contest_id: str) -> list[dict[str, str]]:
         return []
 
 
+def extract_test_case_from_headers(sample_headers, i: int) -> tuple[str, str] | None:
+    if i >= len(sample_headers):
+        return None
+
+    header = sample_headers[i]
+    if "input" not in header.get_text().lower():
+        return None
+
+    input_pre = header.find_next("pre")
+    if not input_pre or i + 1 >= len(sample_headers):
+        return None
+
+    next_header = sample_headers[i + 1]
+    if "output" not in next_header.get_text().lower():
+        return None
+
+    output_pre = next_header.find_next("pre")
+    if not output_pre:
+        return None
+
+    input_text = input_pre.get_text().strip().replace("\r", "")
+    output_text = output_pre.get_text().strip().replace("\r", "")
+
+    if not input_text or not output_text:
+        return None
+
+    return (input_text, output_text)
+
+
 def scrape(url: str) -> list[tuple[str, str]]:
     try:
-        headers: dict[str, str] = {
+        headers = {
             "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
         }
 
@@ -68,33 +110,20 @@ def scrape(url: str) -> list[tuple[str, str]]:
 
         soup = BeautifulSoup(response.text, "html.parser")
 
-        tests: list[tuple[str, str]] = []
-
         sample_headers = soup.find_all(
             "h3", string=lambda x: x and "sample" in x.lower() if x else False
         )
 
+        tests = []
         i = 0
+
         while i < len(sample_headers):
-            header = sample_headers[i]
-            if "input" in header.get_text().lower():
-                input_pre = header.find_next("pre")
-                if input_pre and i + 1 < len(sample_headers):
-                    next_header = sample_headers[i + 1]
-                    if "output" in next_header.get_text().lower():
-                        output_pre = next_header.find_next("pre")
-                        if output_pre:
-                            input_text: str = (
-                                input_pre.get_text().strip().replace("\r", "")
-                            )
-                            output_text: str = (
-                                output_pre.get_text().strip().replace("\r", "")
-                            )
-                            if input_text and output_text:
-                                tests.append((input_text, output_text))
-                                i += 2
-                                continue
-            i += 1
+            test_case = extract_test_case_from_headers(sample_headers, i)
+            if test_case:
+                tests.append(test_case)
+                i += 2
+            else:
+                i += 1
 
         return tests
 
diff --git a/scrapers/codeforces.py b/scrapers/codeforces.py
index 8e89b0a..bf252bd 100644
--- a/scrapers/codeforces.py
+++ b/scrapers/codeforces.py
@@ -7,6 +7,80 @@ import cloudscraper
 from bs4 import BeautifulSoup
 
 
+def extract_combined_text(sections) -> list[str]:
+    texts = []
+
+    for section in sections:
+        pre = section.find("pre")
+        if not pre:
+            continue
+
+        divs = pre.find_all("div")
+        if divs:
+            lines = [div.get_text().strip() for div in divs]
+            text = "\n".join(lines)
+        else:
+            text = pre.get_text().replace("\r", "").strip()
+        texts.append(text)
+
+    return texts
+
+
+def extract_lines_by_test_number(sections) -> dict[int, list[str]]:
+    lines_by_test = {}
+
+    for section in sections:
+        pre = section.find("pre")
+        if not pre:
+            continue
+
+        divs = pre.find_all("div")
+        for div in divs:
+            classes = div.get("class", [])
+            for class_name in classes:
+                if not class_name.startswith("test-example-line-"):
+                    continue
+
+                try:
+                    test_num = int(class_name.split("-")[-1])
+                    if test_num not in lines_by_test:
+                        lines_by_test[test_num] = []
+                    lines_by_test[test_num].append(div.get_text().strip())
+                except (ValueError, IndexError):
+                    continue
+
+    return lines_by_test
+
+
+def extract_individual_test_cases(
+    input_sections, output_sections
+) -> list[tuple[str, str]]:
+    if not input_sections or not output_sections:
+        return []
+
+    input_by_test = extract_lines_by_test_number(input_sections)
+    output_by_test = extract_lines_by_test_number(output_sections)
+
+    if not input_by_test or not output_by_test:
+        return []
+
+    tests = []
+    test_numbers = sorted(set(input_by_test.keys()) & set(output_by_test.keys()))
+
+    for test_num in test_numbers:
+        input_lines = input_by_test.get(test_num, [])
+        output_lines = output_by_test.get(test_num, [])
+
+        if not input_lines or not output_lines:
+            continue
+
+        input_text = "\n".join(input_lines)
+        output_text = "\n".join(output_lines)
+        tests.append((input_text, output_text))
+
+    return tests
+
+
 def scrape(url: str) -> list[tuple[str, str]]:
     try:
         scraper = cloudscraper.create_scraper()
@@ -19,30 +93,15 @@ def scrape(url: str) -> list[tuple[str, str]]:
 
         input_sections = soup.find_all("div", class_="input")
         output_sections = soup.find_all("div", class_="output")
 
-        all_inputs = []
-        all_outputs = []
+        individual_tests = extract_individual_test_cases(
+            input_sections, output_sections
+        )
 
-        for inp_section in input_sections:
-            inp_pre = inp_section.find("pre")
-            if inp_pre:
-                divs = inp_pre.find_all("div")
-                if divs:
-                    lines = [div.get_text().strip() for div in divs]
-                    text = "\n".join(lines)
-                else:
-                    text = inp_pre.get_text().replace("\r", "").strip()
-                all_inputs.append(text)
+        if individual_tests:
+            return individual_tests
 
-        for out_section in output_sections:
-            out_pre = out_section.find("pre")
-            if out_pre:
-                divs = out_pre.find_all("div")
-                if divs:
-                    lines = [div.get_text().strip() for div in divs]
-                    text = "\n".join(lines)
-                else:
-                    text = out_pre.get_text().replace("\r", "").strip()
-                all_outputs.append(text)
+        all_inputs = extract_combined_text(input_sections)
+        all_outputs = extract_combined_text(output_sections)
 
         if all_inputs and all_outputs:
             combined_input = "\n".join(all_inputs)
@@ -106,6 +165,39 @@ def scrape_sample_tests(url: str) -> list[tuple[str, str]]:
     return scrape(url)
 
 
+def scrape_with_both_formats(
+    url: str,
+) -> tuple[list[tuple[str, str]], tuple[str, str] | None]:
+    try:
+        scraper = cloudscraper.create_scraper()
+        response = scraper.get(url, timeout=10)
+        response.raise_for_status()
+
+        soup = BeautifulSoup(response.text, "html.parser")
+
+        input_sections = soup.find_all("div", class_="input")
+        output_sections = soup.find_all("div", class_="output")
+
+        individual_tests = extract_individual_test_cases(
+            input_sections, output_sections
+        )
+
+        all_inputs = extract_combined_text(input_sections)
+        all_outputs = extract_combined_text(output_sections)
+
+        combined = None
+        if all_inputs and all_outputs:
+            combined_input = "\n".join(all_inputs)
+            combined_output = "\n".join(all_outputs)
+            combined = (combined_input, combined_output)
+
+        return individual_tests, combined
+
+    except Exception as e:
+        print(f"CloudScraper failed: {e}", file=sys.stderr)
+        return [], None
+
+
 def main() -> None:
     if len(sys.argv) < 2:
         result: dict[str, str | bool] = {
@@ -158,9 +250,10 @@ def main() -> None:
         problem_id: str = contest_id + problem_letter.lower()
         url: str = parse_problem_url(contest_id, problem_letter)
 
-        tests: list[tuple[str, str]] = scrape_sample_tests(url)
+        print(f"Scraping: {url}", file=sys.stderr)
+        individual_tests, combined = scrape_with_both_formats(url)
 
-        if not tests:
+        if not individual_tests and not combined:
             result: dict[str, str | bool] = {
                 "success": False,
                 "error": f"No tests found for {contest_id} {problem_letter}",
@@ -171,15 +264,25 @@ def main() -> None:
             sys.exit(1)
 
         test_cases: list[dict[str, str]] = []
-        for input_data, output_data in tests:
-            test_cases.append({"input": input_data, "output": output_data})
+        has_individual = len(individual_tests) > 0
+
+        if has_individual:
+            for input_data, output_data in individual_tests:
+                test_cases.append({"input": input_data, "output": output_data})
+        elif combined:
+            test_cases.append({"input": combined[0], "output": combined[1]})
 
         result: dict[str, str | bool | list] = {
             "success": True,
             "problem_id": problem_id,
             "url": url,
             "test_cases": test_cases,
+            "has_individual_tests": has_individual,
         }
+
+        if combined:
+            result["combined"] = {"input": combined[0], "output": combined[1]}
+
         print(json.dumps(result))
 
     else:
diff --git a/scrapers/cses.py b/scrapers/cses.py
index 8cd6020..17ecc85 100755
--- a/scrapers/cses.py
+++ b/scrapers/cses.py
@@ -15,10 +15,36 @@ def parse_problem_url(problem_input: str) -> str | None:
     return None
 
 
+def process_problem_element(
+    element, current_category: str | None, all_categories: dict
+) -> str | None:
+    if element.name == "h1":
+        category_name = element.get_text().strip()
+        if category_name not in all_categories:
+            all_categories[category_name] = []
+        return category_name
+
+    if element.name != "a" or "/problemset/task/" not in element.get("href", ""):
+        return current_category
+
+    href = element.get("href", "")
+    if not href:
+        return current_category
+
+    problem_id = href.split("/")[-1]
+    problem_name = element.get_text(strip=True)
+
+    if not (problem_id.isdigit() and problem_name and current_category):
+        return current_category
+
+    all_categories[current_category].append({"id": problem_id, "name": problem_name})
+    return current_category
+
+
 def scrape_all_problems() -> dict[str, list[dict[str, str]]]:
     try:
-        problemset_url: str = "https://cses.fi/problemset/"
-        headers: dict[str, str] = {
+        problemset_url = "https://cses.fi/problemset/"
+        headers = {
             "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
         }
 
@@ -26,28 +52,18 @@ def scrape_all_problems() -> dict[str, list[dict[str, str]]]:
         response.raise_for_status()
 
         soup = BeautifulSoup(response.text, "html.parser")
-        all_categories: dict[str, list[dict[str, str]]] = {}
+        all_categories = {}
 
         problem_links = soup.find_all(
             "a", href=lambda x: x and "/problemset/task/" in x
         )
         print(f"Found {len(problem_links)} problem links", file=sys.stderr)
 
-        current_category: str | None = None
+        current_category = None
 
         for element in soup.find_all(["h1", "a"]):
-            if element.name == "h1":
-                current_category = element.get_text().strip()
-                if current_category not in all_categories:
-                    all_categories[current_category] = []
-            elif element.name == "a" and "/problemset/task/" in element.get("href", ""):
-                href: str = element.get("href", "")
-                problem_id: str = href.split("/")[-1]
-                problem_name: str = element.get_text(strip=True)
-
-                if problem_id.isdigit() and problem_name and current_category:
-                    all_categories[current_category].append(
-                        {"id": problem_id, "name": problem_name}
-                    )
+            current_category = process_problem_element(
+                element, current_category, all_categories
+            )
 
         for category in all_categories:
             all_categories[category].sort(key=lambda x: int(x["id"]))
@@ -60,9 +76,36 @@ def scrape_all_problems() -> dict[str, list[dict[str, str]]]:
         return {}
 
 
+def extract_example_test_case(soup) -> tuple[str, str] | None:
+    example_header = soup.find("h1", string="Example")
+    if not example_header:
+        return None
+
+    current = example_header.find_next_sibling()
+    input_text = None
+    output_text = None
+
+    while current:
+        if current.name == "p" and "Input:" in current.get_text():
+            input_pre = current.find_next_sibling("pre")
+            if input_pre:
+                input_text = input_pre.get_text().strip()
+        elif current.name == "p" and "Output:" in current.get_text():
+            output_pre = current.find_next_sibling("pre")
+            if output_pre:
+                output_text = output_pre.get_text().strip()
+            break
+        current = current.find_next_sibling()
+
+    if not input_text or not output_text:
+        return None
+
+    return (input_text, output_text)
+
+
 def scrape(url: str) -> list[tuple[str, str]]:
     try:
-        headers: dict[str, str] = {
+        headers = {
             "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
         }
 
@@ -71,30 +114,11 @@ def scrape(url: str) -> list[tuple[str, str]]:
 
         soup = BeautifulSoup(response.text, "html.parser")
 
-        tests: list[tuple[str, str]] = []
-        example_header = soup.find("h1", string="Example")
+        test_case = extract_example_test_case(soup)
+        if not test_case:
+            return []
-        if example_header:
-            current = example_header.find_next_sibling()
-            input_text: str | None = None
-            output_text: str | None = None
-
-            while current:
-                if current.name == "p" and "Input:" in current.get_text():
-                    input_pre = current.find_next_sibling("pre")
-                    if input_pre:
-                        input_text = input_pre.get_text().strip()
-                elif current.name == "p" and "Output:" in current.get_text():
-                    output_pre = current.find_next_sibling("pre")
-                    if output_pre:
-                        output_text = output_pre.get_text().strip()
-                    break
-                current = current.find_next_sibling()
-
-            if input_text and output_text:
-                tests.append((input_text, output_text))
-
-        return tests
+
+        return [test_case]
 
     except Exception as e:
         print(f"Error scraping CSES: {e}", file=sys.stderr)
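
Reviewer note: a minimal usage sketch of how the refactored Codeforces helpers
compose. This is not part of the patch; it assumes scrapers/ is importable as a
package (not shown here), and the problem URL is illustrative only.

    import json

    from scrapers import codeforces

    # Illustrative URL; any Codeforces problem page is handled the same way.
    individual_tests, combined = codeforces.scrape_with_both_formats(
        "https://codeforces.com/problemset/problem/1/A"
    )

    if individual_tests:
        # Newer pages tag each sample line with a test-example-line-<n> class,
        # so the scraper can split samples into one (input, output) per test.
        for input_text, output_text in individual_tests:
            print(json.dumps({"input": input_text, "output": output_text}))
    elif combined:
        # Older pages expose a single <pre> per section; fall back to the
        # concatenated input/output pair, as main() does.
        print(json.dumps({"input": combined[0], "output": combined[1]}))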