Skip to content

Commit 2d1ab84

Browse files
committed
Remove example_dags support from list_py_file_paths - it's now a bundle!
1 parent 7758680 commit 2d1ab84

File tree

5 files changed

+99
-88
lines changed

5 files changed

+99
-88
lines changed

airflow/dag_processing/bundles/manager.py

+17
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,23 @@ def parse_config(self) -> None:
6464
"Bundle config is not a list. Check config value"
6565
" for section `dag_bundles` and key `backends`."
6666
)
67+
68+
# When core.LOAD_EXAMPLES is enabled, register the bundled example DAGs as a LocalDagBundle named "example_dags".
69+
if conf.getboolean("core", "LOAD_EXAMPLES"):
70+
from airflow import example_dags
71+
72+
example_dag_folder = next(iter(example_dags.__path__))
73+
backends.append(
74+
{
75+
"name": "example_dags",
76+
"classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
77+
"kwargs": {
78+
"local_folder": example_dag_folder,
79+
"refresh_interval": conf.getint("scheduler", "dag_dir_list_interval"),
80+
},
81+
}
82+
)
83+
6784
seen = set()
6885
for cfg in backends:
6986
name = cfg["name"]

airflow/models/dagbag.py

+11-5
Original file line numberDiff line numberDiff line change
@@ -568,11 +568,17 @@ def collect_dags(
568568

569569
# Ensure dag_folder is a str -- it may have been a pathlib.Path
570570
dag_folder = correct_maybe_zipped(str(dag_folder))
571-
for filepath in list_py_file_paths(
572-
dag_folder,
573-
safe_mode=safe_mode,
574-
include_examples=include_examples,
575-
):
571+
572+
files_to_parse = list_py_file_paths(dag_folder, safe_mode=safe_mode)
573+
574+
if include_examples:
575+
from airflow import example_dags
576+
577+
example_dag_folder = next(iter(example_dags.__path__))
578+
579+
files_to_parse.extend(list_py_file_paths(example_dag_folder, safe_mode=safe_mode))
580+
581+
for filepath in files_to_parse:
576582
try:
577583
file_parse_start_dttm = timezone.utcnow()
578584
found_dags = self.process_file(filepath, only_if_updated=only_if_updated, safe_mode=safe_mode)

airflow/utils/file.py

-9
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,6 @@ def find_path_from_directory(
245245
def list_py_file_paths(
246246
directory: str | os.PathLike[str] | None,
247247
safe_mode: bool = conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE", fallback=True),
248-
include_examples: bool | None = None,
249248
) -> list[str]:
250249
"""
251250
Traverse a directory and look for Python files.
@@ -255,23 +254,15 @@ def list_py_file_paths(
255254
contains Airflow DAG definitions. If not provided, use the
256255
core.DAG_DISCOVERY_SAFE_MODE configuration setting. If not set, default
257256
to safe.
258-
:param include_examples: include example DAGs
259257
:return: a list of paths to Python files in the specified directory
260258
"""
261-
if include_examples is None:
262-
include_examples = conf.getboolean("core", "LOAD_EXAMPLES")
263259
file_paths: list[str] = []
264260
if directory is None:
265261
file_paths = []
266262
elif os.path.isfile(directory):
267263
file_paths = [str(directory)]
268264
elif os.path.isdir(directory):
269265
file_paths.extend(find_dag_file_paths(directory, safe_mode))
270-
if include_examples:
271-
from airflow import example_dags
272-
273-
example_dag_folder = next(iter(example_dags.__path__))
274-
file_paths.extend(list_py_file_paths(example_dag_folder, safe_mode, include_examples=False))
275266
return file_paths
276267

277268

tests/dag_processing/bundles/test_dag_bundle_manager.py

+15-1
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,9 @@
6868
)
6969
def test_parse_bundle_config(value, expected):
7070
"""Test that bundle_configs are read from configuration."""
71-
envs = {"AIRFLOW__DAG_BUNDLES__BACKENDS": value} if value else {}
71+
envs = {"AIRFLOW__CORE__LOAD_EXAMPLES": "False"}
72+
if value:
73+
envs["AIRFLOW__DAG_BUNDLES__BACKENDS"] = value
7274
cm = nullcontext()
7375
exp_fail = False
7476
if isinstance(expected, str):
@@ -133,6 +135,7 @@ def clear_db():
133135

134136

135137
@pytest.mark.db_test
138+
@conf_vars({("core", "LOAD_EXAMPLES"): "False"})
136139
def test_sync_bundles_to_db(clear_db):
137140
def _get_bundle_names_and_active():
138141
with create_session() as session:
@@ -167,3 +170,14 @@ def test_view_url(version):
167170
with patch.object(BaseDagBundle, "view_url") as view_url_mock:
168171
bundle_manager.view_url("my-test-bundle", version=version)
169172
view_url_mock.assert_called_once_with(version=version)
173+
174+
175+
def test_example_dags_bundle_added():
176+
manager = DagBundlesManager()
177+
manager.parse_config()
178+
assert "example_dags" in manager._bundle_config
179+
180+
with conf_vars({("core", "LOAD_EXAMPLES"): "False"}):
181+
manager = DagBundlesManager()
182+
manager.parse_config()
183+
assert "example_dags" not in manager._bundle_config

tests/jobs/test_scheduler_job.py

+56-73
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@
107107
ELASTIC_DAG_FILE = os.path.join(PERF_DAGS_FOLDER, "elastic_dag.py")
108108

109109
TEST_DAG_FOLDER = os.environ["AIRFLOW__CORE__DAGS_FOLDER"]
110+
EXAMPLE_DAGS_FOLDER = airflow.example_dags.__path__[0]
110111
DEFAULT_DATE = timezone.datetime(2016, 1, 1)
111112
DEFAULT_LOGICAL_DATE = timezone.coerce_datetime(DEFAULT_DATE)
112113
TRY_NUMBER = 1
@@ -119,12 +120,6 @@ def disable_load_example():
119120
yield
120121

121122

122-
@pytest.fixture
123-
def load_examples():
124-
with conf_vars({("core", "load_examples"): "True"}):
125-
yield
126-
127-
128123
# Patch the MockExecutor into the dict of known executors in the Loader
129124
@contextlib.contextmanager
130125
def _loader_mock(mock_executors):
@@ -3562,21 +3557,7 @@ def test_list_py_file_paths(self):
35623557
if file_name.endswith((".py", ".zip")):
35633558
if file_name not in ignored_files:
35643559
expected_files.add(f"{root}/{file_name}")
3565-
for file_path in list_py_file_paths(TEST_DAG_FOLDER, include_examples=False):
3566-
detected_files.add(file_path)
3567-
assert detected_files == expected_files
3568-
3569-
ignored_files = {
3570-
"helper.py",
3571-
}
3572-
example_dag_folder = airflow.example_dags.__path__[0]
3573-
for root, _, files in os.walk(example_dag_folder):
3574-
for file_name in files:
3575-
if file_name.endswith((".py", ".zip")):
3576-
if file_name not in ["__init__.py"] and file_name not in ignored_files:
3577-
expected_files.add(os.path.join(root, file_name))
3578-
detected_files.clear()
3579-
for file_path in list_py_file_paths(TEST_DAG_FOLDER, include_examples=True):
3560+
for file_path in list_py_file_paths(TEST_DAG_FOLDER):
35803561
detected_files.add(file_path)
35813562
assert detected_files == expected_files
35823563

@@ -5662,9 +5643,9 @@ def test_find_and_purge_zombies_nothing(self):
56625643
self.job_runner._find_and_purge_zombies()
56635644
executor.callback_sink.send.assert_not_called()
56645645

5665-
def test_find_and_purge_zombies(self, load_examples, session, testing_dag_bundle):
5666-
dagbag = DagBag(TEST_DAG_FOLDER, read_dags_from_db=False)
5667-
5646+
def test_find_and_purge_zombies(self, session, testing_dag_bundle):
5647+
dagfile = os.path.join(EXAMPLE_DAGS_FOLDER, "example_branch_operator.py")
5648+
dagbag = DagBag(dagfile)
56685649
dag = dagbag.get_dag("example_branch_operator")
56695650
DAG.bulk_write_to_db("testing", None, [dag])
56705651
data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
@@ -5718,68 +5699,70 @@ def test_find_and_purge_zombies(self, load_examples, session, testing_dag_bundle
57185699
assert callback_request.ti.run_id == ti.run_id
57195700
assert callback_request.ti.map_index == ti.map_index
57205701

5721-
def test_zombie_message(self, load_examples, testing_dag_bundle):
5702+
def test_zombie_message(self, testing_dag_bundle, session):
57225703
"""
57235704
Check that the zombie message comes out as expected
57245705
"""
57255706

57265707
dagbag = DagBag(TEST_DAG_FOLDER, read_dags_from_db=False)
5727-
with create_session() as session:
5728-
session.query(Job).delete()
5729-
dag = dagbag.get_dag("example_branch_operator")
5730-
DAG.bulk_write_to_db("testing", None, [dag])
5731-
5732-
data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
5733-
triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
5734-
dag_run = dag.create_dagrun(
5735-
state=DagRunState.RUNNING,
5736-
logical_date=DEFAULT_DATE,
5737-
run_type=DagRunType.SCHEDULED,
5738-
session=session,
5739-
data_interval=data_interval,
5740-
**triggered_by_kwargs,
5741-
)
5708+
dagfile = os.path.join(EXAMPLE_DAGS_FOLDER, "example_branch_operator.py")
5709+
dagbag = DagBag(dagfile)
5710+
dag = dagbag.get_dag("example_branch_operator")
5711+
DAG.bulk_write_to_db("testing", None, [dag])
57425712

5743-
scheduler_job = Job(executor=MockExecutor())
5744-
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
5745-
self.job_runner.processor_agent = mock.MagicMock()
5713+
session.query(Job).delete()
57465714

5747-
# We will provision 2 tasks so we can check we only find zombies from this scheduler
5748-
tasks_to_setup = ["branching", "run_this_first"]
5715+
data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
5716+
triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
5717+
dag_run = dag.create_dagrun(
5718+
state=DagRunState.RUNNING,
5719+
logical_date=DEFAULT_DATE,
5720+
run_type=DagRunType.SCHEDULED,
5721+
session=session,
5722+
data_interval=data_interval,
5723+
**triggered_by_kwargs,
5724+
)
57495725

5750-
for task_id in tasks_to_setup:
5751-
task = dag.get_task(task_id=task_id)
5752-
ti = TaskInstance(task, run_id=dag_run.run_id, state=State.RUNNING)
5753-
ti.queued_by_job_id = 999
5726+
scheduler_job = Job(executor=MockExecutor())
5727+
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
5728+
self.job_runner.processor_agent = mock.MagicMock()
57545729

5755-
session.add(ti)
5756-
session.flush()
5730+
# We will provision 2 tasks so we can check we only find zombies from this scheduler
5731+
tasks_to_setup = ["branching", "run_this_first"]
57575732

5758-
assert task.task_id == "run_this_first" # Make sure we have the task/ti we expect
5733+
for task_id in tasks_to_setup:
5734+
task = dag.get_task(task_id=task_id)
5735+
ti = TaskInstance(task, run_id=dag_run.run_id, state=State.RUNNING)
5736+
ti.queued_by_job_id = 999
57595737

5760-
ti.queued_by_job_id = scheduler_job.id
5738+
session.add(ti)
57615739
session.flush()
57625740

5763-
zombie_message = self.job_runner._generate_zombie_message_details(ti)
5764-
assert zombie_message == {
5765-
"DAG Id": "example_branch_operator",
5766-
"Task Id": "run_this_first",
5767-
"Run Id": "scheduled__2016-01-01T00:00:00+00:00",
5768-
}
5769-
5770-
ti.hostname = "10.10.10.10"
5771-
ti.map_index = 2
5772-
ti.external_executor_id = "abcdefg"
5773-
5774-
zombie_message = self.job_runner._generate_zombie_message_details(ti)
5775-
assert zombie_message == {
5776-
"DAG Id": "example_branch_operator",
5777-
"Task Id": "run_this_first",
5778-
"Run Id": "scheduled__2016-01-01T00:00:00+00:00",
5779-
"Hostname": "10.10.10.10",
5780-
"Map Index": 2,
5781-
"External Executor Id": "abcdefg",
5782-
}
5741+
assert task.task_id == "run_this_first" # Make sure we have the task/ti we expect
5742+
5743+
ti.queued_by_job_id = scheduler_job.id
5744+
session.flush()
5745+
5746+
zombie_message = self.job_runner._generate_zombie_message_details(ti)
5747+
assert zombie_message == {
5748+
"DAG Id": "example_branch_operator",
5749+
"Task Id": "run_this_first",
5750+
"Run Id": "scheduled__2016-01-01T00:00:00+00:00",
5751+
}
5752+
5753+
ti.hostname = "10.10.10.10"
5754+
ti.map_index = 2
5755+
ti.external_executor_id = "abcdefg"
5756+
5757+
zombie_message = self.job_runner._generate_zombie_message_details(ti)
5758+
assert zombie_message == {
5759+
"DAG Id": "example_branch_operator",
5760+
"Task Id": "run_this_first",
5761+
"Run Id": "scheduled__2016-01-01T00:00:00+00:00",
5762+
"Hostname": "10.10.10.10",
5763+
"Map Index": 2,
5764+
"External Executor Id": "abcdefg",
5765+
}
57835766

57845767
def test_find_zombies_handle_failure_callbacks_are_correctly_passed_to_dag_processor(
57855768
self, testing_dag_bundle

0 commit comments

Comments
 (0)