Skip to content

Commit 72ab1d5

Browse files
AIP-66: Add support for parsing DAG bundles (apache#45371)
Let's start parsing DAG bundles! This moves us away from parsing a single local directory to being able to parse many different bundles, including optional support for versioning. This is just the basics - it keeps the parsing loop largely untouched. We still have a single list of "dag files" to parse, and queue of them. However, instead of just a path, this list and queue now contain `DagFilePath`s, which hold both a local path and the bundle its from. There are a number of things that are not fully functional at this stage, like versioned callbacks. These will be refactored later. There is enough churn with the basics (particularly with the number of test changes). Co-authored-by: Daniel Standish <[email protected]>
1 parent 9f74a65 commit 72ab1d5

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+1043
-682
lines changed

Diff for: airflow/cli/commands/local_commands/dag_processor_command.py

-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ def _create_dag_processor_job_runner(args: Any) -> DagProcessorJobRunner:
3939
job=Job(),
4040
processor=DagFileProcessorManager(
4141
processor_timeout=processor_timeout_seconds,
42-
dag_directory=args.subdir,
4342
max_runs=args.num_runs,
4443
),
4544
)

Diff for: airflow/dag_processing/bundles/manager.py

+17
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,23 @@ def parse_config(self) -> None:
6464
"Bundle config is not a list. Check config value"
6565
" for section `dag_bundles` and key `backends`."
6666
)
67+
68+
# example dags!
69+
if conf.getboolean("core", "LOAD_EXAMPLES"):
70+
from airflow import example_dags
71+
72+
example_dag_folder = next(iter(example_dags.__path__))
73+
backends.append(
74+
{
75+
"name": "example_dags",
76+
"classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
77+
"kwargs": {
78+
"local_folder": example_dag_folder,
79+
"refresh_interval": conf.getint("scheduler", "dag_dir_list_interval"),
80+
},
81+
}
82+
)
83+
6784
seen = set()
6885
for cfg in backends:
6986
name = cfg["name"]

Diff for: airflow/dag_processing/collection.py

+13-3
Original file line numberDiff line numberDiff line change
@@ -74,11 +74,14 @@
7474
log = logging.getLogger(__name__)
7575

7676

77-
def _create_orm_dags(dags: Iterable[MaybeSerializedDAG], *, session: Session) -> Iterator[DagModel]:
77+
def _create_orm_dags(
78+
bundle_name: str, dags: Iterable[MaybeSerializedDAG], *, session: Session
79+
) -> Iterator[DagModel]:
7880
for dag in dags:
7981
orm_dag = DagModel(dag_id=dag.dag_id)
8082
if dag.is_paused_upon_creation is not None:
8183
orm_dag.is_paused = dag.is_paused_upon_creation
84+
orm_dag.bundle_name = bundle_name
8285
log.info("Creating ORM DAG for %s", dag.dag_id)
8386
session.add(orm_dag)
8487
yield orm_dag
@@ -270,6 +273,8 @@ def _update_import_errors(files_parsed: set[str], import_errors: dict[str, str],
270273

271274

272275
def update_dag_parsing_results_in_db(
276+
bundle_name: str,
277+
bundle_version: str | None,
273278
dags: Collection[MaybeSerializedDAG],
274279
import_errors: dict[str, str],
275280
warnings: set[DagWarning],
@@ -307,7 +312,7 @@ def update_dag_parsing_results_in_db(
307312
)
308313
log.debug("Calling the DAG.bulk_sync_to_db method")
309314
try:
310-
DAG.bulk_write_to_db(dags, session=session)
315+
DAG.bulk_write_to_db(bundle_name, bundle_version, dags, session=session)
311316
# Write Serialized DAGs to DB, capturing errors
312317
# Write Serialized DAGs to DB, capturing errors
313318
for dag in dags:
@@ -346,6 +351,8 @@ class DagModelOperation(NamedTuple):
346351
"""Collect DAG objects and perform database operations for them."""
347352

348353
dags: dict[str, MaybeSerializedDAG]
354+
bundle_name: str
355+
bundle_version: str | None
349356

350357
def find_orm_dags(self, *, session: Session) -> dict[str, DagModel]:
351358
"""Find existing DagModel objects from DAG objects."""
@@ -365,7 +372,8 @@ def add_dags(self, *, session: Session) -> dict[str, DagModel]:
365372
orm_dags.update(
366373
(model.dag_id, model)
367374
for model in _create_orm_dags(
368-
(dag for dag_id, dag in self.dags.items() if dag_id not in orm_dags),
375+
bundle_name=self.bundle_name,
376+
dags=(dag for dag_id, dag in self.dags.items() if dag_id not in orm_dags),
369377
session=session,
370378
)
371379
)
@@ -430,6 +438,8 @@ def update_dags(
430438
dm.timetable_summary = dag.timetable.summary
431439
dm.timetable_description = dag.timetable.description
432440
dm.asset_expression = dag.timetable.asset_condition.as_expression()
441+
dm.bundle_name = self.bundle_name
442+
dm.latest_bundle_version = self.bundle_version
433443

434444
last_automated_run: DagRun | None = run_info.latest_runs.get(dag.dag_id)
435445
if last_automated_run is None:

0 commit comments

Comments
 (0)