diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 61ca6bd777e2e..9775c9f1cab12 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -273,7 +273,7 @@ def update_state(self, session=None): none_task_concurrency = all(t.task.task_concurrency is None for t in unfinished_tasks) # small speed up - if unfinished_tasks and none_depends_on_past and none_task_concurrency: + if unfinished_tasks: scheduleable_tasks = [ut for ut in unfinished_tasks if ut.state in SCHEDULEABLE_STATES] self.log.debug( "number of scheduleable tasks for %s: %s task(s)", diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index e7050670aa13c..ab406be3ca622 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -477,6 +477,104 @@ def test_find_executable_task_instances_none(self): states=[State.SCHEDULED], session=session))) + @parameterized.expand([ + [State.NONE, None, None], + [State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30), + timezone.utcnow() - datetime.timedelta(minutes=15)], + [State.UP_FOR_RESCHEDULE, timezone.utcnow() - datetime.timedelta(minutes=30), + timezone.utcnow() - datetime.timedelta(minutes=15)], + ]) + def test_process_task_instances_with_task_concurrency( + self, state, start_date, end_date, + ): + """ + Test if _process_task_instances puts the right task instances into the + list passed as task_instances_list.
+ """ + dag = DAG( + dag_id='test_scheduler_process_execute_task_with_task_concurrency', + start_date=DEFAULT_DATE) + dag_task1 = DummyOperator( + task_id='dummy', + task_concurrency=2, + dag=dag, + owner='airflow') + + with create_session() as session: + orm_dag = DagModel(dag_id=dag.dag_id) + session.merge(orm_dag) + + scheduler_job = SchedulerJob() + dag.clear() + dr = scheduler_job.create_dag_run(dag) + self.assertIsNotNone(dr) + + with create_session() as session: + tis = dr.get_task_instances(session=session) + for ti in tis: + ti.state = state + ti.start_date = start_date + ti.end_date = end_date + + ti_to_schedule = [] + scheduler_job._process_task_instances(dag, task_instances_list=ti_to_schedule) + + assert ti_to_schedule == [ + (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER), + ] + + @parameterized.expand([ + [State.NONE, None, None], + [State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30), + timezone.utcnow() - datetime.timedelta(minutes=15)], + [State.UP_FOR_RESCHEDULE, timezone.utcnow() - datetime.timedelta(minutes=30), + timezone.utcnow() - datetime.timedelta(minutes=15)], + ]) + def test_process_task_instances_depends_on_past(self, state, start_date, end_date): + """ + Test if _process_task_instances puts the right task instances into the + list passed as task_instances_list.
+ """ + dag = DAG( + dag_id='test_scheduler_process_execute_task_depends_on_past', + start_date=DEFAULT_DATE, + default_args={ + 'depends_on_past': True, + }, + ) + dag_task1 = DummyOperator( + task_id='dummy1', + dag=dag, + owner='airflow') + dag_task2 = DummyOperator( + task_id='dummy2', + dag=dag, + owner='airflow') + + with create_session() as session: + orm_dag = DagModel(dag_id=dag.dag_id) + session.merge(orm_dag) + + scheduler_job = SchedulerJob() + dag.clear() + dr = scheduler_job.create_dag_run(dag) + self.assertIsNotNone(dr) + + with create_session() as session: + tis = dr.get_task_instances(session=session) + for ti in tis: + ti.state = state + ti.start_date = start_date + ti.end_date = end_date + + ti_to_schedule = [] + scheduler_job._process_task_instances(dag, task_instances_list=ti_to_schedule) + + assert ti_to_schedule == [ + (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER), + (dag.dag_id, dag_task2.task_id, DEFAULT_DATE, TRY_NUMBER), + ] + def test_find_executable_task_instances_concurrency(self): dag_id = 'SchedulerJobTest.test_find_executable_task_instances_concurrency' task_id_1 = 'dummy'