From 8c654710e8520cb2a45228e6a04102cae9891a15 Mon Sep 17 00:00:00 2001 From: Pankaj Date: Thu, 28 Dec 2023 14:17:02 +0530 Subject: [PATCH] clean html extractor --- .../ingestion/ask-astro-load-airflow-docs.py | 15 +- airflow/include/tasks/extract/airflow_docs.py | 55 +------ .../include/tasks/extract/astro_forum_docs.py | 93 +---------- .../include/tasks/extract/astro_sdk_docs.py | 6 +- .../extract/astronomer_providers_docs.py | 6 +- .../tasks/extract/utils/html_helpers.py | 77 --------- .../{html_url_extractor.py => html_utils.py} | 147 ++++++++++-------- airflow/include/tasks/split.py | 4 + 8 files changed, 112 insertions(+), 291 deletions(-) delete mode 100644 airflow/include/tasks/extract/utils/html_helpers.py rename airflow/include/tasks/extract/utils/{html_url_extractor.py => html_utils.py} (54%) diff --git a/airflow/dags/ingestion/ask-astro-load-airflow-docs.py b/airflow/dags/ingestion/ask-astro-load-airflow-docs.py index e367ab1b..fba4c71f 100644 --- a/airflow/dags/ingestion/ask-astro-load-airflow-docs.py +++ b/airflow/dags/ingestion/ask-astro-load-airflow-docs.py @@ -1,6 +1,7 @@ import os from datetime import datetime +import pandas as pd from include.utils.slack import send_failure_notification from airflow.decorators import dag, task @@ -19,6 +20,15 @@ schedule_interval = "0 5 * * *" if ask_astro_env == "prod" else None +@task +def split_docs(urls, chunk_size=100) -> list[list[pd.Dataframe]]: + from include.tasks import split + from include.tasks.extract.utils.html_utils import urls_to_dataframe + + chunked_urls = split.split_list(list(urls), chunk_size=chunk_size) + return [[urls_to_dataframe(chunk_url)] for chunk_url in chunked_urls] + + @dag( schedule_interval=schedule_interval, start_date=datetime(2023, 9, 27), @@ -35,13 +45,10 @@ def ask_astro_load_airflow_docs(): data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator any existing documents that have been updated will be removed and re-added. 
""" - from include.tasks import split from include.tasks.extract import airflow_docs extracted_airflow_docs = task(airflow_docs.extract_airflow_docs)(docs_base_url=airflow_docs_base_url) - split_md_docs = task(split.split_html).expand(dfs=[extracted_airflow_docs]) - _import_data = WeaviateDocumentIngestOperator.partial( class_name=WEAVIATE_CLASS, existing="replace", @@ -50,7 +57,7 @@ def ask_astro_load_airflow_docs(): verbose=True, conn_id=_WEAVIATE_CONN_ID, task_id="WeaviateDocumentIngestOperator", - ).expand(input_data=[split_md_docs]) + ).expand(input_data=split_docs(extracted_airflow_docs, chunk_size=100)) ask_astro_load_airflow_docs() diff --git a/airflow/include/tasks/extract/airflow_docs.py b/airflow/include/tasks/extract/airflow_docs.py index 5c0fbd98..c67cef5c 100644 --- a/airflow/include/tasks/extract/airflow_docs.py +++ b/airflow/include/tasks/extract/airflow_docs.py @@ -1,36 +1,13 @@ from __future__ import annotations -import re -import urllib.parse - import pandas as pd -import requests -from bs4 import BeautifulSoup -from weaviate.util import generate_uuid5 -from include.tasks.extract.utils.html_helpers import get_all_links +from include.tasks.extract.utils.html_utils import get_internal_links def extract_airflow_docs(docs_base_url: str) -> list[pd.DataFrame]: """ - This task scrapes docs from the Airflow website and returns a list of pandas dataframes. Return - type is a list in order to map to upstream dynamic tasks. The code recursively generates a list - of html files relative to 'docs_base_url' and then extracts each as text. - - Note: Only the (class_: body, role: main) tag and children are extracted. - - Note: This code will also pickup https://airflow.apache.org/howto/* - which are also referenced in the docs pages. These are useful for Ask Astro and also they are relatively few - pages so we leave them in. - - param docs_base_url: Base URL to start extract. 
- type docs_base_url: str - - The returned data includes the following fields: - 'docSource': 'apache/airflow/docs' - 'docLink': URL for the page - 'content': HTML content of the page - 'sha': A UUID from the other fields + This task returns all internal URLs for the Airflow docs """ # we exclude the following docs which are not useful and/or too large for easy processing. @@ -48,30 +25,6 @@ def extract_airflow_docs(docs_base_url: str) -> list[pd.DataFrame]: "cli-and-env-variables-ref.html", ] - docs_url_parts = urllib.parse.urlsplit(docs_base_url) - docs_url_base = f"{docs_url_parts.scheme}://{docs_url_parts.netloc}" - - all_links = {docs_base_url} - get_all_links(url=list(all_links)[0], all_links=all_links, exclude_docs=exclude_docs) - - # make sure we didn't accidentally pickup any unrelated links in recursion - non_doc_links = {link if docs_url_base not in link else "" for link in all_links} - docs_links = all_links - non_doc_links - - df = pd.DataFrame(docs_links, columns=["docLink"]) - - df["html_content"] = df["docLink"].apply(lambda x: requests.get(x).content) - - df["content"] = df["html_content"].apply( - lambda x: str(BeautifulSoup(x, "html.parser").find(class_="body", role="main")) - ) - df["content"] = df["content"].apply(lambda x: re.sub("¶", "", x)) - - df["sha"] = df["content"].apply(generate_uuid5) - df["docSource"] = "apache/airflow/docs" - df.reset_index(drop=True, inplace=True) - - # column order matters for uuid generation - df = df[["docSource", "sha", "content", "docLink"]] + all_links = get_internal_links(docs_base_url, exclude_literal=exclude_docs) - return [df] + return all_links diff --git a/airflow/include/tasks/extract/astro_forum_docs.py b/airflow/include/tasks/extract/astro_forum_docs.py index eef43056..81044e04 100644 --- a/airflow/include/tasks/extract/astro_forum_docs.py +++ b/airflow/include/tasks/extract/astro_forum_docs.py @@ -7,7 +7,8 @@ import pytz import requests from bs4 import BeautifulSoup -from weaviate.util import 
generate_uuid5 + +from include.tasks.extract.utils.html_utils import fetch_page_content, urls_to_dataframe cutoff_date = datetime(2022, 1, 1, tzinfo=pytz.UTC) @@ -77,7 +78,7 @@ def get_cutoff_questions(forum_url: str) -> set[str]: page_url = f"{base_url}{page_number}" logger.info(page_url) page_number = page_number + 1 - html_content = requests.get(page_url).content + html_content = fetch_page_content(page_url) questions_urls = get_questions_urls(html_content) if not questions_urls: # reached at the end of page return set(all_valid_url) @@ -85,97 +86,11 @@ def get_cutoff_questions(forum_url: str) -> set[str]: all_valid_url.extend(filter_questions_urls) -def truncate_tokens(text: str, encoding_name: str, max_length: int = 8192) -> str: - """ - Truncates a text string based on the maximum number of tokens. - - param string (str): The input text string to be truncated. - param encoding_name (str): The name of the encoding model. - param max_length (int): The maximum number of tokens allowed. Default is 8192. - """ - import tiktoken - - try: - encoding = tiktoken.encoding_for_model(encoding_name) - except ValueError as e: - raise ValueError(f"Invalid encoding_name: {e}") - - encoded_string = encoding.encode(text) - num_tokens = len(encoded_string) - - if num_tokens > max_length: - text = encoding.decode(encoded_string[:max_length]) - - return text - - -def clean_content(row_content: str) -> str | None: - """ - Cleans and extracts text content from HTML. - - param row_content (str): The HTML content to be cleaned. - """ - soup = BeautifulSoup(row_content, "html.parser").find("body") - - if soup is None: - return - # Remove script and style tags - for script_or_style in soup(["script", "style"]): - script_or_style.extract() - - # Get text and handle whitespaces - text = " ".join(soup.stripped_strings) - # Need to truncate because in some cases the token size - # exceeding the max token size. Better solution can be get summary and ingest it. 
- return truncate_tokens(text, "gpt-3.5-turbo", 7692) - - -def fetch_url_content(url) -> str | None: - """ - Fetches the content of a URL. - - param url (str): The URL to fetch content from. - """ - try: - response = requests.get(url) - response.raise_for_status() # Raise an HTTPError for bad responses - return response.content - except requests.RequestException: - logger.info("Error fetching content for %s: %s", url, url) - return None - - -def process_url(url: str, doc_source: str = "") -> dict | None: - """ - Process a URL by fetching its content, cleaning it, and generating a unique identifier (SHA) based on the cleaned content. - - param url (str): The URL to be processed. - """ - content = fetch_url_content(url) - if content is not None: - cleaned_content = clean_content(content) - sha = generate_uuid5(cleaned_content) - return {"docSource": doc_source, "sha": sha, "content": cleaned_content, "docLink": url} - - -def url_to_df(urls: set[str], doc_source: str = "") -> pd.DataFrame: - """ - Create a DataFrame from a list of URLs by processing each URL and organizing the results. - - param urls (list): A list of URLs to be processed. - """ - df_data = [process_url(url, doc_source) for url in urls] - df_data = [entry for entry in df_data if entry is not None] # Remove failed entries - df = pd.DataFrame(df_data) - df = df[["docSource", "sha", "content", "docLink"]] # Reorder columns if needed - return df - - def get_forum_df() -> list[pd.DataFrame]: """ Retrieves question links from a forum, converts them into a DataFrame, and returns a list containing the DataFrame. 
""" questions_links = get_cutoff_questions("https://forum.astronomer.io/latest") logger.info(questions_links) - df = url_to_df(questions_links, "astro-forum") + df = urls_to_dataframe(questions_links, "astro-forum") return [df] diff --git a/airflow/include/tasks/extract/astro_sdk_docs.py b/airflow/include/tasks/extract/astro_sdk_docs.py index 489de94c..8f8ad7cf 100644 --- a/airflow/include/tasks/extract/astro_sdk_docs.py +++ b/airflow/include/tasks/extract/astro_sdk_docs.py @@ -4,7 +4,7 @@ import pandas as pd -from include.tasks.extract.utils.html_url_extractor import extract_internal_url, url_to_df +from include.tasks.extract.utils.html_utils import get_internal_links, urls_to_dataframe logger = logging.getLogger("airflow.task") @@ -13,12 +13,12 @@ def extract_astro_sdk_docs() -> list[pd.DataFrame]: exclude_docs = ["autoapi", "genindex.html", "py-modindex.html", ".md", ".py"] base_url = "https://astro-sdk-python.readthedocs.io/en/stable/" - urls = extract_internal_url(base_url, exclude_docs) + urls = get_internal_links(base_url, exclude_docs) new_urls = [url for url in urls if "stable" in url] logger.info("******ingesting****") logger.info(new_urls) logger.info("*********************") - df = url_to_df(new_urls, "astro-sdk") + df = urls_to_dataframe(new_urls, "astro-sdk") return [df] diff --git a/airflow/include/tasks/extract/astronomer_providers_docs.py b/airflow/include/tasks/extract/astronomer_providers_docs.py index c0420841..8eafc053 100644 --- a/airflow/include/tasks/extract/astronomer_providers_docs.py +++ b/airflow/include/tasks/extract/astronomer_providers_docs.py @@ -2,13 +2,13 @@ import pandas as pd -from include.tasks.extract.utils.html_url_extractor import extract_internal_url, url_to_df +from include.tasks.extract.utils.html_utils import get_internal_links, urls_to_dataframe def extract_provider_docs() -> list[pd.DataFrame]: exclude_docs = ["_api", "_modules", "_sources", "changelog.html", "genindex.html", "py-modindex.html", "#"] base_url = 
"https://astronomer-providers.readthedocs.io/en/stable/" - urls = extract_internal_url(base_url, exclude_docs) - df = url_to_df(urls, "astronomer-providers") + urls = get_internal_links(base_url, exclude_docs) + df = urls_to_dataframe(urls, "astronomer-providers") return [df] diff --git a/airflow/include/tasks/extract/utils/html_helpers.py b/airflow/include/tasks/extract/utils/html_helpers.py deleted file mode 100644 index 34029303..00000000 --- a/airflow/include/tasks/extract/utils/html_helpers.py +++ /dev/null @@ -1,77 +0,0 @@ -from __future__ import annotations - -import logging -import urllib.parse -from time import sleep - -import requests -from bs4 import BeautifulSoup - - -def get_links(url: str, exclude_docs: list) -> set: - """ - Given a HTML url this function scrapes the page for any HTML links ( tags) and returns a set of links which: - a) starts with the same base (ie. scheme + netloc) - b) is a relative link from the currently read page - Relative links are converted to absolute links.Note that the absolute link may not be unique due to redirects. - - :param url: The url to scrape for links. - :param exclude_docs: A list of strings to exclude from the returned links. 
- """ - response = requests.get(url) - data = response.text - soup = BeautifulSoup(data, "lxml") - - url_parts = urllib.parse.urlsplit(url) - url_base = f"{url_parts.scheme}://{url_parts.netloc}" - - links = set() - for link in soup.find_all("a"): - link_url = link.get("href") - - if link_url.endswith(".html"): - if link_url.startswith(url_base) and not any(substring in link_url for substring in exclude_docs): - links.add(link_url) - elif not link_url.startswith("http"): - absolute_url = urllib.parse.urljoin(url, link_url) - if not any(substring in absolute_url for substring in exclude_docs): - links.add(absolute_url) - - return links - - -def get_all_links(url: str, all_links: set, exclude_docs: list, retry_count: int = 0, max_retries: int = 5): - """ - Recursive function to find all sub-pages of a webpage. - - :param url: The url to scrape for links. - :param all_links: A set of all links found so far. - :param exclude_docs: A list of strings to exclude from the returned links. - :param retry_count: Current retry attempt. - :param max_retries: Maximum number of retries allowed for a single URL. - """ - try: - links = get_links(url=url, exclude_docs=exclude_docs) - for link in links: - # check if the linked page actually exists and get the redirect which is hopefully unique - - response = requests.head(link, allow_redirects=True) - if response.ok: - redirect_url = response.url - if redirect_url not in all_links: - logging.info(redirect_url) - all_links.add(redirect_url) - get_all_links(url=redirect_url, all_links=all_links, exclude_docs=exclude_docs) - except requests.exceptions.ConnectionError as ce: - if retry_count < max_retries: - logging.warning(f"Connection error for {url}: {ce}. 
Retrying ({retry_count + 1}/{max_retries})") - sleep(2**retry_count) # Exponential backoff - get_all_links( - url=url, - all_links=all_links, - exclude_docs=exclude_docs, - retry_count=retry_count + 1, - max_retries=max_retries, - ) - else: - logging.warning(f"Max retries reached for {url}. Skipping this URL.") diff --git a/airflow/include/tasks/extract/utils/html_url_extractor.py b/airflow/include/tasks/extract/utils/html_utils.py similarity index 54% rename from airflow/include/tasks/extract/utils/html_url_extractor.py rename to airflow/include/tasks/extract/utils/html_utils.py index d28b0dd6..91c1c30b 100644 --- a/airflow/include/tasks/extract/utils/html_url_extractor.py +++ b/airflow/include/tasks/extract/utils/html_utils.py @@ -10,10 +10,11 @@ logger = logging.getLogger("airflow.task") + internal_urls = set() -def is_valid_url(url): +def is_valid_url(url: str) -> bool: """ Check if the given URL is valid by ensuring it has a valid scheme and network location. @@ -23,28 +24,74 @@ def is_valid_url(url): return bool(parsed.netloc) and bool(parsed.scheme) -def exclude_path(url, exclude_docs=None): +def fetch_page_content(url: str) -> str: + try: + response = requests.get(url) + response.raise_for_status() # Raise an HTTPError for bad responses + return response.content + except requests.RequestException: + logger.info("Error fetching content for %s: %s", url, url) + return "" + + +def is_excluded_url(url: str, exclude_literal: list[str]) -> bool: + url_path = urlparse(url).path + return any(literal in url_path for literal in exclude_literal) + + +def clean_tags(text_content: str, tags: list[str] | None) -> str | None: """ - Check if the URL path contains any of the specified strings to be excluded. + Clean the HTML content by removing script and style tags, collapsing whitespaces, and extracting text. - param url (str): The URL to check. - param exclude_docs (list): List of strings to exclude from the URL path. 
+ param text_content (str): The HTML content to be cleaned. """ - if exclude_docs is None: - exclude_docs = [] - url_path = urlparse(url).path - return any(docs in url_path for docs in exclude_docs) + if tags is None: + tags = ["script", "style"] + soup = BeautifulSoup(text_content, "html.parser").find("body") + if soup is None: + return + # Remove script and style tags + for script_or_style in soup(tags): + script_or_style.extract() + + # Get text and handle whitespaces + text = " ".join(soup.stripped_strings) + + return text -def get_all_links(url, exclude_docs=None): + +def truncate_tokens(text: str, encoding_name: str = "gpt-3.5-turbo", max_length: int = 8192) -> str | None: + """ + Truncates a text string based on the maximum number of tokens. + + param text (str): The input text string to be truncated. + param encoding_name (str): The name of the encoding model. + param max_length (int): The maximum number of tokens allowed. Default is 8192. + """ + import tiktoken + + try: + encoding = tiktoken.encoding_for_model(encoding_name) + encoded_string = encoding.encode(text, disallowed_special=()) + + num_tokens = len(encoded_string) + + if num_tokens > max_length: + text = encoding.decode(encoded_string[:max_length]) + return text + except Exception as e: + logger.info("Unable to encode text %s", text) + logger.info(e) + + +def get_page_links(url: str, exclude_literal: list[str]) -> set[str]: """ Extract all valid and internal links from the given URL. param url (str): The URL to extract links from. param exclude_docs (list): List of strings to exclude from the URL path. 
""" - if exclude_docs is None: - exclude_docs = [] urls = set() domain_name = urlparse(url).netloc soup = BeautifulSoup(requests.get(url).content, "html.parser") @@ -59,7 +106,7 @@ def get_all_links(url, exclude_docs=None): not is_valid_url(href) or href in internal_urls or domain_name not in href - or exclude_path(href, exclude_docs) + or is_excluded_url(href, exclude_literal) ): continue urls.add(href) @@ -69,77 +116,49 @@ def get_all_links(url, exclude_docs=None): return urls -def extract_internal_url(url, exclude_docs=None): - """ - Recursively extract all valid and internal links from the given URL. +def get_internal_links(base_url: str, exclude_literal: list[str] | None = None) -> set[str]: + if exclude_literal is None: + exclude_literal = [] - param url (str): The URL to start the extraction from. - param exclude_docs (list): List of strings to exclude from the URL path. - """ - if exclude_docs is None: - exclude_docs = [] - - links = get_all_links(url, exclude_docs) + links = get_page_links(base_url, exclude_literal) for link in links: - extract_internal_url(link, exclude_docs) + get_page_links(link, exclude_literal) return internal_urls -def clean_content(text_content: str) -> str | None: - """ - Clean the HTML content by removing script and style tags, collapsing whitespaces, and extracting text. - - param text_content (str): The HTML content to be cleaned. 
- """ - soup = BeautifulSoup(text_content, "html.parser").find("body") - - if soup is None: - return - # Remove script and style tags - for script_or_style in soup(["script", "style"]): - script_or_style.extract() - - # Get text and handle whitespaces - text = " ".join(soup.stripped_strings) - - return text - - -def fetch_url_content(url): - try: - response = requests.get(url) - response.raise_for_status() # Raise an HTTPError for bad responses - return response.content - except requests.RequestException: - logger.info("Error fetching content for %s: %s", url, url) - return None - - -def process_url(url, doc_source=""): +def process_url(url, doc_source="", clean_tag: bool = True, truncate_text: bool = True) -> dict | None: """ Process a URL by fetching its content, cleaning it, and generating a unique identifier (SHA) based on the cleaned content. param url (str): The URL to be processed. """ - content = fetch_url_content(url) - if content is not None: - cleaned_content = clean_content(content) - sha = generate_uuid5(cleaned_content) - return {"docSource": doc_source, "sha": sha, "content": cleaned_content, "docLink": url} + html_text = fetch_page_content(url) + if html_text: + if clean_tag: + html_text = clean_tags(html_text) + if truncate_text: + html_text = truncate_tokens(html_text) + sha = generate_uuid5(html_text) + return {"docSource": doc_source, "sha": sha, "content": html_text, "docLink": url} else: return None -def url_to_df(urls, doc_source=""): +def urls_to_dataframe( + urls: set[str], doc_source: str = "", clean_tag: bool = True, truncate_text: bool = True +) -> pd.DataFrame: """ Create a DataFrame from a list of URLs by processing each URL and organizing the results. param urls (list): A list of URLs to be processed. 
""" - df_data = [process_url(url, doc_source) for url in urls] - df_data = [entry for entry in df_data if entry is not None] # Remove failed entries - df = pd.DataFrame(df_data) + content_list = [] + for url in urls: + data = process_url(url, doc_source, clean_tag, truncate_text) + if data: + content_list.append(data) + df = pd.DataFrame(content_list) df = df[["docSource", "sha", "content", "docLink"]] # Reorder columns if needed return df diff --git a/airflow/include/tasks/split.py b/airflow/include/tasks/split.py index d6720fad..233b49ec 100644 --- a/airflow/include/tasks/split.py +++ b/airflow/include/tasks/split.py @@ -103,3 +103,7 @@ def split_html(dfs: list[pd.DataFrame]) -> pd.DataFrame: df.reset_index(inplace=True, drop=True) return df + + +def split_list(urls: list[str], chunk_size: int = 0) -> list[list]: + return [urls[i : i + chunk_size] for i in range(0, len(urls), chunk_size)]