Skip to content

Commit

Permalink
BugFix: Tasks with depends_on_past or task_concurrency are stuck (apa…
Browse files Browse the repository at this point in the history
  • Loading branch information
kaxil authored and Dewald Abrie committed Dec 11, 2020
1 parent 8dc47a3 commit b54e0f3
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 1 deletion.
2 changes: 1 addition & 1 deletion airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ def update_state(self, session=None):
none_task_concurrency = all(t.task.task_concurrency is None
for t in unfinished_tasks)
# small speed up
if unfinished_tasks and none_depends_on_past and none_task_concurrency:
if unfinished_tasks:
scheduleable_tasks = [ut for ut in unfinished_tasks if ut.state in SCHEDULEABLE_STATES]
self.log.debug(
"number of scheduleable tasks for %s: %s task(s)",
Expand Down
98 changes: 98 additions & 0 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,104 @@ def test_find_executable_task_instances_none(self):
states=[State.SCHEDULED],
session=session)))

@parameterized.expand([
[State.NONE, None, None],
[State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30),
timezone.utcnow() - datetime.timedelta(minutes=15)],
[State.UP_FOR_RESCHEDULE, timezone.utcnow() - datetime.timedelta(minutes=30),
timezone.utcnow() - datetime.timedelta(minutes=15)],
])
def test_process_task_instances_with_task_concurrency(
self, state, start_date, end_date,
):
"""
Test if _process_task_instances puts the right task instances into the
mock_list.
"""
dag = DAG(
dag_id='test_scheduler_process_execute_task_with_task_concurrency',
start_date=DEFAULT_DATE)
dag_task1 = DummyOperator(
task_id='dummy',
task_concurrency=2,
dag=dag,
owner='airflow')

with create_session() as session:
orm_dag = DagModel(dag_id=dag.dag_id)
session.merge(orm_dag)

scheduler_job = SchedulerJob()
dag.clear()
dr = scheduler_job.create_dag_run(dag)
self.assertIsNotNone(dr)

with create_session() as session:
tis = dr.get_task_instances(session=session)
for ti in tis:
ti.state = state
ti.start_date = start_date
ti.end_date = end_date

ti_to_schedule = []
scheduler_job._process_task_instances(dag, task_instances_list=ti_to_schedule)

assert ti_to_schedule == [
(dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER),
]

@parameterized.expand([
[State.NONE, None, None],
[State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30),
timezone.utcnow() - datetime.timedelta(minutes=15)],
[State.UP_FOR_RESCHEDULE, timezone.utcnow() - datetime.timedelta(minutes=30),
timezone.utcnow() - datetime.timedelta(minutes=15)],
])
def test_process_task_instances_depends_on_past(self, state, start_date, end_date):
"""
Test if _process_task_instances puts the right task instances into the
mock_list.
"""
dag = DAG(
dag_id='test_scheduler_process_execute_task_depends_on_past',
start_date=DEFAULT_DATE,
default_args={
'depends_on_past': True,
},
)
dag_task1 = DummyOperator(
task_id='dummy1',
dag=dag,
owner='airflow')
dag_task2 = DummyOperator(
task_id='dummy2',
dag=dag,
owner='airflow')

with create_session() as session:
orm_dag = DagModel(dag_id=dag.dag_id)
session.merge(orm_dag)

scheduler_job = SchedulerJob()
dag.clear()
dr = scheduler_job.create_dag_run(dag)
self.assertIsNotNone(dr)

with create_session() as session:
tis = dr.get_task_instances(session=session)
for ti in tis:
ti.state = state
ti.start_date = start_date
ti.end_date = end_date

ti_to_schedule = []
scheduler_job._process_task_instances(dag, task_instances_list=ti_to_schedule)

assert ti_to_schedule == [
(dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER),
(dag.dag_id, dag_task2.task_id, DEFAULT_DATE, TRY_NUMBER),
]

def test_find_executable_task_instances_concurrency(self):
dag_id = 'SchedulerJobTest.test_find_executable_task_instances_concurrency'
task_id_1 = 'dummy'
Expand Down

0 comments on commit b54e0f3

Please sign in to comment.