diff --git a/airflow/example_dags/example_subdag_operator.py b/airflow/example_dags/example_subdag_operator.py
index 84a303815af66..79d369d638d6a 100644
--- a/airflow/example_dags/example_subdag_operator.py
+++ b/airflow/example_dags/example_subdag_operator.py
@@ -19,18 +19,19 @@
 """Example DAG demonstrating the usage of the SubDagOperator."""

 # [START example_subdag_operator]
+import datetime
+
 from airflow import DAG
 from airflow.example_dags.subdags.subdag import subdag
 from airflow.operators.empty import EmptyOperator
 from airflow.operators.subdag import SubDagOperator
-from airflow.utils.dates import days_ago

 DAG_NAME = 'example_subdag_operator'

 with DAG(
     dag_id=DAG_NAME,
     default_args={"retries": 2},
-    start_date=days_ago(2),
+    start_date=datetime.datetime(2022, 1, 1),
     schedule_interval="@once",
     tags=['example'],
 ) as dag:
diff --git a/tests/api/common/test_delete_dag.py b/tests/api/common/test_delete_dag.py
index 2830020d29114..cf51856409cd2 100644
--- a/tests/api/common/test_delete_dag.py
+++ b/tests/api/common/test_delete_dag.py
@@ -16,14 +16,13 @@
 # specific language governing permissions and limitations
 # under the License.

-
 import pytest

 from airflow import models
 from airflow.api.common.delete_dag import delete_dag
 from airflow.exceptions import AirflowException, DagNotFound
 from airflow.operators.empty import EmptyOperator
-from airflow.utils.dates import days_ago
+from airflow.utils import timezone
 from airflow.utils.session import create_session
 from airflow.utils.state import State
 from airflow.utils.types import DagRunType
@@ -73,11 +72,11 @@ def setup_dag_models(self, for_sub_dag=False):
         task = EmptyOperator(
             task_id='dummy',
-            dag=models.DAG(dag_id=self.key, default_args={'start_date': days_ago(2)}),
+            dag=models.DAG(dag_id=self.key, default_args={'start_date': timezone.datetime(2022, 1, 1)}),
             owner='airflow',
         )

-        test_date = days_ago(1)
+        test_date = timezone.datetime(2022, 1, 1)
         with create_session() as session:
             session.add(DM(dag_id=self.key, fileloc=self.dag_file_path, is_subdag=for_sub_dag))
             dr = DR(dag_id=self.key, run_type=DagRunType.MANUAL, run_id="test", execution_date=test_date)
diff --git a/tests/api/common/test_mark_tasks.py b/tests/api/common/test_mark_tasks.py
index 77064560f2e27..fee33a50e88e6 100644
--- a/tests/api/common/test_mark_tasks.py
+++ b/tests/api/common/test_mark_tasks.py
@@ -16,7 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
-from datetime import timedelta
+import datetime
 from typing import Callable

 import pytest
@@ -34,7 +34,6 @@
 )
 from airflow.models import DagRun
 from airflow.utils import timezone
-from airflow.utils.dates import days_ago
 from airflow.utils.session import create_session, provide_session
 from airflow.utils.state import State
 from airflow.utils.types import DagRunType
@@ -62,12 +61,12 @@ def create_dags(cls, dagbag):
         cls.dag2 = dagbag.get_dag('example_subdag_operator')
         cls.dag3 = dagbag.get_dag('example_trigger_target_dag')
         cls.dag4 = dagbag.get_dag('test_mapped_classic')
-        cls.execution_dates = [days_ago(2), days_ago(1)]
+        cls.execution_dates = [timezone.datetime(2022, 1, 1), timezone.datetime(2022, 1, 2)]
         start_date3 = cls.dag3.start_date
         cls.dag3_execution_dates = [
             start_date3,
-            start_date3 + timedelta(days=1),
-            start_date3 + timedelta(days=2),
+            start_date3 + datetime.timedelta(days=1),
+            start_date3 + datetime.timedelta(days=2),
         ]

     @pytest.fixture(autouse=True)
@@ -76,7 +75,7 @@ def setup(self):
         clear_db_runs()
         drs = _create_dagruns(
             self.dag1,
-            [_DagRunInfo(d, (d, d + timedelta(days=1))) for d in self.execution_dates],
+            [_DagRunInfo(d, (d, d + datetime.timedelta(days=1))) for d in self.execution_dates],
             state=State.RUNNING,
             run_type=DagRunType.SCHEDULED,
         )
@@ -88,7 +87,7 @@ def setup(self):
             [
                 _DagRunInfo(
                     self.dag2.start_date,
-                    (self.dag2.start_date, self.dag2.start_date + timedelta(days=1)),
+                    (self.dag2.start_date, self.dag2.start_date + datetime.timedelta(days=1)),
                 ),
             ],
             state=State.RUNNING,
@@ -112,7 +111,7 @@ def setup(self):
             [
                 _DagRunInfo(
                     self.dag4.start_date,
-                    (self.dag4.start_date, self.dag4.start_date + timedelta(days=1)),
+                    (self.dag4.start_date, self.dag4.start_date + datetime.timedelta(days=1)),
                 )
             ],
             state=State.SUCCESS,
@@ -482,7 +481,11 @@ def setup_class(cls):
         cls.dag1.sync_to_db()
         cls.dag2 = dagbag.dags['example_subdag_operator']
         cls.dag2.sync_to_db()
-        cls.execution_dates = [days_ago(2), days_ago(1), days_ago(0)]
+        cls.execution_dates = [
+            timezone.datetime(2022, 1, 1),
+            timezone.datetime(2022, 1, 2),
+            timezone.datetime(2022, 1, 3),
+        ]

     def setup_method(self):
         clear_db_runs()
diff --git a/tests/api_connexion/endpoints/test_extra_link_endpoint.py b/tests/api_connexion/endpoints/test_extra_link_endpoint.py
index a1eb3b8a543f1..c209f661a8f66 100644
--- a/tests/api_connexion/endpoints/test_extra_link_endpoint.py
+++ b/tests/api_connexion/endpoints/test_extra_link_endpoint.py
@@ -14,11 +14,11 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
+
 import os
 from urllib.parse import quote_plus

 import pytest
-from parameterized import parameterized

 from airflow import DAG
 from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP
@@ -28,8 +28,8 @@
 from airflow.plugins_manager import AirflowPlugin
 from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator
 from airflow.security import permissions
+from airflow.utils import timezone
 from airflow.utils.state import DagRunState
-from airflow.utils.timezone import datetime
 from airflow.utils.types import DagRunType
 from tests.test_utils.api_connexion_utils import create_user, delete_user
 from tests.test_utils.db import clear_db_runs, clear_db_xcom
@@ -61,7 +61,7 @@ def configured_app(minimal_app_for_api):
 class TestGetExtraLinks:
     @pytest.fixture(autouse=True)
     def setup_attrs(self, configured_app, session) -> None:
-        self.default_time = datetime(2020, 1, 1)
+        self.default_time = timezone.datetime(2020, 1, 1)

         clear_db_runs()
         clear_db_xcom()
@@ -90,40 +90,35 @@ def teardown_method(self) -> None:
         clear_db_xcom()

     def _create_dag(self):
-        with DAG(
-            dag_id="TEST_DAG_ID",
-            default_args=dict(
-                start_date=self.default_time,
-            ),
-        ) as dag:
+        with DAG(dag_id="TEST_DAG_ID", default_args={"start_date": self.default_time}) as dag:
             BigQueryExecuteQueryOperator(task_id="TEST_SINGLE_QUERY", sql="SELECT 1")
             BigQueryExecuteQueryOperator(task_id="TEST_MULTIPLE_QUERY", sql=["SELECT 1", "SELECT 2"])
         return dag

-    @parameterized.expand(
+    @pytest.mark.parametrize(
+        "url, expected_title, expected_detail",
         [
-            (
-                "missing_dag",
+            pytest.param(
                 "/api/v1/dags/INVALID/dagRuns/TEST_DAG_RUN_ID/taskInstances/TEST_SINGLE_QUERY/links",
                 "DAG not found",
                 'DAG with ID = "INVALID" not found',
+                id="missing_dag",
             ),
-            (
-                "missing_dag_run",
+            pytest.param(
                 "/api/v1/dags/TEST_DAG_ID/dagRuns/INVALID/taskInstances/TEST_SINGLE_QUERY/links",
                 "DAG Run not found",
                 'DAG Run with ID = "INVALID" not found',
+                id="missing_dag_run",
             ),
-            (
-                "missing_task",
+            pytest.param(
                 "/api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/taskInstances/INVALID/links",
                 "Task not found",
                 'Task with ID = "INVALID" not found',
+                id="missing_task",
             ),
-        ]
+        ],
     )
-    def test_should_respond_404(self, name, url, expected_title, expected_detail):
-        del name
+    def test_should_respond_404(self, url, expected_title, expected_detail):
         response = self.client.get(url, environ_overrides={'REMOTE_USER': "test"})

         assert 404 == response.status_code
diff --git a/tests/cli/commands/test_task_command.py b/tests/cli/commands/test_task_command.py
index ffcc8603888d9..1f4662b753397 100644
--- a/tests/cli/commands/test_task_command.py
+++ b/tests/cli/commands/test_task_command.py
@@ -15,7 +15,7 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-#
+
 import io
 import json
 import logging
@@ -24,7 +24,6 @@
 import unittest
 from argparse import ArgumentParser
 from contextlib import redirect_stdout
-from datetime import datetime
 from unittest import mock

 import pytest
@@ -38,14 +37,13 @@
 from airflow.models import DagBag, DagRun, Pool, TaskInstance
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.utils import timezone
-from airflow.utils.dates import days_ago
 from airflow.utils.session import create_session
 from airflow.utils.state import State
 from airflow.utils.types import DagRunType
 from tests.test_utils.config import conf_vars
 from tests.test_utils.db import clear_db_pools, clear_db_runs

-DEFAULT_DATE = days_ago(1)
+DEFAULT_DATE = timezone.datetime(2022, 1, 1)
 ROOT_FOLDER = os.path.realpath(
     os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir, os.pardir)
 )
@@ -374,7 +372,7 @@ def test_task_states_for_dag_run(self):

         dag2 = DagBag().dags['example_python_operator']
         task2 = dag2.get_task(task_id='print_the_context')
-        default_date2 = timezone.make_aware(datetime(2016, 1, 9))
+        default_date2 = timezone.datetime(2016, 1, 9)
         dag2.clear()
         dagrun = dag2.create_dagrun(
             state=State.RUNNING,
@@ -417,7 +415,7 @@ def test_task_states_for_dag_run_when_dag_run_not_exists(self):
         task_states_for_dag_run should return an AirflowException when invalid dag id is passed
         """
         with pytest.raises(DagRunNotFound):
-            default_date2 = timezone.make_aware(datetime(2016, 1, 9))
+            default_date2 = timezone.datetime(2016, 1, 9)
             task_command.task_states_for_dag_run(
                 self.parser.parse_args(
                     [
@@ -455,7 +453,7 @@ def setUp(self) -> None:
         self.run_id = "test_run"
         self.dag_path = os.path.join(ROOT_FOLDER, "dags", "test_logging_in_dag.py")
         reset(self.dag_id)
-        self.execution_date = timezone.make_aware(datetime(2017, 1, 1))
+        self.execution_date = timezone.datetime(2017, 1, 1)
         self.execution_date_str = self.execution_date.isoformat()
         self.task_args = ['tasks', 'run', self.dag_id, self.task_id, '--local', self.execution_date_str]
         self.log_dir = conf.get_mandatory_value('logging', 'base_log_folder')
diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py
index 9b8716bded720..e2f9165131f2f 100644
--- a/tests/dag_processing/test_processor.py
+++ b/tests/dag_processing/test_processor.py
@@ -16,7 +16,6 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-
 import datetime
 import os
 from unittest import mock
@@ -34,7 +33,6 @@
 from airflow.models.taskinstance import SimpleTaskInstance
 from airflow.operators.empty import EmptyOperator
 from airflow.utils import timezone
-from airflow.utils.dates import days_ago
 from airflow.utils.session import create_session
 from airflow.utils.state import State
 from airflow.utils.types import DagRunType
@@ -113,7 +111,7 @@ def test_dag_file_processor_sla_miss_callback(self, create_dummy_dag):

         # Create dag with a start of 1 day ago, but an sla of 0
         # so we'll already have an sla_miss on the books.
-        test_start_date = days_ago(1)
+        test_start_date = timezone.utcnow() - datetime.timedelta(days=1)
         dag, task = create_dummy_dag(
             dag_id='test_sla_miss',
             task_id='dummy',
@@ -142,7 +140,7 @@ def test_dag_file_processor_sla_miss_callback_invalid_sla(self, create_dummy_dag
         # Create dag with a start of 1 day ago, but an sla of 0
         # so we'll already have an sla_miss on the books.
         # Pass anything besides a timedelta object to the sla argument.
-        test_start_date = days_ago(1)
+        test_start_date = timezone.utcnow() - datetime.timedelta(days=1)
         dag, task = create_dummy_dag(
             dag_id='test_sla_miss',
             task_id='dummy',
@@ -170,7 +168,7 @@ def test_dag_file_processor_sla_miss_callback_sent_notification(self, create_dum

         # Create dag with a start of 2 days ago, but an sla of 1 day
         # ago so we'll already have an sla_miss on the books
-        test_start_date = days_ago(2)
+        test_start_date = timezone.utcnow() - datetime.timedelta(days=2)
         dag, task = create_dummy_dag(
             dag_id='test_sla_miss',
             task_id='dummy',
@@ -206,7 +204,7 @@ def test_dag_file_processor_sla_miss_doesnot_raise_integrity_error(self, dag_mak

         # Create dag with a start of 2 days ago, but an sla of 1 day
         # ago so we'll already have an sla_miss on the books
-        test_start_date = days_ago(2)
+        test_start_date = timezone.utcnow() - datetime.timedelta(days=2)
         with dag_maker(
             dag_id='test_sla_miss',
             default_args={'start_date': test_start_date, 'sla': datetime.timedelta(days=1)},
@@ -247,7 +245,7 @@ def test_dag_file_processor_sla_miss_callback_exception(self, mock_stats_incr, c

         sla_callback = MagicMock(side_effect=RuntimeError('Could not call function'))

-        test_start_date = days_ago(2)
+        test_start_date = timezone.utcnow() - datetime.timedelta(days=1)
         dag, task = create_dummy_dag(
             dag_id='test_sla_miss',
             task_id='dummy',
@@ -277,7 +275,7 @@ def test_dag_file_processor_only_collect_emails_from_sla_missed_tasks(
     ):
         session = settings.Session()

-        test_start_date = days_ago(2)
+        test_start_date = timezone.utcnow() - datetime.timedelta(days=1)
         email1 = 'test1@test.com'
         dag, task = create_dummy_dag(
             dag_id='test_sla_miss',
@@ -317,7 +315,7 @@ def test_dag_file_processor_sla_miss_email_exception(
         # Mock the callback function so we can verify that it was not called
         mock_send_email.side_effect = RuntimeError('Could not send an email')

-        test_start_date = days_ago(2)
+        test_start_date = timezone.utcnow() - datetime.timedelta(days=1)
         dag, task = create_dummy_dag(
             dag_id='test_sla_miss',
             task_id='dummy',
@@ -347,7 +345,7 @@ def test_dag_file_processor_sla_miss_deleted_task(self, create_dummy_dag):
         """
         session = settings.Session()

-        test_start_date = days_ago(2)
+        test_start_date = timezone.utcnow() - datetime.timedelta(days=1)
         dag, task = create_dummy_dag(
             dag_id='test_sla_miss',
             task_id='dummy',
diff --git a/tests/dags/test_default_views.py b/tests/dags/test_default_views.py
index ca51c10e62795..6a1aefb4ee28e 100644
--- a/tests/dags/test_default_views.py
+++ b/tests/dags/test_default_views.py
@@ -15,10 +15,12 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
+
+import pendulum
+
 from airflow.models import DAG
-from airflow.utils.dates import days_ago

-args = {'owner': 'airflow', 'retries': 3, 'start_date': days_ago(2)}
+args = {'owner': 'airflow', 'retries': 3, 'start_date': pendulum.datetime(2022, 1, 1)}

 tree_dag = DAG(
     dag_id='test_tree_view',
diff --git a/tests/dags/test_example_bash_operator.py b/tests/dags/test_example_bash_operator.py
index 4e44f55ff6169..288667ad06b7b 100644
--- a/tests/dags/test_example_bash_operator.py
+++ b/tests/dags/test_example_bash_operator.py
@@ -15,20 +15,17 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from datetime import timedelta
+import datetime

 from airflow.models import DAG
 from airflow.operators.bash import BashOperator
 from airflow.operators.empty import EmptyOperator
-from airflow.utils.dates import days_ago
-
-args = {'owner': 'airflow', 'retries': 3, 'start_date': days_ago(2)}

 dag = DAG(
     dag_id='test_example_bash_operator',
-    default_args=args,
+    default_args={'owner': 'airflow', 'retries': 3, 'start_date': datetime.datetime(2022, 1, 1)},
     schedule_interval='0 0 * * *',
-    dagrun_timeout=timedelta(minutes=60),
+    dagrun_timeout=datetime.timedelta(minutes=60),
 )

 cmd = 'ls -l'
diff --git a/tests/dags/test_mapped_classic.py b/tests/dags/test_mapped_classic.py
index cbf3a8a5b8178..c3e0d413beba8 100644
--- a/tests/dags/test_mapped_classic.py
+++ b/tests/dags/test_mapped_classic.py
@@ -15,10 +15,11 @@
 # specific language governing permissions and limitations
 # under the License.

+import datetime
+
 from airflow import DAG
 from airflow.decorators import task
 from airflow.operators.python import PythonOperator
-from airflow.utils.dates import days_ago


 @task
@@ -30,7 +31,7 @@ def consumer(value):
     print(repr(value))


-with DAG(dag_id='test_mapped_classic', start_date=days_ago(2)) as dag:
+with DAG(dag_id='test_mapped_classic', start_date=datetime.datetime(2022, 1, 1)) as dag:
     PythonOperator.partial(task_id='consumer', python_callable=consumer).expand(op_args=make_arg_lists())
     PythonOperator.partial(task_id='consumer_literal', python_callable=consumer).expand(
         op_args=[[1], [2], [3]],
diff --git a/tests/dags/test_mapped_taskflow.py b/tests/dags/test_mapped_taskflow.py
index e4e796c3e45de..a803d9afbbe84 100644
--- a/tests/dags/test_mapped_taskflow.py
+++ b/tests/dags/test_mapped_taskflow.py
@@ -15,10 +15,11 @@
 # specific language governing permissions and limitations
 # under the License.

+import datetime
+
 from airflow import DAG
-from airflow.utils.dates import days_ago

-with DAG(dag_id='test_mapped_taskflow', start_date=days_ago(2)) as dag:
+with DAG(dag_id='test_mapped_taskflow', start_date=datetime.datetime(2022, 1, 1)) as dag:

     @dag.task
     def make_list():
diff --git a/tests/dags/test_miscellaneous.py b/tests/dags/test_miscellaneous.py
index 7f77340e623c2..2174a614782c5 100644
--- a/tests/dags/test_miscellaneous.py
+++ b/tests/dags/test_miscellaneous.py
@@ -18,12 +18,11 @@

 """Example DAG demonstrating the usage of the BashOperator."""

-from datetime import timedelta
+import datetime

 from airflow import DAG
 from airflow.operators.bash import BashOperator
 from airflow.operators.empty import EmptyOperator
-from airflow.utils.dates import days_ago

 args = {
     'owner': 'airflow',
@@ -33,8 +32,8 @@
     dag_id='miscellaneous_test_dag',
     default_args=args,
     schedule_interval='0 0 * * *',
-    start_date=days_ago(2),
-    dagrun_timeout=timedelta(minutes=60),
+    start_date=datetime.datetime(2022, 1, 1),
+    dagrun_timeout=datetime.timedelta(minutes=60),
     tags=['example', 'example2'],
     params={"example_key": "example_value"},
 )
diff --git a/tests/dags/test_missing_owner.py b/tests/dags/test_missing_owner.py
index dc70a5b757f10..cc20db08c63e2 100644
--- a/tests/dags/test_missing_owner.py
+++ b/tests/dags/test_missing_owner.py
@@ -16,17 +16,16 @@
 # specific language governing permissions and limitations
 # under the License.
-from datetime import timedelta
+import datetime

 from airflow import DAG
 from airflow.operators.empty import EmptyOperator
-from airflow.utils.dates import days_ago

 with DAG(
     dag_id="test_missing_owner",
     schedule_interval="0 0 * * *",
-    start_date=days_ago(2),
-    dagrun_timeout=timedelta(minutes=60),
+    start_date=datetime.datetime(2022, 1, 1),
+    dagrun_timeout=datetime.timedelta(minutes=60),
     tags=["example"],
 ) as dag:
     run_this_last = EmptyOperator(
diff --git a/tests/dags/test_multiple_dags.py b/tests/dags/test_multiple_dags.py
index 44aa5cdfc2c5b..67ae544123150 100644
--- a/tests/dags/test_multiple_dags.py
+++ b/tests/dags/test_multiple_dags.py
@@ -15,13 +15,12 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from datetime import timedelta
+import datetime

 from airflow.models import DAG
 from airflow.operators.bash import BashOperator
-from airflow.utils.dates import days_ago

-args = {'owner': 'airflow', 'retries': 3, 'start_date': days_ago(2)}
+args = {'owner': 'airflow', 'retries': 3, 'start_date': datetime.datetime(2022, 1, 1)}


 def create_dag(suffix):
@@ -29,7 +28,7 @@ def create_dag(suffix):
         dag_id=f'test_multiple_dags__{suffix}',
         default_args=args,
         schedule_interval='0 0 * * *',
-        dagrun_timeout=timedelta(minutes=60),
+        dagrun_timeout=datetime.timedelta(minutes=60),
     )

     with dag:
diff --git a/tests/dags/test_with_non_default_owner.py b/tests/dags/test_with_non_default_owner.py
index eb7d1f3d9927b..43a2953f9c1f5 100644
--- a/tests/dags/test_with_non_default_owner.py
+++ b/tests/dags/test_with_non_default_owner.py
@@ -16,17 +16,16 @@
 # specific language governing permissions and limitations
 # under the License.

-from datetime import timedelta
+import datetime

 from airflow import DAG
 from airflow.operators.empty import EmptyOperator
-from airflow.utils.dates import days_ago

 with DAG(
     dag_id="test_with_non_default_owner",
     schedule_interval="0 0 * * *",
-    start_date=days_ago(2),
-    dagrun_timeout=timedelta(minutes=60),
+    start_date=datetime.datetime(2022, 1, 1),
+    dagrun_timeout=datetime.timedelta(minutes=60),
     tags=["example"],
 ) as dag:
     run_this_last = EmptyOperator(
diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py
index c358ada979e5b..d1d461a4e01f4 100644
--- a/tests/jobs/test_backfill_job.py
+++ b/tests/jobs/test_backfill_job.py
@@ -23,7 +23,6 @@
 import threading
 from unittest.mock import patch

-import pendulum
 import pytest

 from airflow import settings
@@ -1600,7 +1599,7 @@ def test_mapped_dag(self, dag_id, executor_name, session):
         self.dagbag.process_file(str(TEST_DAGS_FOLDER / f'{dag_id}.py'))
         dag = self.dagbag.get_dag(dag_id)

-        when = pendulum.today('UTC')
+        when = datetime.datetime(2022, 1, 1)

         job = BackfillJob(
             dag=dag,
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index b50c8bfd7fde0..fd32e6dd7dd6d 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -3955,7 +3955,6 @@ def test_mapped_dag(self, dag_id, session):
         """End-to-end test of a simple mapped dag"""
         # Use SequentialExecutor for more predictable test behaviour
         from airflow.executors.sequential_executor import SequentialExecutor
-        from airflow.utils.dates import days_ago

         self.dagbag.process_file(str(TEST_DAGS_FOLDER / f'{dag_id}.py'))
         dag = self.dagbag.get_dag(dag_id)
@@ -3964,7 +3963,7 @@
             run_type=DagRunType.MANUAL,
             start_date=timezone.utcnow(),
             state=State.RUNNING,
-            execution_date=days_ago(2),
+            execution_date=timezone.utcnow() - datetime.timedelta(days=2),
             session=session,
         )
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index b53df7bf9ba23..f73f5d1c45147 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -37,7 +37,6 @@
 from airflow.serialization.serialized_objects import SerializedDAG
 from airflow.stats import Stats
 from airflow.utils import timezone
-from airflow.utils.dates import days_ago
 from airflow.utils.state import DagRunState, State, TaskInstanceState
 from airflow.utils.trigger_rule import TriggerRule
 from airflow.utils.types import DagRunType
@@ -785,7 +784,7 @@ def test_no_scheduling_delay_for_nonscheduled_runs(self, stats_mock, session):
         Tests that dag scheduling delay stat is not called if the dagrun is not a scheduled run.
         This case is manual run. Simple test for coherence check.
         """
-        dag = DAG(dag_id='test_dagrun_stats', start_date=days_ago(1))
+        dag = DAG(dag_id='test_dagrun_stats', start_date=DEFAULT_DATE)
         dag_task = EmptyOperator(task_id='dummy', dag=dag)

         initial_task_states = {
@@ -809,7 +808,7 @@ def test_emit_scheduling_delay(self, session, schedule_interval, expected):
         Tests that dag scheduling delay stat is set properly once running scheduled dag.
         dag_run.update_state() invokes the _emit_true_scheduling_delay_stats_for_finished_state method.
         """
-        dag = DAG(dag_id='test_emit_dag_stats', start_date=days_ago(1), schedule_interval=schedule_interval)
+        dag = DAG(dag_id='test_emit_dag_stats', start_date=DEFAULT_DATE, schedule_interval=schedule_interval)
         dag_task = EmptyOperator(task_id='dummy', dag=dag, owner='airflow')

         try:
@@ -860,7 +859,7 @@ def test_states_sets(self, session):
         """
         Tests that adding State.failed_states and State.success_states work as expected.
""" - dag = DAG(dag_id='test_dagrun_states', start_date=days_ago(1)) + dag = DAG(dag_id='test_dagrun_states', start_date=DEFAULT_DATE) dag_task_success = EmptyOperator(task_id='dummy', dag=dag) dag_task_failed = EmptyOperator(task_id='dummy2', dag=dag) diff --git a/tests/operators/test_python.py b/tests/operators/test_python.py index e58b424de6453..a925e52fa986e 100644 --- a/tests/operators/test_python.py +++ b/tests/operators/test_python.py @@ -42,7 +42,6 @@ ) from airflow.utils import timezone from airflow.utils.context import AirflowContextDeprecationWarning, Context -from airflow.utils.dates import days_ago from airflow.utils.python_virtualenv import prepare_virtualenv from airflow.utils.session import create_session from airflow.utils.state import State @@ -1161,7 +1160,7 @@ def f(): DEFAULT_ARGS = { "owner": "test", "depends_on_past": True, - "start_date": days_ago(1), + "start_date": timezone.datetime(2022, 1, 1), "end_date": datetime.today(), "schedule_interval": "@once", "retries": 1, diff --git a/tests/providers/google/cloud/operators/test_mlengine.py b/tests/providers/google/cloud/operators/test_mlengine.py index e12a7e9621135..af7a487f42af0 100644 --- a/tests/providers/google/cloud/operators/test_mlengine.py +++ b/tests/providers/google/cloud/operators/test_mlengine.py @@ -42,7 +42,6 @@ ) from airflow.serialization.serialized_objects import SerializedDAG from airflow.utils import timezone -from airflow.utils.dates import days_ago DEFAULT_DATE = timezone.datetime(2017, 6, 6) @@ -410,7 +409,7 @@ def test_success_create_training_job_with_master_image(self, hook): 'imageUri': 'eu.gcr.io/test-project/test-image:test-version', }, 'task_id': 'test-training', - 'start_date': days_ago(1), + 'start_date': DEFAULT_DATE, } request = { 'jobId': 'test_training', diff --git a/tests/test_utils/perf/dags/perf_dag_1.py b/tests/test_utils/perf/dags/perf_dag_1.py index 3757c7d40e092..4305ec4f2ec9b 100644 --- a/tests/test_utils/perf/dags/perf_dag_1.py +++ b/tests/test_utils/perf/dags/perf_dag_1.py @@ -18,23 +18,27 @@ """ This dag tests performance of simple bash commands executed with Airflow. """ -from datetime import timedelta +import datetime from airflow.models import DAG from airflow.operators.bash_operator import BashOperator -from airflow.utils.dates import days_ago args = { 'owner': 'airflow', - 'start_date': days_ago(3), + 'start_date': datetime.datetime(2022, 1, 1), } dag = DAG( - dag_id='perf_dag_1', default_args=args, schedule_interval='@daily', dagrun_timeout=timedelta(minutes=60) + dag_id='perf_dag_1', + default_args=args, + schedule_interval='@daily', + dagrun_timeout=datetime.timedelta(minutes=60), ) task_1 = BashOperator( - task_id='perf_task_1', bash_command='sleep 5; echo "run_id={{ run_id }} | dag_run={{ dag_run }}"', dag=dag + task_id='perf_task_1', + bash_command='sleep 5; echo "run_id={{ run_id }} | dag_run={{ dag_run }}"', + dag=dag, ) for i in range(2, 5): diff --git a/tests/test_utils/perf/dags/perf_dag_2.py b/tests/test_utils/perf/dags/perf_dag_2.py index 208ea49b927df..5b2c02486491c 100644 --- a/tests/test_utils/perf/dags/perf_dag_2.py +++ b/tests/test_utils/perf/dags/perf_dag_2.py @@ -18,23 +18,27 @@ """ This dag tests performance of simple bash commands executed with Airflow. 
""" -from datetime import timedelta +import datetime from airflow.models import DAG from airflow.operators.bash import BashOperator -from airflow.utils.dates import days_ago args = { 'owner': 'airflow', - 'start_date': days_ago(3), + 'start_date': datetime.datetime(2022, 1, 1), } dag = DAG( - dag_id='perf_dag_2', default_args=args, schedule_interval='@daily', dagrun_timeout=timedelta(minutes=60) + dag_id='perf_dag_2', + default_args=args, + schedule_interval='@daily', + dagrun_timeout=datetime.timedelta(minutes=60), ) task_1 = BashOperator( - task_id='perf_task_1', bash_command='sleep 5; echo "run_id={{ run_id }} | dag_run={{ dag_run }}"', dag=dag + task_id='perf_task_1', + bash_command='sleep 5; echo "run_id={{ run_id }} | dag_run={{ dag_run }}"', + dag=dag, ) for i in range(2, 5): diff --git a/tests/utils/test_task_group.py b/tests/utils/test_task_group.py index 9a65c8d621e90..4b47a0c9bb4bd 100644 --- a/tests/utils/test_task_group.py +++ b/tests/utils/test_task_group.py @@ -25,7 +25,6 @@ from airflow.operators.bash import BashOperator from airflow.operators.empty import EmptyOperator from airflow.operators.python import PythonOperator -from airflow.utils.dates import days_ago from airflow.utils.task_group import TaskGroup from airflow.www.views import dag_edges, task_group_to_dict from tests.models import DEFAULT_DATE @@ -1025,7 +1024,7 @@ def add_one(i): def increment(num): return num + 1 - @dag(schedule_interval=None, start_date=days_ago(1), default_args={"owner": "airflow"}) + @dag(schedule_interval=None, start_date=pendulum.DateTime(2022, 1, 1), default_args={"owner": "airflow"}) def wrap(): total_1 = one() assert isinstance(total_1, XComArg) diff --git a/tests/www/views/test_views_decorators.py b/tests/www/views/test_views_decorators.py index c211806bd1b04..0e4fc12857a8f 100644 --- a/tests/www/views/test_views_decorators.py +++ b/tests/www/views/test_views_decorators.py @@ -22,7 +22,7 @@ import pytest from airflow.models import DagBag, DagRun, Log, TaskInstance -from airflow.utils import dates, timezone +from airflow.utils import timezone from airflow.utils.state import State from airflow.utils.types import DagRunType from airflow.www import app @@ -30,7 +30,7 @@ from tests.test_utils.db import clear_db_runs from tests.test_utils.www import check_content_in_response -EXAMPLE_DAG_DEFAULT_DATE = dates.days_ago(2) +EXAMPLE_DAG_DEFAULT_DATE = timezone.utcnow().replace(hour=0, minute=0, second=0, microsecond=0) @pytest.fixture(scope="module") diff --git a/tests/www/views/test_views_extra_links.py b/tests/www/views/test_views_extra_links.py index b9283923f7702..669499b0a4dcf 100644 --- a/tests/www/views/test_views_extra_links.py +++ b/tests/www/views/test_views_extra_links.py @@ -24,7 +24,7 @@ from airflow.models import DAG from airflow.models.baseoperator import BaseOperator, BaseOperatorLink -from airflow.utils import dates, timezone +from airflow.utils import timezone from airflow.utils.session import create_session from airflow.utils.state import DagRunState, TaskInstanceState from airflow.utils.types import DagRunType @@ -157,9 +157,8 @@ def test_global_extra_links_works(dag_run, task_1, viewer_client, session): def test_extra_link_in_gantt_view(dag, create_dag_run, viewer_client): - exec_date = dates.days_ago(2) + exec_date = timezone.datetime(2022, 1, 1) start_date = timezone.datetime(2020, 4, 10, 2, 0, 0) - end_date = exec_date + datetime.timedelta(seconds=30) with create_session() as session: dag_run = create_dag_run(execution_date=exec_date, session=session) @@ -167,7 +166,7 @@ 
         ti.refresh_from_task(dag.get_task(ti.task_id))
         ti.state = TaskInstanceState.SUCCESS
         ti.start_date = start_date
-        ti.end_date = end_date
+        ti.end_date = start_date + datetime.timedelta(seconds=30)
         session.merge(ti)

     url = f'gantt?dag_id={dag.dag_id}&execution_date={exec_date}'
diff --git a/tests/www/views/test_views_task_norun.py b/tests/www/views/test_views_task_norun.py
index 4f5dc092b354b..3790148fc954f 100644
--- a/tests/www/views/test_views_task_norun.py
+++ b/tests/www/views/test_views_task_norun.py
@@ -15,14 +15,14 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import datetime
 import urllib.parse

 import pytest

-from airflow.utils import dates
 from tests.test_utils.db import clear_db_runs

-DEFAULT_DATE = dates.days_ago(2)
+DEFAULT_DATE = datetime.datetime(2022, 1, 1)
 DEFAULT_VAL = urllib.parse.quote_plus(str(DEFAULT_DATE))
diff --git a/tests/www/views/test_views_tasks.py b/tests/www/views/test_views_tasks.py
index ebed9ab05fe11..3428ac848aecd 100644
--- a/tests/www/views/test_views_tasks.py
+++ b/tests/www/views/test_views_tasks.py
@@ -32,7 +32,7 @@
 from airflow.operators.bash import BashOperator
 from airflow.security import permissions
 from airflow.ti_deps.dependencies_states import QUEUEABLE_STATES, RUNNABLE_STATES
-from airflow.utils import dates, timezone
+from airflow.utils import timezone
 from airflow.utils.log.logging_mixin import ExternalLoggingMixin
 from airflow.utils.session import create_session
 from airflow.utils.state import State
@@ -43,7 +43,7 @@
 from tests.test_utils.db import clear_db_runs
 from tests.test_utils.www import check_content_in_response, check_content_not_in_response, client_with_login

-DEFAULT_DATE = dates.days_ago(2)
+DEFAULT_DATE = timezone.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
 DEFAULT_VAL = urllib.parse.quote_plus(str(DEFAULT_DATE))
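The pattern applied throughout this patch is to replace the dynamic days_ago() helper with a pinned, timezone-aware start date so DAG definitions and tests stop shifting every time the file is re-parsed. A minimal sketch of the resulting style, assuming Airflow 2.x; the DAG id and task below are illustrative and not taken from the patch:

    import pendulum

    from airflow import DAG
    from airflow.operators.empty import EmptyOperator

    # Before, start_date=days_ago(2) was re-evaluated on every parse, so the
    # effective start date drifted with the current day. A fixed, timezone-aware
    # datetime keeps scheduling deterministic.
    with DAG(
        dag_id="example_pinned_start_date",  # illustrative id, not part of this patch
        start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
        schedule_interval="@once",
        tags=["example"],
    ) as dag:
        EmptyOperator(task_id="noop")

Tests that genuinely need a relative date (the SLA-miss cases above) compute it explicitly with timezone.utcnow() - datetime.timedelta(...), which keeps the intent visible at the call site.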