Skip to content

Commit 2a7944d

Browse files
authored
BugFix: Tasks with depends_on_past or task_concurrency are stuck (#12663)
closes #12659
1 parent 847820f commit 2a7944d

File tree

2 files changed

+99
-1
lines changed

2 files changed

+99
-1
lines changed

airflow/models/dagrun.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ def update_state(self, session=None):
273273
none_task_concurrency = all(t.task.task_concurrency is None
274274
for t in unfinished_tasks)
275275
# small speed up
276-
if unfinished_tasks and none_depends_on_past and none_task_concurrency:
276+
if unfinished_tasks:
277277
scheduleable_tasks = [ut for ut in unfinished_tasks if ut.state in SCHEDULEABLE_STATES]
278278
self.log.debug(
279279
"number of scheduleable tasks for %s: %s task(s)",

tests/jobs/test_scheduler_job.py

+98
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,104 @@ def test_find_executable_task_instances_none(self):
477477
states=[State.SCHEDULED],
478478
session=session)))
479479

480+
@parameterized.expand([
481+
[State.NONE, None, None],
482+
[State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30),
483+
timezone.utcnow() - datetime.timedelta(minutes=15)],
484+
[State.UP_FOR_RESCHEDULE, timezone.utcnow() - datetime.timedelta(minutes=30),
485+
timezone.utcnow() - datetime.timedelta(minutes=15)],
486+
])
487+
def test_process_task_instances_with_task_concurrency(
488+
self, state, start_date, end_date,
489+
):
490+
"""
491+
Test if _process_task_instances puts the right task instances into the
492+
mock_list.
493+
"""
494+
dag = DAG(
495+
dag_id='test_scheduler_process_execute_task_with_task_concurrency',
496+
start_date=DEFAULT_DATE)
497+
dag_task1 = DummyOperator(
498+
task_id='dummy',
499+
task_concurrency=2,
500+
dag=dag,
501+
owner='airflow')
502+
503+
with create_session() as session:
504+
orm_dag = DagModel(dag_id=dag.dag_id)
505+
session.merge(orm_dag)
506+
507+
scheduler_job = SchedulerJob()
508+
dag.clear()
509+
dr = scheduler_job.create_dag_run(dag)
510+
self.assertIsNotNone(dr)
511+
512+
with create_session() as session:
513+
tis = dr.get_task_instances(session=session)
514+
for ti in tis:
515+
ti.state = state
516+
ti.start_date = start_date
517+
ti.end_date = end_date
518+
519+
ti_to_schedule = []
520+
scheduler_job._process_task_instances(dag, task_instances_list=ti_to_schedule)
521+
522+
assert ti_to_schedule == [
523+
(dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER),
524+
]
525+
526+
@parameterized.expand([
527+
[State.NONE, None, None],
528+
[State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30),
529+
timezone.utcnow() - datetime.timedelta(minutes=15)],
530+
[State.UP_FOR_RESCHEDULE, timezone.utcnow() - datetime.timedelta(minutes=30),
531+
timezone.utcnow() - datetime.timedelta(minutes=15)],
532+
])
533+
def test_process_task_instances_depends_on_past(self, state, start_date, end_date):
534+
"""
535+
Test if _process_task_instances puts the right task instances into the
536+
mock_list.
537+
"""
538+
dag = DAG(
539+
dag_id='test_scheduler_process_execute_task_depends_on_past',
540+
start_date=DEFAULT_DATE,
541+
default_args={
542+
'depends_on_past': True,
543+
},
544+
)
545+
dag_task1 = DummyOperator(
546+
task_id='dummy1',
547+
dag=dag,
548+
owner='airflow')
549+
dag_task2 = DummyOperator(
550+
task_id='dummy2',
551+
dag=dag,
552+
owner='airflow')
553+
554+
with create_session() as session:
555+
orm_dag = DagModel(dag_id=dag.dag_id)
556+
session.merge(orm_dag)
557+
558+
scheduler_job = SchedulerJob()
559+
dag.clear()
560+
dr = scheduler_job.create_dag_run(dag)
561+
self.assertIsNotNone(dr)
562+
563+
with create_session() as session:
564+
tis = dr.get_task_instances(session=session)
565+
for ti in tis:
566+
ti.state = state
567+
ti.start_date = start_date
568+
ti.end_date = end_date
569+
570+
ti_to_schedule = []
571+
scheduler_job._process_task_instances(dag, task_instances_list=ti_to_schedule)
572+
573+
assert ti_to_schedule == [
574+
(dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER),
575+
(dag.dag_id, dag_task2.task_id, DEFAULT_DATE, TRY_NUMBER),
576+
]
577+
480578
def test_find_executable_task_instances_concurrency(self):
481579
dag_id = 'SchedulerJobTest.test_find_executable_task_instances_concurrency'
482580
task_id_1 = 'dummy'

0 commit comments

Comments
 (0)