Skip to content

Commit

Permalink
[AIRFLOW-3607] fix scheduler bug related to concurrency and depends o…
Browse files Browse the repository at this point in the history
…n past (apache#7402)

commit 50efda5 introduced a bug that
prevents scheduler from scheduling tasks with the following properties:

* has depends on past set to True
* has custom concurrency limit
  • Loading branch information
QP Hou authored and turbaszek committed Sep 24, 2020
1 parent 159a0ed commit 6652ada
Show file tree
Hide file tree
Showing 2 changed files with 973 additions and 8 deletions.
26 changes: 19 additions & 7 deletions airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,15 +272,27 @@ def update_state(self, session=None):
none_depends_on_past = all(not t.task.depends_on_past for t in unfinished_tasks)
none_task_concurrency = all(t.task.task_concurrency is None
for t in unfinished_tasks)
# small speed up
if unfinished_tasks and none_depends_on_past and none_task_concurrency:
if unfinished_tasks:
scheduleable_tasks = [ut for ut in unfinished_tasks if ut.state in SCHEDULEABLE_STATES]
if none_depends_on_past and none_task_concurrency:
# small speed up
self.log.debug(
"number of scheduleable tasks for %s: %s task(s)",
self, len(scheduleable_tasks))
ready_tis, changed_tis = self._get_ready_tis(scheduleable_tasks, finished_tasks, session)
self.log.debug("ready tis length for %s: %s task(s)", self, len(ready_tis))
are_runnable_tasks = ready_tis or self._are_premature_tis(
unfinished_tasks, finished_tasks, session) or changed_tis
else:
# slow path
for ti in scheduleable_tasks:
if ti.are_dependencies_met(
dep_context=DepContext(flag_upstream_failed=True),
session=session
):
self.log.debug('Queuing task: %s', ti)
ready_tis.append(ti)

self.log.debug("number of scheduleable tasks for %s: %s task(s)", self, len(scheduleable_tasks))
ready_tis, changed_tis = self._get_ready_tis(scheduleable_tasks, finished_tasks, session)
self.log.debug("ready tis length for %s: %s task(s)", self, len(ready_tis))
are_runnable_tasks = ready_tis or self._are_premature_tis(
unfinished_tasks, finished_tasks, session) or changed_tis
duration = (timezone.utcnow() - start_dttm).total_seconds() * 1000
Stats.timing("dagrun.dependency-check.{}".format(self.dag_id), duration)

Expand Down
Loading

0 comments on commit 6652ada

Please sign in to comment.