Link Astro docs to the webpage, not GitHub #238

Merged · 6 commits · Jan 4, 2024
Changes from 2 commits
51 changes: 51 additions & 0 deletions airflow/dags/ingestion/ask-astro-load-astronomer-docs.py
@@ -0,0 +1,51 @@
import datetime
import os

from include.tasks import split
from include.tasks.extract.astro_docs import extract_astro_docs
from include.tasks.extract.utils.weaviate.ask_astro_weaviate_hook import AskAstroWeaviateHook

from airflow.decorators import dag, task

ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "dev")

_WEAVIATE_CONN_ID = f"weaviate_{ask_astro_env}"
WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsDev")

ask_astro_weaviate_hook = AskAstroWeaviateHook(_WEAVIATE_CONN_ID)


default_args = {"retries": 3, "retry_delay": 30}

schedule_interval = "0 5 * * *" if ask_astro_env == "prod" else None


@dag(
    schedule_interval=schedule_interval,
    start_date=datetime.datetime(2023, 9, 27),
    catchup=False,
    is_paused_upon_creation=True,
    default_args=default_args,
)
def ask_astro_load_astronomer_docs():
    """
    This DAG performs an incremental load of any new documents in the Astronomer docs.
    """
    astro_docs = task(extract_astro_docs)()

    split_md_docs = task(split.split_markdown).expand(dfs=[astro_docs])

    _import_data = (
        task(ask_astro_weaviate_hook.ingest_data, retries=10)
        .partial(
            class_name=WEAVIATE_CLASS,
            existing="upsert",
            doc_key="docLink",
            batch_params={"batch_size": 1000},
            verbose=True,
        )
        .expand(dfs=[split_md_docs])
    )


ask_astro_load_astronomer_docs()
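A quick way to sanity-check the new DAG is to exercise it outside the scheduler. The snippet below is a minimal sketch, not part of this PR: it assumes Airflow 2.5+ (where dag.test() is available) plus a working weaviate_dev connection.

# Hypothetical local smoke test (not part of this change); append to the
# DAG file or run from a REPL with the project on PYTHONPATH.
if __name__ == "__main__":
    ask_astro_load_astronomer_docs().test()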
14 changes: 2 additions & 12 deletions airflow/dags/ingestion/ask-astro-load-github.py
@@ -17,14 +17,10 @@
ask_astro_weaviate_hook = AskAstroWeaviateHook(_WEAVIATE_CONN_ID)

markdown_docs_sources = [
    {"doc_dir": "learn", "repo_base": "astronomer/docs"},
    {"doc_dir": "astro", "repo_base": "astronomer/docs"},
    {"doc_dir": "", "repo_base": "OpenLineage/docs"},
    {"doc_dir": "", "repo_base": "OpenLineage/OpenLineage"},
]
code_samples_sources = [
    {"doc_dir": "code-samples", "repo_base": "astronomer/docs"},
]

issues_docs_sources = [
    "apache/airflow",
]
@@ -60,14 +56,8 @@ def ask_astro_load_github():
        .expand(repo_base=issues_docs_sources)
    )

    code_samples = (
        task(github.extract_github_python).partial(github_conn_id=_GITHUB_CONN_ID).expand(source=code_samples_sources)
    )

    split_md_docs = task(split.split_markdown).expand(dfs=[md_docs, issues_docs])

    split_code_docs = task(split.split_python).expand(dfs=[code_samples])

    _import_data = (
        task(ask_astro_weaviate_hook.ingest_data, retries=10)
        .partial(
@@ -77,7 +67,7 @@ def ask_astro_load_github():
            batch_params={"batch_size": 1000},
            verbose=True,
        )
        .expand(dfs=[split_md_docs, split_code_docs])
        .expand(dfs=[split_md_docs])
    )


44 changes: 21 additions & 23 deletions airflow/dags/ingestion/ask-astro-load.py
@@ -9,6 +9,7 @@
import pandas as pd
from include.tasks import split
from include.tasks.extract import airflow_docs, astro_cli_docs, blogs, github, registry, stack_overflow
from include.tasks.extract.astro_docs import extract_astro_docs
from include.tasks.extract.astro_forum_docs import get_forum_df
from include.tasks.extract.astro_sdk_docs import extract_astro_sdk_docs
from include.tasks.extract.astronomer_providers_docs import extract_provider_docs
@@ -28,14 +29,10 @@
ask_astro_weaviate_hook = AskAstroWeaviateHook(_WEAVIATE_CONN_ID)

markdown_docs_sources = [
    {"doc_dir": "learn", "repo_base": "astronomer/docs"},
    {"doc_dir": "astro", "repo_base": "astronomer/docs"},
    {"doc_dir": "", "repo_base": "OpenLineage/docs"},
    {"doc_dir": "", "repo_base": "OpenLineage/OpenLineage"},
]
code_samples_sources = [
    {"doc_dir": "code-samples", "repo_base": "astronomer/docs"},
]

issues_docs_sources = [
    "apache/airflow",
]
@@ -147,12 +144,12 @@ def check_seed_baseline(seed_baseline_url: str = None) -> str | set:
"extract_astro_registry_cell_types",
"extract_github_issues",
"extract_astro_blogs",
"extract_github_python",
"extract_astro_registry_dags",
"extract_astro_cli_docs",
"extract_astro_sdk_doc",
"extract_astro_provider_doc",
"extract_astro_forum_doc",
"extract_astronomer_docs",
}

    @task(trigger_rule="none_failed")
@@ -170,21 +167,6 @@ def extract_github_markdown(source: dict):

        return df

    @task(trigger_rule="none_failed")
    def extract_github_python(source: dict):
        parquet_file = f"include/data/{source['repo_base']}/{source['doc_dir']}.parquet"

        if os.path.isfile(parquet_file):
            if os.access(parquet_file, os.R_OK):
                df = pd.read_parquet(parquet_file)
            else:
                raise Exception("Parquet file exists locally but is not readable.")
        else:
            df = github.extract_github_python(source, _GITHUB_CONN_ID)
            df.to_parquet(parquet_file)

        return df

    @task(trigger_rule="none_failed")
    def extract_airflow_docs():
        parquet_file = "include/data/apache/airflow/docs.parquet"
@@ -314,13 +296,13 @@ def extract_astro_blogs():

        return [df]

    @task(trigger_rule="none_failed")
    def extract_astronomer_docs():
        parquet_file = "include/data/astronomer/blogs/astro_docs.parquet"

        if os.path.isfile(parquet_file):
            if os.access(parquet_file, os.R_OK):
                df = pd.read_parquet(parquet_file)
            else:
                raise Exception("Parquet file exists locally but is not readable.")
        else:
            df = extract_astro_docs()[0]
            df.to_parquet(parquet_file)

        return [df]

    md_docs = extract_github_markdown.expand(source=markdown_docs_sources)
    issues_docs = extract_github_issues.expand(repo_base=issues_docs_sources)
    stackoverflow_docs = extract_stack_overflow.expand(tag=stackoverflow_tags)
    registry_cells_docs = extract_astro_registry_cell_types()
    blogs_docs = extract_astro_blogs()
    registry_dags_docs = extract_astro_registry_dags()
    code_samples = extract_github_python.expand(source=code_samples_sources)
    _astro_docs = extract_astronomer_docs()
    _airflow_docs = extract_airflow_docs()
    _astro_cli_docs = extract_astro_cli_docs()
    _extract_astro_sdk_docs = extract_astro_sdk_doc()
@@ -346,9 +343,10 @@ def extract_astro_blogs():
        _extract_astro_sdk_docs,
        _extract_astro_providers_docs,
        _astro_forum_docs,
        _astro_docs,
    ]

    python_code_tasks = [registry_dags_docs, code_samples]
    python_code_tasks = [registry_dags_docs]

    split_md_docs = task(split.split_markdown).expand(dfs=markdown_tasks)
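Each extract_* task above follows the same cache-or-fetch pattern: read a local parquet snapshot when one exists and is readable, otherwise run the extractor and persist its output. A minimal sketch of that pattern as a standalone helper (load_or_extract is hypothetical, not part of this codebase):

import os

import pandas as pd


def load_or_extract(parquet_file: str, extract_fn) -> pd.DataFrame:
    # extract_fn is any zero-argument callable returning a DataFrame,
    # e.g. lambda: extract_astro_docs()[0].
    if os.path.isfile(parquet_file):
        if not os.access(parquet_file, os.R_OK):
            raise Exception("Parquet file exists locally but is not readable.")
        return pd.read_parquet(parquet_file)
    df = extract_fn()
    df.to_parquet(parquet_file)
    return df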
98 changes: 98 additions & 0 deletions airflow/include/tasks/extract/astro_docs.py
@@ -0,0 +1,98 @@
from __future__ import annotations

import logging
from urllib.parse import urldefrag, urljoin

import pandas as pd
import requests
from bs4 import BeautifulSoup
from weaviate.util import generate_uuid5

base_url = "https://docs.astronomer.io/"


def fetch_page_content(url: str) -> str:
    """
    Fetches the content of a given URL.

    :param url: URL of the page to fetch.
    :return: HTML content of the page, or an empty string on failure.
    """
    try:
        response = requests.get(url, headers={"User-Agent": "Mozilla/5.0"})
        if response.status_code == 200:
            # Use the decoded text so the return type matches the annotation.
            return response.text
    except requests.RequestException as e:
        logging.error(f"Error fetching {url}: {e}")
    return ""


def extract_links(soup: BeautifulSoup, base_url: str) -> list[str]:
    """
    Extracts all valid links from a BeautifulSoup object.

    :param soup: BeautifulSoup object to extract links from.
    :param base_url: Base URL for resolving relative links.
    :return: List of extracted URLs.
    """
    links = []
    for link in soup.find_all("a", href=True):
        href = link["href"]
        if not href.startswith("http"):
            href = urljoin(base_url, href)
        if href.startswith(base_url):
            links.append(href)
    return links


def scrape_page(url: str, visited_urls: set, docs_data: list):
    """
    Recursively scrapes a webpage and its subpages.

    :param url: URL of the page to scrape.
    :param visited_urls: Set of URLs already visited.
    :param docs_data: List to append extracted data to.
    """
    if url in visited_urls or not url.startswith(base_url):
        return

    # Normalize the URL by stripping off the fragment
    url_without_fragment, frag = urldefrag(url)

    # If the URL is just the base URL plus a fragment, ignore it
    if url_without_fragment == base_url and frag:
        return

    visited_urls.add(url)

    logging.info(f"Scraping: {url}")

    page_content = fetch_page_content(url)
    if not page_content:
        return

    soup = BeautifulSoup(page_content, "lxml")
    content = soup.get_text(strip=True)
    sha = generate_uuid5(content)
    docs_data.append({"docSource": "astro docs", "sha": sha, "content": content, "docLink": url})

    # Recursively scrape linked pages
    for link in extract_links(soup, base_url):
        scrape_page(link, visited_urls, docs_data)


def extract_astro_docs(base_url: str = base_url) -> list[pd.DataFrame]:
    """
    Extract documentation pages from docs.astronomer.io and its subpages.

    :return: A list of pandas dataframes with extracted data.
    """
    visited_urls = set()
    docs_data = []

    scrape_page(base_url, visited_urls, docs_data)

    df = pd.DataFrame(docs_data)
    df.drop_duplicates(subset="sha", inplace=True)
    df.reset_index(drop=True, inplace=True)

    return [df]
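The extractor can also be run on its own to spot-check what would be ingested. A minimal sketch, assuming network access to docs.astronomer.io; the column names match those populated in scrape_page above:

# Hypothetical standalone check of the scraper output.
from include.tasks.extract.astro_docs import extract_astro_docs

df = extract_astro_docs()[0]
print(df.shape)
print(df[["docLink", "sha"]].head())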