Skip to content

BugFix: Tasks with depends_on_past or task_concurrency are stuck #12663

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ def update_state(self, session=None):
none_task_concurrency = all(t.task.task_concurrency is None
for t in unfinished_tasks)
# small speed up
if unfinished_tasks and none_depends_on_past and none_task_concurrency:
if unfinished_tasks:
scheduleable_tasks = [ut for ut in unfinished_tasks if ut.state in SCHEDULEABLE_STATES]
self.log.debug(
"number of scheduleable tasks for %s: %s task(s)",
Expand Down
98 changes: 98 additions & 0 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,104 @@ def test_find_executable_task_instances_none(self):
states=[State.SCHEDULED],
session=session)))

@parameterized.expand([
    (State.NONE, None, None),
    (
        State.UP_FOR_RETRY,
        timezone.utcnow() - datetime.timedelta(minutes=30),
        timezone.utcnow() - datetime.timedelta(minutes=15),
    ),
    (
        State.UP_FOR_RESCHEDULE,
        timezone.utcnow() - datetime.timedelta(minutes=30),
        timezone.utcnow() - datetime.timedelta(minutes=15),
    ),
])
def test_process_task_instances_with_task_concurrency(
    self, state, start_date, end_date,
):
    """
    Check that a task declaring ``task_concurrency`` is still handed to the
    scheduler by ``_process_task_instances``.

    Every task instance of a fresh DAG run is put into ``state`` with the
    given ``start_date`` / ``end_date`` before the scheduler pass; the single
    task of the DAG must then show up in the collected list.
    """
    dag = DAG(
        dag_id='test_scheduler_process_execute_task_with_task_concurrency',
        start_date=DEFAULT_DATE,
    )
    limited_task = DummyOperator(
        task_id='dummy',
        task_concurrency=2,
        dag=dag,
        owner='airflow',
    )

    # Register the DAG in the metadata DB.
    with create_session() as session:
        session.merge(DagModel(dag_id=dag.dag_id))

    scheduler_job = SchedulerJob()
    dag.clear()
    dag_run = scheduler_job.create_dag_run(dag)
    self.assertIsNotNone(dag_run)

    # Put every task instance of the run into the parameterized state.
    with create_session() as session:
        for task_instance in dag_run.get_task_instances(session=session):
            task_instance.state = state
            task_instance.start_date = start_date
            task_instance.end_date = end_date

    queued = []
    scheduler_job._process_task_instances(dag, task_instances_list=queued)

    expected = [(dag.dag_id, limited_task.task_id, DEFAULT_DATE, TRY_NUMBER)]
    assert queued == expected

@parameterized.expand([
    [State.NONE, None, None],
    [State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30),
     timezone.utcnow() - datetime.timedelta(minutes=15)],
    [State.UP_FOR_RESCHEDULE, timezone.utcnow() - datetime.timedelta(minutes=30),
     timezone.utcnow() - datetime.timedelta(minutes=15)],
])
def test_process_task_instances_depends_on_past(self, state, start_date, end_date):
    """
    Test that _process_task_instances schedules both task instances of a DAG
    whose tasks are created with ``depends_on_past=True``.

    Each task instance of a fresh DAG run is set to ``state`` with the given
    ``start_date`` / ``end_date`` before the scheduler pass. Both tasks must
    then appear in the collected list, in any order.
    """
    dag = DAG(
        dag_id='test_scheduler_process_execute_task_depends_on_past',
        start_date=DEFAULT_DATE,
        default_args={
            'depends_on_past': True,
        },
    )
    dag_task1 = DummyOperator(
        task_id='dummy1',
        dag=dag,
        owner='airflow')
    dag_task2 = DummyOperator(
        task_id='dummy2',
        dag=dag,
        owner='airflow')

    with create_session() as session:
        orm_dag = DagModel(dag_id=dag.dag_id)
        session.merge(orm_dag)

    scheduler_job = SchedulerJob()
    dag.clear()
    dr = scheduler_job.create_dag_run(dag)
    self.assertIsNotNone(dr)

    with create_session() as session:
        tis = dr.get_task_instances(session=session)
        for ti in tis:
            ti.state = state
            ti.start_date = start_date
            ti.end_date = end_date

    ti_to_schedule = []
    scheduler_job._process_task_instances(dag, task_instances_list=ti_to_schedule)

    # The order in which the two task instances are appended is not
    # guaranteed (it differed on the Python 2.7 / 3.5 CI runs), so compare
    # order-insensitively. sorted() is used instead of assertCountEqual
    # because it behaves the same on Python 2 and 3.
    assert sorted(ti_to_schedule) == sorted([
        (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER),
        (dag.dag_id, dag_task2.task_id, DEFAULT_DATE, TRY_NUMBER),
    ])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm observing a CI error after this was merged into 1-10-stable.

I assume it's because the order of the two elements in ti_to_schedule is not guaranteed.

Possibly can instead assert like this:

        mock_list = Mock()
        scheduler._process_task_instances(dag, task_instances_list=mock_list)

        mock_list.append.assert_called_with(
            (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER)
        )

(https://github.com/astronomer/airflow/blob/67807ee492482f57442239e271747a5acc69e15b/tests/jobs/test_scheduler_job.py#L1604-L1609)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will prepare a PR shortly.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

argghh Python 2.7 & 3.5 I guess because of the ordering

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the order is not guaranteed, so asserting that the two lists are equal would fail.

I have the change ready, and will raise the PR now.


def test_find_executable_task_instances_concurrency(self):
dag_id = 'SchedulerJobTest.test_find_executable_task_instances_concurrency'
task_id_1 = 'dummy'
Expand Down