-
Notifications
You must be signed in to change notification settings - Fork 50
/
Copy pathask-astro-load-github.py
68 lines (53 loc) · 2.17 KB
/
ask-astro-load-github.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import datetime
import os
from include.utils.slack import send_failure_notification
from airflow.decorators import dag, task
from airflow.providers.weaviate.operators.weaviate import WeaviateDocumentIngestOperator
# Deployment environment name; defaults to "dev" and selects env-specific resources below.
ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "dev")
# Airflow connection IDs: the Weaviate connection is per-environment; GitHub uses a read-only conn.
_WEAVIATE_CONN_ID = f"weaviate_{ask_astro_env}"
_GITHUB_CONN_ID = "github_ro"
# Target Weaviate class (collection) that documents are ingested into.
WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsDev")
# Cutoff date for GitHub issues.
# NOTE(review): not referenced anywhere in this DAG as written — presumably consumed by an
# issue-extraction task; confirm whether this is dead config.
_GITHUB_ISSUE_CUTOFF_DATE = os.environ.get("GITHUB_ISSUE_CUTOFF_DATE", "2022-1-1")
# Repos whose markdown docs are extracted and ingested.
# doc_dir "" presumably means "scan from the repo root" — confirm against the extractor.
markdown_docs_sources = [
    {"doc_dir": "", "repo_base": "OpenLineage/docs"},
    {"doc_dir": "", "repo_base": "OpenLineage/OpenLineage"},
]
# Repos whose issues are to be ingested.
# NOTE(review): also unused below — confirm whether issue ingestion was removed intentionally.
issues_docs_sources = [
    "apache/airflow",
]
# Task-level defaults: retry each failed task 3 times, 30 seconds apart.
default_args = {"retries": 3, "retry_delay": 30}
# Cron schedule (default: Tuesdays 05:00) only in prod; otherwise unscheduled (manual runs only).
schedule_interval = os.environ.get("INGESTION_SCHEDULE", "0 5 * * 2") if ask_astro_env == "prod" else None
@dag(
    schedule_interval=schedule_interval,
    start_date=datetime.datetime(2023, 9, 27),
    catchup=False,
    is_paused_upon_creation=True,
    default_args=default_args,
    on_failure_callback=send_failure_notification(
        dag_id="{{ dag.dag_id }}", execution_date="{{ dag_run.execution_date }}"
    ),
)
def ask_astro_load_github():
    """
    Incrementally ingest new GitHub docs into Weaviate.

    The initial corpus was loaded by ask_astro_load_bulk from a point-in-time
    capture; this DAG picks up documents added or changed since then. Because
    the ingest operator replaces existing documents (existing="replace"),
    updated docs are removed and re-added rather than duplicated.
    """
    # Deferred imports: resolved at task-definition time, not at module import.
    from include.tasks import chunking_utils
    from include.tasks.extract import github

    # Fan out: one dynamically-mapped extraction task per markdown source repo.
    extract = task(github.extract_github_markdown).partial(github_conn_id=_GITHUB_CONN_ID)
    raw_markdown = extract.expand(source=markdown_docs_sources)

    # Split the extracted markdown into chunks suitable for ingestion.
    chunked_docs = task(chunking_utils.split_markdown).expand(dfs=[raw_markdown])

    # Upsert chunks into Weaviate, keyed on docLink so updated docs replace old ones.
    WeaviateDocumentIngestOperator.partial(
        conn_id=_WEAVIATE_CONN_ID,
        class_name=WEAVIATE_CLASS,
        existing="replace",
        document_column="docLink",
        batch_config_params={"batch_size": 7, "dynamic": False},
        verbose=True,
        task_id="WeaviateDocumentIngestOperator",
    ).expand(input_data=[chunked_docs])
# Invoke the @dag-decorated factory at module level so Airflow registers the DAG at parse time.
ask_astro_load_github()