From b43dcecd99c361b180cf1b461cb4dfbb22188a22 Mon Sep 17 00:00:00 2001 From: Ankit Chaurasia <8670962+sunank200@users.noreply.github.com> Date: Thu, 28 Dec 2023 15:43:08 +0545 Subject: [PATCH 1/6] Astro docs links to the webpage not github --- .../ask-astro-load-astronomer-docs.py | 51 ++++++++++ .../dags/ingestion/ask-astro-load-github.py | 14 +-- airflow/dags/ingestion/ask-astro-load.py | 30 ++++-- airflow/include/tasks/extract/astro_docs.py | 98 +++++++++++++++++++ 4 files changed, 174 insertions(+), 19 deletions(-) create mode 100644 airflow/dags/ingestion/ask-astro-load-astronomer-docs.py create mode 100644 airflow/include/tasks/extract/astro_docs.py diff --git a/airflow/dags/ingestion/ask-astro-load-astronomer-docs.py b/airflow/dags/ingestion/ask-astro-load-astronomer-docs.py new file mode 100644 index 00000000..172d0896 --- /dev/null +++ b/airflow/dags/ingestion/ask-astro-load-astronomer-docs.py @@ -0,0 +1,51 @@ +import datetime +import os + +from include.tasks import split +from include.tasks.extract.astro_docs import extract_astro_docs +from include.tasks.extract.utils.weaviate.ask_astro_weaviate_hook import AskAstroWeaviateHook + +from airflow.decorators import dag, task + +ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "dev") + +_WEAVIATE_CONN_ID = f"weaviate_{ask_astro_env}" +WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsDev") + +ask_astro_weaviate_hook = AskAstroWeaviateHook(_WEAVIATE_CONN_ID) + + +default_args = {"retries": 3, "retry_delay": 30} + +schedule_interval = "0 5 * * *" if ask_astro_env == "prod" else None + + +@dag( + schedule_interval=schedule_interval, + start_date=datetime.datetime(2023, 9, 27), + catchup=False, + is_paused_upon_creation=True, + default_args=default_args, +) +def ask_astro_load_astronomer_docs(): + """ + This DAG performs incremental load for any new docs in astronomer docs. 
+ """ + astro_docs = task(extract_astro_docs)() + + split_md_docs = task(split.split_markdown).expand(dfs=[astro_docs]) + + _import_data = ( + task(ask_astro_weaviate_hook.ingest_data, retries=10) + .partial( + class_name=WEAVIATE_CLASS, + existing="upsert", + doc_key="docLink", + batch_params={"batch_size": 1000}, + verbose=True, + ) + .expand(dfs=[split_md_docs]) + ) + + +ask_astro_load_astronomer_docs() diff --git a/airflow/dags/ingestion/ask-astro-load-github.py b/airflow/dags/ingestion/ask-astro-load-github.py index 83f43655..7030abf7 100644 --- a/airflow/dags/ingestion/ask-astro-load-github.py +++ b/airflow/dags/ingestion/ask-astro-load-github.py @@ -17,14 +17,10 @@ ask_astro_weaviate_hook = AskAstroWeaviateHook(_WEAVIATE_CONN_ID) markdown_docs_sources = [ - {"doc_dir": "learn", "repo_base": "astronomer/docs"}, - {"doc_dir": "astro", "repo_base": "astronomer/docs"}, {"doc_dir": "", "repo_base": "OpenLineage/docs"}, {"doc_dir": "", "repo_base": "OpenLineage/OpenLineage"}, ] -code_samples_sources = [ - {"doc_dir": "code-samples", "repo_base": "astronomer/docs"}, -] + issues_docs_sources = [ "apache/airflow", ] @@ -60,14 +56,8 @@ def ask_astro_load_github(): .expand(repo_base=issues_docs_sources) ) - code_samples = ( - task(github.extract_github_python).partial(github_conn_id=_GITHUB_CONN_ID).expand(source=code_samples_sources) - ) - split_md_docs = task(split.split_markdown).expand(dfs=[md_docs, issues_docs]) - split_code_docs = task(split.split_python).expand(dfs=[code_samples]) - _import_data = ( task(ask_astro_weaviate_hook.ingest_data, retries=10) .partial( @@ -77,7 +67,7 @@ def ask_astro_load_github(): batch_params={"batch_size": 1000}, verbose=True, ) - .expand(dfs=[split_md_docs, split_code_docs]) + .expand(dfs=[split_md_docs]) ) diff --git a/airflow/dags/ingestion/ask-astro-load.py b/airflow/dags/ingestion/ask-astro-load.py index 24d9b525..04008473 100644 --- a/airflow/dags/ingestion/ask-astro-load.py +++ b/airflow/dags/ingestion/ask-astro-load.py @@ -9,6 +9,7 @@ import pandas as pd from include.tasks import split from include.tasks.extract import airflow_docs, astro_cli_docs, blogs, github, registry, stack_overflow +from include.tasks.extract.astro_docs import extract_astro_docs from include.tasks.extract.astro_forum_docs import get_forum_df from include.tasks.extract.astro_sdk_docs import extract_astro_sdk_docs from include.tasks.extract.astronomer_providers_docs import extract_provider_docs @@ -28,14 +29,12 @@ ask_astro_weaviate_hook = AskAstroWeaviateHook(_WEAVIATE_CONN_ID) markdown_docs_sources = [ - {"doc_dir": "learn", "repo_base": "astronomer/docs"}, - {"doc_dir": "astro", "repo_base": "astronomer/docs"}, {"doc_dir": "", "repo_base": "OpenLineage/docs"}, {"doc_dir": "", "repo_base": "OpenLineage/OpenLineage"}, ] -code_samples_sources = [ - {"doc_dir": "code-samples", "repo_base": "astronomer/docs"}, -] +# code_samples_sources = [ +# {"doc_dir": "code-samples", "repo_base": "astronomer/docs"}, +# ] issues_docs_sources = [ "apache/airflow", ] @@ -153,6 +152,7 @@ def check_seed_baseline(seed_baseline_url: str = None) -> str | set: "extract_astro_sdk_doc", "extract_astro_provider_doc", "extract_astro_forum_doc", + "extract_astronomer_docs", } @task(trigger_rule="none_failed") @@ -314,13 +314,28 @@ def extract_astro_blogs(): return [df] + @task(trigger_rule="none_failed") + def extract_astronomer_docs(): + parquet_file = "include/data/astronomer/blogs/astro_docs.parquet" + + if os.path.isfile(parquet_file): + if os.access(parquet_file, os.R_OK): + df = 
pd.read_parquet(parquet_file) + else: + raise Exception("Parquet file exists locally but is not readable.") + else: + df = extract_astro_docs()[0] + df.to_parquet(parquet_file) + + return [df] + md_docs = extract_github_markdown.expand(source=markdown_docs_sources) issues_docs = extract_github_issues.expand(repo_base=issues_docs_sources) stackoverflow_docs = extract_stack_overflow.expand(tag=stackoverflow_tags) registry_cells_docs = extract_astro_registry_cell_types() blogs_docs = extract_astro_blogs() registry_dags_docs = extract_astro_registry_dags() - code_samples = extract_github_python.expand(source=code_samples_sources) + _astro_docs = extract_astronomer_docs() _airflow_docs = extract_airflow_docs() _astro_cli_docs = extract_astro_cli_docs() _extract_astro_sdk_docs = extract_astro_sdk_doc() @@ -346,9 +361,10 @@ def extract_astro_blogs(): _extract_astro_sdk_docs, _extract_astro_providers_docs, _astro_forum_docs, + _astro_docs, ] - python_code_tasks = [registry_dags_docs, code_samples] + python_code_tasks = [registry_dags_docs] split_md_docs = task(split.split_markdown).expand(dfs=markdown_tasks) diff --git a/airflow/include/tasks/extract/astro_docs.py b/airflow/include/tasks/extract/astro_docs.py new file mode 100644 index 00000000..50753568 --- /dev/null +++ b/airflow/include/tasks/extract/astro_docs.py @@ -0,0 +1,98 @@ +from __future__ import annotations + +import logging +from urllib.parse import urldefrag, urljoin + +import pandas as pd +import requests +from bs4 import BeautifulSoup +from weaviate.util import generate_uuid5 + +base_url = "https://docs.astronomer.io/" + + +def fetch_page_content(url: str) -> str: + """ + Fetches the content of a given URL. + + :param url: URL of the page to fetch. + :return: HTML content of the page. + """ + try: + response = requests.get(url, headers={"User-Agent": "Mozilla/5.0"}) + if response.status_code == 200: + return response.content + except requests.RequestException as e: + logging.error(f"Error fetching {url}: {e}") + return "" + + +def extract_links(soup: BeautifulSoup, base_url: str) -> list[str]: + """ + Extracts all valid links from a BeautifulSoup object. + + :param soup: BeautifulSoup object to extract links from. + :param base_url: Base URL for relative links. + :return: List of extracted URLs. + """ + links = [] + for link in soup.find_all("a", href=True): + href = link["href"] + if not href.startswith("http"): + href = urljoin(base_url, href) + if href.startswith(base_url): + links.append(href) + return links + + +def scrape_page(url: str, visited_urls: set, docs_data: list): + """ + Recursively scrapes a webpage and its subpages. + + :param url: URL of the page to scrape. + :param visited_urls: Set of URLs already visited. + :param docs_data: List to append extracted data to. 
+ """ + if url in visited_urls or not url.startswith(base_url): + return + + # Normalize URL by stripping off the fragment + base_url_no_fragment, frag = urldefrag(url) + + # If the URL is the base URL plus a fragment, ignore it + if base_url_no_fragment == base_url and frag: + return + + visited_urls.add(url) + + logging.info(f"Scraping : {url}") + + page_content = fetch_page_content(url) + if not page_content: + return + + soup = BeautifulSoup(page_content, "lxml") + content = soup.get_text(strip=True) + sha = generate_uuid5(content) + docs_data.append({"docSource": "astro docs", "sha": sha, "content": content, "docLink": url}) + # Recursively scrape linked pages + for link in extract_links(soup, base_url): + scrape_page(link, visited_urls, docs_data) + + +def extract_astro_docs(base_url: str = base_url) -> list[pd.DataFrame]: + """ + Extract documentation pages from docs.astronomer.io and its subdomains. + + :return: A list of pandas dataframes with extracted data. + """ + visited_urls = set() + docs_data = [] + + scrape_page(base_url, visited_urls, docs_data) + + df = pd.DataFrame(docs_data) + df.drop_duplicates(subset="sha", inplace=True) + df.reset_index(drop=True, inplace=True) + + return [df] From f15774f91ad7c141d33b407da5a9244260521b48 Mon Sep 17 00:00:00 2001 From: Ankit Chaurasia <8670962+sunank200@users.noreply.github.com> Date: Thu, 28 Dec 2023 16:03:44 +0545 Subject: [PATCH 2/6] Change the bulk load DAG --- airflow/dags/ingestion/ask-astro-load.py | 20 +------------------- 1 file changed, 1 insertion(+), 19 deletions(-) diff --git a/airflow/dags/ingestion/ask-astro-load.py b/airflow/dags/ingestion/ask-astro-load.py index 04008473..a83e263e 100644 --- a/airflow/dags/ingestion/ask-astro-load.py +++ b/airflow/dags/ingestion/ask-astro-load.py @@ -32,9 +32,7 @@ {"doc_dir": "", "repo_base": "OpenLineage/docs"}, {"doc_dir": "", "repo_base": "OpenLineage/OpenLineage"}, ] -# code_samples_sources = [ -# {"doc_dir": "code-samples", "repo_base": "astronomer/docs"}, -# ] + issues_docs_sources = [ "apache/airflow", ] @@ -146,7 +144,6 @@ def check_seed_baseline(seed_baseline_url: str = None) -> str | set: "extract_astro_registry_cell_types", "extract_github_issues", "extract_astro_blogs", - "extract_github_python", "extract_astro_registry_dags", "extract_astro_cli_docs", "extract_astro_sdk_doc", @@ -170,21 +167,6 @@ def extract_github_markdown(source: dict): return df - @task(trigger_rule="none_failed") - def extract_github_python(source: dict): - parquet_file = f"include/data/{source['repo_base']}/{source['doc_dir']}.parquet" - - if os.path.isfile(parquet_file): - if os.access(parquet_file, os.R_OK): - df = pd.read_parquet(parquet_file) - else: - raise Exception("Parquet file exists locally but is not readable.") - else: - df = github.extract_github_python(source, _GITHUB_CONN_ID) - df.to_parquet(parquet_file) - - return df - @task(trigger_rule="none_failed") def extract_airflow_docs(): parquet_file = "include/data/apache/airflow/docs.parquet" From 60cfb2450793ad3fcd27797285d6065f089ba1b2 Mon Sep 17 00:00:00 2001 From: Ankit Chaurasia <8670962+sunank200@users.noreply.github.com> Date: Tue, 2 Jan 2024 21:30:36 +0545 Subject: [PATCH 3/6] Fix PR comments from Pankaj K and Pankaj S --- airflow/dags/ingestion/ask-astro-load-astronomer-docs.py | 6 +++--- airflow/dags/monitor/monitor_ingestion_dags.py | 4 +++- airflow/include/tasks/extract/astro_docs.py | 2 +- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/airflow/dags/ingestion/ask-astro-load-astronomer-docs.py 
b/airflow/dags/ingestion/ask-astro-load-astronomer-docs.py index 172d0896..69bbe292 100644 --- a/airflow/dags/ingestion/ask-astro-load-astronomer-docs.py +++ b/airflow/dags/ingestion/ask-astro-load-astronomer-docs.py @@ -3,7 +3,6 @@ from include.tasks import split from include.tasks.extract.astro_docs import extract_astro_docs -from include.tasks.extract.utils.weaviate.ask_astro_weaviate_hook import AskAstroWeaviateHook from airflow.decorators import dag, task @@ -12,8 +11,6 @@ _WEAVIATE_CONN_ID = f"weaviate_{ask_astro_env}" WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsDev") -ask_astro_weaviate_hook = AskAstroWeaviateHook(_WEAVIATE_CONN_ID) - default_args = {"retries": 3, "retry_delay": 30} @@ -31,6 +28,9 @@ def ask_astro_load_astronomer_docs(): """ This DAG performs incremental load for any new docs in astronomer docs. """ + from include.tasks.extract.utils.weaviate.ask_astro_weaviate_hook import AskAstroWeaviateHook + + ask_astro_weaviate_hook = AskAstroWeaviateHook(_WEAVIATE_CONN_ID) astro_docs = task(extract_astro_docs)() split_md_docs = task(split.split_markdown).expand(dfs=[astro_docs]) diff --git a/airflow/dags/monitor/monitor_ingestion_dags.py b/airflow/dags/monitor/monitor_ingestion_dags.py index fd740b54..490b63eb 100644 --- a/airflow/dags/monitor/monitor_ingestion_dags.py +++ b/airflow/dags/monitor/monitor_ingestion_dags.py @@ -19,12 +19,14 @@ "ask_astro_load_airflow_docs", "ask_astro_load_astro_cli_docs", "ask_astro_load_astronomer_providers", - "ask_astro_load_astro_sdk" "ask_astro_load_blogs", + "ask_astro_load_astro_sdk", + "ask_astro_load_blogs", "ask_astro_load_bulk", "ask_astro_load_github", "ask_astro_load_registry", # "ask_astro_load_slack", "ask_astro_load_stackoverflow", + "ask_astro_load_astronomer_docs", ] diff --git a/airflow/include/tasks/extract/astro_docs.py b/airflow/include/tasks/extract/astro_docs.py index 50753568..5cec7a54 100644 --- a/airflow/include/tasks/extract/astro_docs.py +++ b/airflow/include/tasks/extract/astro_docs.py @@ -45,7 +45,7 @@ def extract_links(soup: BeautifulSoup, base_url: str) -> list[str]: return links -def scrape_page(url: str, visited_urls: set, docs_data: list): +def scrape_page(url: str, visited_urls: set, docs_data: list) -> None: """ Recursively scrapes a webpage and its subpages. 
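The monitor-DAG change in patch 3/6 above fixes an easy-to-miss Python pitfall: when the comma between two adjacent string literals in a list is missing, the literals are implicitly concatenated and the two intended DAG ids silently collapse into a single entry. A minimal standalone sketch of the behaviour (illustrative only; the variable names here are not code from this repository):

broken = [
    "ask_astro_load_astro_sdk"   # missing trailing comma ...
    "ask_astro_load_blogs",      # ... so the two literals merge into one string
]
fixed = [
    "ask_astro_load_astro_sdk",
    "ask_astro_load_blogs",
]
assert broken == ["ask_astro_load_astro_sdkask_astro_load_blogs"]
assert len(fixed) == 2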
From ebedc12274caeb5c106901bb78107e3099e8c1dc Mon Sep 17 00:00:00 2001 From: Ankit Chaurasia <8670962+sunank200@users.noreply.github.com> Date: Wed, 3 Jan 2024 17:03:05 +0545 Subject: [PATCH 4/6] Update airflow/dags/ingestion/ask-astro-load.py Co-authored-by: Wei Lee --- airflow/dags/ingestion/ask-astro-load.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/airflow/dags/ingestion/ask-astro-load.py b/airflow/dags/ingestion/ask-astro-load.py index a83e263e..b2fe14c3 100644 --- a/airflow/dags/ingestion/ask-astro-load.py +++ b/airflow/dags/ingestion/ask-astro-load.py @@ -301,10 +301,9 @@ def extract_astronomer_docs(): parquet_file = "include/data/astronomer/blogs/astro_docs.parquet" if os.path.isfile(parquet_file): - if os.access(parquet_file, os.R_OK): - df = pd.read_parquet(parquet_file) - else: - raise Exception("Parquet file exists locally but is not readable.") + if not os.access(parquet_file, os.R_OK): + raise AirflowException("Parquet file exists locally but is not readable.") + df = pd.read_parquet(parquet_file) else: df = extract_astro_docs()[0] df.to_parquet(parquet_file) From 2204ba42ca5905a4f170279753fa0f14db0eebd4 Mon Sep 17 00:00:00 2001 From: Ankit Chaurasia <8670962+sunank200@users.noreply.github.com> Date: Wed, 3 Jan 2024 17:04:42 +0545 Subject: [PATCH 5/6] reorder the imports --- airflow/dags/ingestion/ask-astro-load-astronomer-docs.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/airflow/dags/ingestion/ask-astro-load-astronomer-docs.py b/airflow/dags/ingestion/ask-astro-load-astronomer-docs.py index 69bbe292..0a71d1e4 100644 --- a/airflow/dags/ingestion/ask-astro-load-astronomer-docs.py +++ b/airflow/dags/ingestion/ask-astro-load-astronomer-docs.py @@ -1,9 +1,6 @@ import datetime import os -from include.tasks import split -from include.tasks.extract.astro_docs import extract_astro_docs - from airflow.decorators import dag, task ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "dev") @@ -28,6 +25,8 @@ def ask_astro_load_astronomer_docs(): """ This DAG performs incremental load for any new docs in astronomer docs. """ + from include.tasks import split + from include.tasks.extract.astro_docs import extract_astro_docs from include.tasks.extract.utils.weaviate.ask_astro_weaviate_hook import AskAstroWeaviateHook ask_astro_weaviate_hook = AskAstroWeaviateHook(_WEAVIATE_CONN_ID) From 3ec4f2c026035c233c2c4f882f76cb46855a3413 Mon Sep 17 00:00:00 2001 From: Ankit Chaurasia <8670962+sunank200@users.noreply.github.com> Date: Wed, 3 Jan 2024 18:03:55 +0545 Subject: [PATCH 6/6] Fix precommit errors --- airflow/dags/ingestion/ask-astro-load.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow/dags/ingestion/ask-astro-load.py b/airflow/dags/ingestion/ask-astro-load.py index b2fe14c3..f70eae3a 100644 --- a/airflow/dags/ingestion/ask-astro-load.py +++ b/airflow/dags/ingestion/ask-astro-load.py @@ -16,6 +16,7 @@ from include.tasks.extract.utils.weaviate.ask_astro_weaviate_hook import AskAstroWeaviateHook from airflow.decorators import dag, task +from airflow.exceptions import AirflowException seed_baseline_url = None stackoverflow_cutoff_date = "2021-09-01"
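Taken together, these patches replace the GitHub-based Astronomer docs ingestion with a recursive scrape of https://docs.astronomer.io/. A minimal sketch of how the new extractor could be exercised outside Airflow to inspect its output, assuming the repository's airflow/ directory is on PYTHONPATH so that include.tasks.extract.astro_docs is importable; the column names and parquet path below come from the diffs above:

from include.tasks.extract.astro_docs import extract_astro_docs

# extract_astro_docs() crawls docs.astronomer.io recursively, de-duplicates pages
# by content hash, and returns a single-element list of DataFrames with columns
# docSource, sha, content, and docLink.
df = extract_astro_docs()[0]
print(df.columns.tolist())
print(f"{len(df)} unique pages scraped")

# The bulk-load DAG (ask-astro-load.py) reads this same path when the file exists,
# so a local run can pre-seed it (assumes the directory has been created):
df.to_parquet("include/data/astronomer/blogs/astro_docs.parquet")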