From 58fdce6a170b11f0c96d90affd53dab0dff9cb55 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Wed, 2 Oct 2024 19:31:41 -0700 Subject: [PATCH] Remove DagRunNotBackfillDep (#42552) This is part of AIP-78. It is a dep that is designed to make tasks not run if they are in a backfill dag run. No longer needed. --- airflow/ti_deps/dependencies_deps.py | 2 - airflow/ti_deps/deps/dagrun_backfill_dep.py | 48 ------------------ tests/ti_deps/deps/test_dagrun_id_dep.py | 54 --------------------- 3 files changed, 104 deletions(-) delete mode 100644 airflow/ti_deps/deps/dagrun_backfill_dep.py delete mode 100644 tests/ti_deps/deps/test_dagrun_id_dep.py diff --git a/airflow/ti_deps/dependencies_deps.py b/airflow/ti_deps/dependencies_deps.py index 30a11366c3be88..44d6bfc5c7db79 100644 --- a/airflow/ti_deps/dependencies_deps.py +++ b/airflow/ti_deps/dependencies_deps.py @@ -23,7 +23,6 @@ ) from airflow.ti_deps.deps.dag_ti_slots_available_dep import DagTISlotsAvailableDep from airflow.ti_deps.deps.dag_unpaused_dep import DagUnpausedDep -from airflow.ti_deps.deps.dagrun_backfill_dep import DagRunNotBackfillDep from airflow.ti_deps.deps.dagrun_exists_dep import DagrunRunningDep from airflow.ti_deps.deps.exec_date_after_start_date_dep import ExecDateAfterStartDateDep from airflow.ti_deps.deps.pool_slots_available_dep import PoolSlotsAvailableDep @@ -77,7 +76,6 @@ TaskConcurrencyDep(), PoolSlotsAvailableDep(), DagrunRunningDep(), - DagRunNotBackfillDep(), DagUnpausedDep(), ExecDateAfterStartDateDep(), TaskNotRunningDep(), diff --git a/airflow/ti_deps/deps/dagrun_backfill_dep.py b/airflow/ti_deps/deps/dagrun_backfill_dep.py deleted file mode 100644 index ba359929a8cb6f..00000000000000 --- a/airflow/ti_deps/deps/dagrun_backfill_dep.py +++ /dev/null @@ -1,48 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -"""This module defines dep for making sure DagRun not a backfill.""" - -from __future__ import annotations - -from airflow.ti_deps.deps.base_ti_dep import BaseTIDep -from airflow.utils.session import provide_session -from airflow.utils.types import DagRunType - - -class DagRunNotBackfillDep(BaseTIDep): - """Dep for valid DagRun run_id to schedule from scheduler.""" - - NAME = "DagRun is not backfill job" - IGNORABLE = True - - @provide_session - def _get_dep_statuses(self, ti, session, dep_context=None): - """ - Determine if the DagRun is valid for scheduling from scheduler. - - :param ti: the task instance to get the dependency status for - :param session: database session - :param dep_context: the context for which this dependency should be evaluated for - :return: True if DagRun is valid for scheduling from scheduler. - """ - dagrun = ti.get_dagrun(session) - - if dagrun.run_type == DagRunType.BACKFILL_JOB: - yield self._failing_status( - reason=f"Task's DagRun run_type is {dagrun.run_type} and cannot be run by the scheduler" - ) diff --git a/tests/ti_deps/deps/test_dagrun_id_dep.py b/tests/ti_deps/deps/test_dagrun_id_dep.py deleted file mode 100644 index 6aced00b8efe84..00000000000000 --- a/tests/ti_deps/deps/test_dagrun_id_dep.py +++ /dev/null @@ -1,54 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -from unittest.mock import Mock - -import pytest - -from airflow.models.dagrun import DagRun -from airflow.ti_deps.deps.dagrun_backfill_dep import DagRunNotBackfillDep -from airflow.utils.types import DagRunType - -pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode] - - -class TestDagrunRunningDep: - def test_run_id_is_backfill(self): - """ - Task instances whose run_id is a backfill dagrun run_id should fail this dep. - """ - dagrun = DagRun() - dagrun.run_id = "anything" - dagrun.run_type = DagRunType.BACKFILL_JOB - ti = Mock(get_dagrun=Mock(return_value=dagrun)) - assert not DagRunNotBackfillDep().is_met(ti=ti) - - def test_run_id_is_not_backfill(self): - """ - Task instances whose run_id is not a backfill run_id should pass this dep. - """ - dagrun = DagRun() - dagrun.run_type = "custom_type" - ti = Mock(get_dagrun=Mock(return_value=dagrun)) - assert DagRunNotBackfillDep().is_met(ti=ti) - - dagrun = DagRun() - dagrun.run_id = None - ti = Mock(get_dagrun=Mock(return_value=dagrun)) - assert DagRunNotBackfillDep().is_met(ti=ti)