Commit

fixes #113, fixes #114, fixes #115, fixes #116
mpgreg authored and sunank200 committed Nov 23, 2023
1 parent 41c28d8 commit 76ae2ae
Showing 6 changed files with 76 additions and 80 deletions.
6 changes: 3 additions & 3 deletions airflow/dags/ingestion/ask-astro-load-blogs.py
@@ -1,5 +1,5 @@
+import datetime
 import os
-from datetime import datetime
 
 from include.tasks import ingest, split
 from include.tasks.extract import blogs
@@ -11,7 +11,7 @@
 _WEAVIATE_CONN_ID = f"weaviate_{ask_astro_env}"
 WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsProd")
 
-blog_cutoff_date = datetime.strptime("2023-01-19", "%Y-%m-%d")
+blog_cutoff_date = datetime.date(2023, 1, 19)
 
 default_args = {"retries": 3, "retry_delay": 30}
 
@@ -20,7 +20,7 @@
 
 @dag(
     schedule_interval=schedule_interval,
-    start_date=datetime(2023, 9, 27),
+    start_date=datetime.datetime(2023, 9, 27),
     catchup=False,
     is_paused_upon_creation=True,
     default_args=default_args,
15 changes: 9 additions & 6 deletions airflow/dags/ingestion/ask-astro-load-github.py
@@ -1,6 +1,7 @@
+import datetime
 import os
-from datetime import datetime
 
+from dateutil.relativedelta import relativedelta
 from include.tasks import ingest, split
 from include.tasks.extract import github
 
@@ -21,7 +22,11 @@
     {"doc_dir": "code-samples", "repo_base": "astronomer/docs"},
 ]
 issues_docs_sources = [
-    "apache/airflow",
+    {
+        "repo_base": "apache/airflow",
+        "cutoff_date": datetime.date.today() - relativedelta(months=1),
+        "cutoff_issue_number": 30000,
+    }
 ]
 
 default_args = {"retries": 3, "retry_delay": 30}
@@ -31,7 +36,7 @@
 
 @dag(
     schedule_interval=schedule_interval,
-    start_date=datetime(2023, 9, 27),
+    start_date=datetime.datetime(2023, 9, 27),
     catchup=False,
     is_paused_upon_creation=True,
     default_args=default_args,
@@ -50,7 +55,7 @@ def ask_astro_load_github():
     )
 
     issues_docs = (
-        task(github.extract_github_issues).partial(github_conn_id=_GITHUB_CONN_ID).expand(repo_base=issues_docs_sources)
+        task(github.extract_github_issues).partial(github_conn_id=_GITHUB_CONN_ID).expand(source=issues_docs_sources)
     )
 
     code_samples = (
@@ -68,7 +73,5 @@ def ask_astro_load_github():
         class_name=WEAVIATE_CLASS, primary_key="docLink"
     ).expand(dfs=[split_md_docs, split_code_docs])
 
-    issues_docs >> md_docs >> code_samples
-
 
 ask_astro_load_github()
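
The GitHub issues source is now described by a dictionary rather than a bare repository string, so each mapped task receives its cutoff parameters alongside the repository name. A minimal standalone sketch of building such an entry, assuming only python-dateutil is installed; the values mirror the DAG above and are illustrative:

import datetime

from dateutil.relativedelta import relativedelta

# One entry per repository; extract_github_issues reads these three keys.
issues_docs_sources = [
    {
        "repo_base": "apache/airflow",
        # Only issues updated within roughly the last month are ingested.
        "cutoff_date": datetime.date.today() - relativedelta(months=1),
        # Issues numbered below this value are skipped regardless of date.
        "cutoff_issue_number": 30000,
    }
]

for source in issues_docs_sources:
    print(source["repo_base"], source["cutoff_date"], source["cutoff_issue_number"])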
35 changes: 12 additions & 23 deletions airflow/dags/ingestion/ask-astro-load.py
@@ -1,6 +1,6 @@
+import datetime
 import json
 import os
-from datetime import datetime
 from pathlib import Path
 from textwrap import dedent
 
@@ -29,7 +29,7 @@
     {"doc_dir": "code-samples", "repo_base": "astronomer/docs"},
 ]
 issues_docs_sources = [
-    "apache/airflow",
+    {"repo_base": "apache/airflow", "cutoff_date": datetime.date(2020, 1, 1), "cutoff_issue_number": 30000}
 ]
 slack_channel_sources = [
     {
@@ -41,7 +41,7 @@
     }
 ]
 
-blog_cutoff_date = datetime.strptime("2023-01-19", "%Y-%m-%d")
+blog_cutoff_date = datetime.date(2023, 1, 19)
 
 stackoverflow_cutoff_date = "2021-09-01"
 stackoverflow_tags = [
@@ -52,12 +52,10 @@
 
 default_args = {"retries": 3, "retry_delay": 30}
 
-schedule_interval = "0 5 * * *" if ask_astro_env == "prod" else None
-
 
 @dag(
-    schedule_interval=schedule_interval,
-    start_date=datetime(2023, 9, 27),
+    schedule_interval=None,
+    start_date=datetime.datetime(2023, 9, 27),
     catchup=False,
     is_paused_upon_creation=True,
     default_args=default_args,
@@ -69,7 +67,7 @@ def ask_astro_load_bulk():
     If seed_baseline_url (set above) points to a parquet file with pre-embedded data it will be
     ingested. Otherwise new data is extracted, split, embedded and ingested.
-    The first time this DAG runs (without seeded baseline) it will take at lease 20 minutes to
+    The first time this DAG runs (without seeded baseline) it will take at lease 90 minutes to
     extract data from all sources. Extracted data is then serialized to disk in the project
     directory in order to simplify later iterations of ingest with different chunking strategies,
     vector databases or embedding models.
@@ -174,12 +172,12 @@ def extract_stack_overflow(tag: str, stackoverflow_cutoff_date: str):
     #     return df
 
     @task(trigger_rule="none_failed")
-    def extract_github_issues(repo_base: str):
+    def extract_github_issues(source: dict):
         try:
-            df = pd.read_parquet(f"include/data/{repo_base}/issues.parquet")
+            df = pd.read_parquet(f"include/data/{source['repo_base']}/issues.parquet")
         except Exception:
-            df = github.extract_github_issues(repo_base, _GITHUB_CONN_ID)
-            df.to_parquet(f"include/data/{repo_base}/issues.parquet")
+            df = github.extract_github_issues(source, _GITHUB_CONN_ID)
+            df.to_parquet(f"include/data/{source['repo_base']}/issues.parquet")
 
         return df
 
@@ -217,7 +215,7 @@ def extract_astro_blogs():
 
     md_docs = extract_github_markdown.expand(source=markdown_docs_sources)
 
-    issues_docs = extract_github_issues.expand(repo_base=issues_docs_sources)
+    issues_docs = extract_github_issues.expand(source=issues_docs_sources)
 
     stackoverflow_docs = extract_stack_overflow.partial(stackoverflow_cutoff_date=stackoverflow_cutoff_date).expand(
         tag=stackoverflow_tags
@@ -266,16 +264,7 @@ def extract_astro_blogs():
 
     _create_schema >> markdown_tasks + python_code_tasks + html_tasks + [_check_seed_baseline]
 
-    _check_seed_baseline >> issues_docs >> md_docs
-    # (
-    #     _check_seed_baseline
-    #     >> [stackoverflow_docs, slack_docs, blogs_docs, registry_cells_docs, _import_baseline] + python_code_tasks
-    # )
-
-    (
-        _check_seed_baseline
-        >> [stackoverflow_docs, blogs_docs, registry_cells_docs, _import_baseline] + python_code_tasks + html_tasks
-    )
+    _check_seed_baseline >> markdown_tasks + python_code_tasks + html_tasks + [_import_baseline]
 
 
 ask_astro_load_bulk()
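
The bulk-load docstring notes that extracted data is serialized to disk so later ingest iterations can reuse it. A minimal sketch of that cache-or-extract pattern, assuming a hypothetical extract callable and the include/data/ layout used in the DAG above:

from pathlib import Path
from typing import Callable

import pandas as pd


def cached_extract(repo_base: str, extract: Callable[[str], pd.DataFrame]) -> pd.DataFrame:
    """Return cached issues for repo_base if present; otherwise extract and cache them."""
    cache_file = Path(f"include/data/{repo_base}/issues.parquet")
    try:
        return pd.read_parquet(cache_file)
    except Exception:  # broad catch mirrors the DAG task above
        df = extract(repo_base)
        cache_file.parent.mkdir(parents=True, exist_ok=True)
        df.to_parquet(cache_file)
        return df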
Binary file modified airflow/include/airflow_provider_weaviate-0.0.1-py3-none-any.whl
34 changes: 16 additions & 18 deletions airflow/include/tasks/extract/blogs.py
@@ -1,6 +1,7 @@
 from __future__ import annotations
 
-from datetime import datetime
+import datetime
+from urllib.parse import urljoin
 
 import pandas as pd
 import requests
@@ -13,7 +14,7 @@
 page_url = base_url + "/blog/{page}/#archive"
 
 
-def extract_astro_blogs(blog_cutoff_date: datetime) -> list[pd.DataFrame]:
+def extract_astro_blogs(blog_cutoff_date: datetime.date) -> list[pd.DataFrame]:
     """
     This task downloads Blogs from the Astronomer website and returns a list of pandas dataframes. Return
     type is a list in order to map to upstream dynamic tasks.
@@ -30,37 +31,34 @@ def extract_astro_blogs(blog_cutoff_date: datetime) -> list[pd.DataFrame]:
 
     headers = {}
     links = []
-    dates = []
     page = 1
 
     response = requests.get(page_url.format(page=page), headers=headers)
     while response.ok:
         soup = BeautifulSoup(response.text, "lxml")
-        cards = soup.find_all(class_="post-card__cover")
-        card_links = [base_url + card.find("a", href=True)["href"] for card in cards]
-        links.extend(card_links)
-        meta = soup.find_all(class_="post-card__meta")
-        dates.extend([post.find("time")["datetime"] for post in meta])
+        for card in soup.find_all(class_="post-card__meta"):
+            blog_date = datetime.datetime.strptime(card.find("time")["datetime"], "%Y-%m-%dT%H:%M:%S.%fZ")
+            if blog_date.date() >= blog_cutoff_date:
+                url = urljoin(base_url, card.find("a", href=True)["href"])
+                response = requests.head(url, allow_redirects=True)
+                if response.ok:
+                    links.append(
+                        {
+                            "docLink": response.url,
+                            "title": card.find(class_="title").get_text(),
+                        }
+                    )
 
         page = page + 1
         response = requests.get(page_url.format(page=page), headers=headers)
 
-    df = pd.DataFrame(zip(links, dates), columns=["docLink", "date"])
-
-    df["date"] = pd.to_datetime(df["date"]).dt.date
-    df = df[df["date"] > blog_cutoff_date.date()]
-    df.drop("date", inplace=True, axis=1)
+    df = pd.DataFrame(links)
     df.drop_duplicates(inplace=True)
 
     df["content"] = df["docLink"].apply(lambda x: requests.get(x).content)
-    df["title"] = df["content"].apply(
-        lambda x: BeautifulSoup(x, "lxml").find(class_="post-card__meta").find(class_="title").get_text()
-    )
 
     df["content"] = df["content"].apply(lambda x: BeautifulSoup(x, "lxml").find(class_="prose").get_text())
     df["content"] = df.apply(lambda x: blog_format.format(title=x.title, content=x.content), axis=1)
 
     df.drop("title", axis=1, inplace=True)
     df["sha"] = df["content"].apply(generate_uuid5)
     df["docSource"] = "astro blog"
     df.reset_index(drop=True, inplace=True)
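
The rewritten loop parses each blog card's <time datetime="..."> value and compares it to the cutoff before fetching the post. A minimal sketch of that check, using the same format string as the diff; the sample timestamp is illustrative:

import datetime

blog_cutoff_date = datetime.date(2023, 1, 19)

# Timestamp format used by the blog archive's <time datetime="..."> attribute.
timestamp = "2023-06-01T00:00:00.000Z"
blog_date = datetime.datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ")

# Posts dated on or after the cutoff are kept; older posts are skipped.
keep = blog_date.date() >= blog_cutoff_date
print(keep)  # True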
66 changes: 36 additions & 30 deletions airflow/include/tasks/extract/github.py
@@ -180,14 +180,19 @@ def extract_github_python(source: dict, github_conn_id: str) -> pd.DataFrame:
     return df
 
 
-def extract_github_issues(repo_base: str, github_conn_id: str) -> pd.DataFrame:
+def extract_github_issues(source: dict, github_conn_id: str) -> pd.DataFrame:
     """
     This task downloads github issues as markdown documents in a pandas dataframe. Text from templated
-    auto responses for issues are removed while building a markdown document for each issue.
+    auto responses for issues are removed while building a markdown document for each issue. Extraction will
+    only pull issues after a provided "cutoff_date" AND "cutoff_issue_number".
-    param repo_base: The name of of organization/repository (ie. "apache/airflow") from which to extract
-    issues.
-    type repo_base: str
+    param source: A dictionary specifying what to ingest.
+    example:
+        {"repo_base": "apache/airflow",
+         "cutoff_date": datetime.date.today() - relativedelta(months=1),
+         "cutoff_issue_number": 30000
+         }
+    type repo_base: dict
     param github_conn_id: The connection ID to use with the GithubHook
     param github_conn_id: str
@@ -202,8 +207,8 @@ def extract_github_issues(repo_base: str, github_conn_id: str) -> pd.DataFrame:
 
     gh_hook = GithubHook(github_conn_id)
 
-    repo = gh_hook.client.get_repo(repo_base)
-    issues = repo.get_issues()
+    repo = gh_hook.client.get_repo(source["repo_base"])
+    issues = repo.get_issues(state="all")
 
     issue_autoresponse_text = "Thanks for opening your first issue here!"
     pr_autoresponse_text = "Congratulations on your first Pull Request and welcome to the Apache Airflow community!"
@@ -243,30 +248,31 @@ def extract_github_issues(repo_base: str, github_conn_id: str) -> pd.DataFrame:
 
     while page:
         for issue in page:
-            print(issue.number)
-            comments = []
-            for comment in issue.get_comments():
-                if not any(substring in comment.body for substring in drop_content):
-                    comments.append(
-                        comment_markdown_template.format(
-                            user=comment.user.login, date=issue.created_at.strftime("%m-%d-%Y"), body=comment.body
-                        )
-                    )
-            downloaded_docs.append(
-                {
-                    "docLink": issue.html_url,
-                    "sha": "",
-                    "content": issue_markdown_template.format(
-                        title=issue.title,
-                        date=issue.created_at.strftime("%m-%d-%Y"),
-                        user=issue.user.login,
-                        state=issue.state,
-                        body=issue.body,
-                        comments="\n".join(comments),
-                    ),
-                    "docSource": f"{repo_base}/issues",
-                }
-            )
+            if issue.updated_at.date() >= source["cutoff_date"] and issue.number >= source["cutoff_issue_number"]:
+                print(issue.number)
+                comments = []
+                for comment in issue.get_comments():
+                    if not any(substring in comment.body for substring in drop_content):
+                        comments.append(
+                            comment_markdown_template.format(
+                                user=comment.user.login, date=issue.created_at.strftime("%m-%d-%Y"), body=comment.body
+                            )
+                        )
+                downloaded_docs.append(
+                    {
+                        "docLink": issue.html_url,
+                        "sha": "",
+                        "content": issue_markdown_template.format(
+                            title=issue.title,
+                            date=issue.created_at.strftime("%m-%d-%Y"),
+                            user=issue.user.login,
+                            state=issue.state,
+                            body=issue.body,
+                            comments="\n".join(comments),
+                        ),
+                        "docSource": f"{source['repo_base']}/issues",
+                    }
+                )
         page_num = page_num + 1
         page = issues.get_page(page_num)
 
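
The added guard keeps only issues updated on or after cutoff_date and numbered at or above cutoff_issue_number. A minimal standalone sketch of the same filter with PyGithub, assuming a personal access token in GITHUB_TOKEN rather than Airflow's GithubHook connection; the cutoff values and slice are illustrative:

import datetime
import os

from github import Github  # PyGithub

source = {
    "repo_base": "apache/airflow",
    "cutoff_date": datetime.date(2023, 10, 23),
    "cutoff_issue_number": 30000,
}

repo = Github(os.environ["GITHUB_TOKEN"]).get_repo(source["repo_base"])

kept = []
for issue in repo.get_issues(state="all")[:100]:  # small slice to limit API calls
    # Keep only issues updated on/after the cutoff date AND numbered at/above the cutoff.
    if issue.updated_at.date() >= source["cutoff_date"] and issue.number >= source["cutoff_issue_number"]:
        kept.append(issue.number)

print(len(kept))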
