feat: refactor scrapers
parent 24624f9826
commit fe9678310e
3 changed files with 266 additions and 110 deletions
@@ -7,6 +7,80 @@ import cloudscraper
 from bs4 import BeautifulSoup
 
 
+def extract_combined_text(sections) -> list[str]:
+    texts = []
+
+    for section in sections:
+        pre = section.find("pre")
+        if not pre:
+            continue
+
+        divs = pre.find_all("div")
+        if divs:
+            lines = [div.get_text().strip() for div in divs]
+            text = "\n".join(lines)
+        else:
+            text = pre.get_text().replace("\r", "").strip()
+        texts.append(text)
+
+    return texts
+
+
+def extract_lines_by_test_number(sections) -> dict[int, list[str]]:
+    lines_by_test = {}
+
+    for section in sections:
+        pre = section.find("pre")
+        if not pre:
+            continue
+
+        divs = pre.find_all("div")
+        for div in divs:
+            classes = div.get("class", [])
+            for class_name in classes:
+                if not class_name.startswith("test-example-line-"):
+                    continue
+
+                try:
+                    test_num = int(class_name.split("-")[-1])
+                    if test_num not in lines_by_test:
+                        lines_by_test[test_num] = []
+                    lines_by_test[test_num].append(div.get_text().strip())
+                except (ValueError, IndexError):
+                    continue
+
+    return lines_by_test
+
+
+def extract_individual_test_cases(
+    input_sections, output_sections
+) -> list[tuple[str, str]]:
+    if not input_sections or not output_sections:
+        return []
+
+    input_by_test = extract_lines_by_test_number(input_sections)
+    output_by_test = extract_lines_by_test_number(output_sections)
+
+    if not input_by_test or not output_by_test:
+        return []
+
+    tests = []
+    test_numbers = sorted(set(input_by_test.keys()) & set(output_by_test.keys()))
+
+    for test_num in test_numbers:
+        input_lines = input_by_test.get(test_num, [])
+        output_lines = output_by_test.get(test_num, [])
+
+        if not input_lines or not output_lines:
+            continue
+
+        input_text = "\n".join(input_lines)
+        output_text = "\n".join(output_lines)
+        tests.append((input_text, output_text))
+
+    return tests
+
+
 def scrape(url: str) -> list[tuple[str, str]]:
     try:
         scraper = cloudscraper.create_scraper()
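Not part of the commit: a minimal sketch of how the new helpers behave, assuming the three functions added above are in scope (the diff does not show which module they live in). The HTML snippet is hand-written to mimic the markup the helpers look for.

    from bs4 import BeautifulSoup

    # Hypothetical sample markup: div.input / div.output sections whose <pre>
    # lines carry test-example-line-N classes, as the helpers above expect.
    html = """
    <div class="input"><pre>
    <div class="test-example-line-1">1 2</div>
    <div class="test-example-line-2">3 4</div>
    </pre></div>
    <div class="output"><pre>
    <div class="test-example-line-1">3</div>
    <div class="test-example-line-2">7</div>
    </pre></div>
    """

    soup = BeautifulSoup(html, "html.parser")
    inputs = soup.find_all("div", class_="input")
    outputs = soup.find_all("div", class_="output")

    # Per-test pairs, keyed by the numeric suffix of test-example-line-N.
    print(extract_individual_test_cases(inputs, outputs))
    # [('1 2', '3'), ('3 4', '7')]

    # One combined text blob per sample section.
    print(extract_combined_text(inputs))
    # ['1 2\n3 4']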
@@ -19,30 +93,15 @@ def scrape(url: str) -> list[tuple[str, str]]:
         input_sections = soup.find_all("div", class_="input")
         output_sections = soup.find_all("div", class_="output")
 
-        all_inputs = []
-        all_outputs = []
+        individual_tests = extract_individual_test_cases(
+            input_sections, output_sections
+        )
 
-        for inp_section in input_sections:
-            inp_pre = inp_section.find("pre")
-            if inp_pre:
-                divs = inp_pre.find_all("div")
-                if divs:
-                    lines = [div.get_text().strip() for div in divs]
-                    text = "\n".join(lines)
-                else:
-                    text = inp_pre.get_text().replace("\r", "").strip()
-                all_inputs.append(text)
+        if individual_tests:
+            return individual_tests
 
-        for out_section in output_sections:
-            out_pre = out_section.find("pre")
-            if out_pre:
-                divs = out_pre.find_all("div")
-                if divs:
-                    lines = [div.get_text().strip() for div in divs]
-                    text = "\n".join(lines)
-                else:
-                    text = out_pre.get_text().replace("\r", "").strip()
-                all_outputs.append(text)
+        all_inputs = extract_combined_text(input_sections)
+        all_outputs = extract_combined_text(output_sections)
 
         if all_inputs and all_outputs:
             combined_input = "\n".join(all_inputs)
@@ -106,6 +165,39 @@ def scrape_sample_tests(url: str) -> list[tuple[str, str]]:
     return scrape(url)
 
 
+def scrape_with_both_formats(
+    url: str,
+) -> tuple[list[tuple[str, str]], tuple[str, str] | None]:
+    try:
+        scraper = cloudscraper.create_scraper()
+        response = scraper.get(url, timeout=10)
+        response.raise_for_status()
+
+        soup = BeautifulSoup(response.text, "html.parser")
+
+        input_sections = soup.find_all("div", class_="input")
+        output_sections = soup.find_all("div", class_="output")
+
+        individual_tests = extract_individual_test_cases(
+            input_sections, output_sections
+        )
+
+        all_inputs = extract_combined_text(input_sections)
+        all_outputs = extract_combined_text(output_sections)
+
+        combined = None
+        if all_inputs and all_outputs:
+            combined_input = "\n".join(all_inputs)
+            combined_output = "\n".join(all_outputs)
+            combined = (combined_input, combined_output)
+
+        return individual_tests, combined
+
+    except Exception as e:
+        print(f"CloudScraper failed: {e}", file=sys.stderr)
+        return [], None
+
+
 def main() -> None:
     if len(sys.argv) < 2:
         result: dict[str, str | bool] = {
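Also not from the commit: a short sketch of how a caller might consume the two-part return value of scrape_with_both_formats; the URL is a placeholder, not a real endpoint.

    url = "https://example.com/contest/1/problem/A"  # placeholder, not from the diff

    individual_tests, combined = scrape_with_both_formats(url)

    if individual_tests:
        # One (input, output) pair per test-example-line-N group.
        for sample_input, expected_output in individual_tests:
            print(sample_input, "->", expected_output)
    elif combined is not None:
        # Fallback: all sample lines concatenated into a single pair.
        sample_input, expected_output = combined
        print(sample_input, "->", expected_output)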
@@ -158,9 +250,10 @@ def main() -> None:
     problem_id: str = contest_id + problem_letter.lower()
 
     url: str = parse_problem_url(contest_id, problem_letter)
-    tests: list[tuple[str, str]] = scrape_sample_tests(url)
+    print(f"Scraping: {url}", file=sys.stderr)
+    individual_tests, combined = scrape_with_both_formats(url)
 
-    if not tests:
+    if not individual_tests and not combined:
         result: dict[str, str | bool] = {
             "success": False,
             "error": f"No tests found for {contest_id} {problem_letter}",
@@ -171,15 +264,25 @@ def main() -> None:
         sys.exit(1)
 
     test_cases: list[dict[str, str]] = []
-    for input_data, output_data in tests:
-        test_cases.append({"input": input_data, "output": output_data})
+    has_individual = len(individual_tests) > 0
+
+    if has_individual:
+        for input_data, output_data in individual_tests:
+            test_cases.append({"input": input_data, "output": output_data})
+    elif combined:
+        test_cases.append({"input": combined[0], "output": combined[1]})
 
     result: dict[str, str | bool | list] = {
         "success": True,
         "problem_id": problem_id,
         "url": url,
         "test_cases": test_cases,
+        "has_individual_tests": has_individual,
     }
+
+    if combined:
+        result["combined"] = {"input": combined[0], "output": combined[1]}
+
     print(json.dumps(result))
 
     else:
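For orientation, a sketch of the JSON object main() now prints on success; the keys mirror the result dict in the diff above, while the values here are made-up placeholders.

    # Illustrative payload only; values are invented, keys come from the diff.
    example_result = {
        "success": True,
        "problem_id": "1a",  # contest_id + problem_letter.lower()
        "url": "https://example.com/contest/1/problem/A",  # placeholder
        "test_cases": [
            {"input": "1 2", "output": "3"},
            {"input": "3 4", "output": "7"},
        ],
        "has_individual_tests": True,
        # "combined" is included only when a concatenated sample pair was found:
        "combined": {"input": "1 2\n3 4", "output": "3\n7"},
    }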