Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-3607] collected trigger rule dep check per dag run #4751

Merged

Conversation

amichai07
Copy link
Contributor

@amichai07 amichai07 commented Feb 21, 2019

Jira

Description

  • The delay between tasks can be a major issue, especially when we have dags with many subdags,
    figures out that the scheduling process spends plenty of time in dependency checking, we took the
    trigger rule dependency which calls the db for each task instance, we made it call the db just once for
    each dag_run.

Tests

  • My pr uses very basic parts of the code and also has a fall back to the original behaviour, the ci covers most of the logic and cases that might happen already, I added a unit/integration test that checks the function that has most of the new logic

Commits

  • removed unnecessary queries - run on each dag run instead of each ti

Documentation

no need for new docs

Code Quality

  • Passes flake8
  • Tested in production environment for 3 days

Results

The tests was made on a heavily multitasks dag (35 tasks).
The tasks don't do any db queries

On local environment
before changes:

  • avg delay between tasks: 4.22 sec
  • number of queries during 10 minutes: 118,879

after collecting dep check queries:

  • avg delay between tasks: 3.86 sec
  • number of queries during 10 minutes: 104,397

Stress test - running the dag for every 10 sec for an hour:
before changes:

  • avg delay between tasks: 16.7 sec
  • number of queries: 943,230

after:

  • avg delay between tasks: 3.28 sec
  • number of queries: 734,563

On production environment
before changes:

  • avg delay between tasks: 2.45 sec

after:

  • avg delay between tasks: 2.16 sec

Stress test - running with 150 other dags:
before changes:

  • avg delay between tasks: 6.3 sec

after:

  • avg delay between tasks: 4.74 sec

Edit

we recently did a stress test to check this change again with version 1.10.4
we did the test on a staging like production environment with one dag with 49 tasks that starts once a minute :

  • max delay was decreased by 30%
  • db cpu was decreased by 20%
  • avg delay was decreased 18%

This is a graph of the time delay between two tasks in the dag by time:

  • With improvement(first segment): 14:30-16:30
  • Without improvement(last segment): 18:10-20:10

Screen Shot 2019-11-03 at 20 47 22

This graph shows the db read iops difference which can explain the change (the times are different because this is in utc)
Screen Shot 2019-11-10 at 17 21 41

Conclusion

The query that we changed indeed had a dramatic impact on the performance of the scheduler. Reusing db results decreased the delay notably and gave the system chance to recover from stress.

@feluelle
Copy link
Member

feluelle commented Feb 21, 2019

Please reference your jira ticket number in the PR title. It is required for the jira git integration to work.

@amichai07 amichai07 force-pushed the decrease_scheduler_dependencies_queries branch from 24f5221 to 519b43a Compare February 21, 2019 13:38
@codecov-io
Copy link

codecov-io commented Feb 21, 2019

Codecov Report

Merging #4751 into master will increase coverage by 0.27%.
The diff coverage is 100%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master    #4751      +/-   ##
==========================================
+ Coverage   84.81%   85.08%   +0.27%     
==========================================
  Files         679      723      +44     
  Lines       38493    39558    +1065     
==========================================
+ Hits        32646    33658    +1012     
- Misses       5847     5900      +53
Impacted Files Coverage Δ
airflow/jobs/scheduler_job.py 89.23% <100%> (-0.05%) ⬇️
airflow/ti_deps/deps/trigger_rule_dep.py 91.25% <100%> (+0.46%) ⬆️
airflow/ti_deps/dep_context.py 100% <100%> (ø) ⬆️
airflow/models/dagrun.py 96.36% <100%> (-0.23%) ⬇️
airflow/contrib/hooks/azure_data_lake_hook.py 0% <0%> (-93.11%) ⬇️
airflow/contrib/sensors/azure_cosmos_sensor.py 0% <0%> (-81.25%) ⬇️
airflow/kubernetes/volume_mount.py 44.44% <0%> (-55.56%) ⬇️
airflow/kubernetes/volume.py 52.94% <0%> (-47.06%) ⬇️
airflow/kubernetes/pod_launcher.py 45.25% <0%> (-46.72%) ⬇️
airflow/security/kerberos.py 30.43% <0%> (-45.66%) ⬇️
... and 240 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 95087af...5890619. Read the comment docs.

@amichai07 amichai07 changed the title collected trigger rule dep check per dag run [AIRFLOW-3607] collected trigger rule dep check per dag run Feb 21, 2019
airflow/jobs.py Outdated Show resolved Hide resolved
airflow/jobs.py Outdated Show resolved Hide resolved
airflow/models/__init__.py Outdated Show resolved Hide resolved
@amichai07 amichai07 force-pushed the decrease_scheduler_dependencies_queries branch 3 times, most recently from e9c50ca to bb5c825 Compare March 5, 2019 23:17
Copy link
Member

@ashb ashb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trying to think if the existing tests are detailed enough and cover both "branches" (with and without finished_tasks being passed in)

Anyone have thoughts on this?

# see if the task name is in the task upstream for our task
upstream_tasks = [finished_task for finished_task in dep_context.finished_tasks
if finished_task.task_id in ti.task.upstream_task_ids]
if upstream_tasks:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering before this change we had only SQL query, but now it's both python + sql.
🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, because we need to share sql results for more than one purpose

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left the sql query for backwards compatibility but I suppose it will be best that in most places the python will be used.

@amichai07 amichai07 force-pushed the decrease_scheduler_dependencies_queries branch 3 times, most recently from 5244994 to e03c63a Compare August 6, 2019 11:16
@amichai07 amichai07 closed this Aug 6, 2019
@amichai07 amichai07 reopened this Aug 6, 2019
@amichai07 amichai07 force-pushed the decrease_scheduler_dependencies_queries branch 2 times, most recently from 9f63c64 to ed922f2 Compare August 6, 2019 17:20
@OmerJog
Copy link
Contributor

OmerJog commented Aug 6, 2019

@amichai07 you have failure in the test:

�[33m======================================================================�[0m
�[33m68) ERROR: test_get_states_count_upstream_ti (tests.ti_deps.deps.test_trigger_rule_dep.TriggerRuleDepTest)�[0m
�[33m----------------------------------------------------------------------�[0m
�[0m   Traceback (most recent call last):�[0m
    �[34mtests/ti_deps/deps/test_trigger_rule_dep.py�[0m line �[1m�[36m412�[0m�[0m in �[36mtest_get_states_count_upstream_ti�[0m
      self.assertEqual(get_states_count_upstream_ti(finished_tasks=finished_tasks, ti=ti_op2),
    �[34mairflow/ti_deps/deps/trigger_rule_dep.py�[0m line �[1m�[36m41�[0m�[0m in �[36m_get_states_count_upstream_ti�[0m
      upstream_tasks = [finished_task for finished_task in finished_tasks
    �[34mairflow/ti_deps/deps/trigger_rule_dep.py�[0m line �[1m�[36m42�[0m�[0m in �[36m<listcomp>�[0m
      if finished_task.task_id in ti.task.upstream_task_ids]
�[33m   �[33m�[1m�[33mAttributeError�[0m�[0m�[33m: �[0m�[33m'TaskInstance' object has no attribute 'task'�[0m

@amichai07 amichai07 force-pushed the decrease_scheduler_dependencies_queries branch 2 times, most recently from da89779 to 3788262 Compare August 7, 2019 08:28
@amichai07 amichai07 force-pushed the decrease_scheduler_dependencies_queries branch from 7562be3 to b9a88f8 Compare August 22, 2019 08:39
Copy link
Member

@feluelle feluelle left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, only a few comments.

airflow/ti_deps/dep_context.py Show resolved Hide resolved
airflow/models/dagrun.py Outdated Show resolved Hide resolved
op5 = DummyOperator(task_id='E', trigger_rule=TriggerRule.ONE_FAILED)

op1.set_downstream([op2, op3]) # op1 >> op2, op3
op4.set_upstream([op3, op2]) # op3 >> op4
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
op4.set_upstream([op3, op2]) # op3 >> op4
op4.set_upstream([op3, op2]) # (op3, op2) >> op4

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is also fine to use the >> operator. You just need to disable the related pylint rule.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks I will fix it

airflow/utils/state.py Outdated Show resolved Hide resolved
galuszkak pushed a commit to FlyrInc/apache-airflow that referenced this pull request Mar 5, 2020
…che#4751)

This decreases scheduler delay between tasks by about 20% for larger DAGs,
sometimes more for larger or more complex DAGs.

The delay between tasks can be a major issue, especially when we have dags with 
many subdags, figures out that the scheduling process spends plenty of time in
dependency checking, we took the trigger rule dependency which calls the db for
each task instance, we made it call the db just once for each dag_run
kaxil added a commit to kaxil/airflow that referenced this pull request Mar 31, 2020
…che#4751)

This decreases scheduler delay between tasks by about 20% for larger DAGs,
sometimes more for larger or more complex DAGs.

The delay between tasks can be a major issue, especially when we have dags with
many subdags, figures out that the scheduling process spends plenty of time in
dependency checking, we took the trigger rule dependency which calls the db for
each task instance, we made it call the db just once for each dag_run
kaxil added a commit that referenced this pull request Mar 31, 2020
This decreases scheduler delay between tasks by about 20% for larger DAGs,
sometimes more for larger or more complex DAGs.

The delay between tasks can be a major issue, especially when we have dags with
many subdags, figures out that the scheduling process spends plenty of time in
dependency checking, we took the trigger rule dependency which calls the db for
each task instance, we made it call the db just once for each dag_run
yuqian90 pushed a commit to yuqian90/airflow that referenced this pull request Sep 21, 2020
…che#4751)

This decreases scheduler delay between tasks by about 20% for larger DAGs,
sometimes more for larger or more complex DAGs.

The delay between tasks can be a major issue, especially when we have dags with
many subdags, figures out that the scheduling process spends plenty of time in
dependency checking, we took the trigger rule dependency which calls the db for
each task instance, we made it call the db just once for each dag_run

(cherry picked from commit 50efda5)
turbaszek pushed a commit to PolideaInternal/airflow that referenced this pull request Oct 1, 2020
…che#4751)

This decreases scheduler delay between tasks by about 20% for larger DAGs,
sometimes more for larger or more complex DAGs.

The delay between tasks can be a major issue, especially when we have dags with
many subdags, figures out that the scheduling process spends plenty of time in
dependency checking, we took the trigger rule dependency which calls the db for
each task instance, we made it call the db just once for each dag_run
turbaszek pushed a commit to PolideaInternal/airflow that referenced this pull request Oct 1, 2020
[AIRFLOW-3607] Only query DB once per DAG run for TriggerRuleDep (apache#4751)

This decreases scheduler delay between tasks by about 20% for larger DAGs,
sometimes more for larger or more complex DAGs.

The delay between tasks can be a major issue, especially when we have dags with
many subdags, figures out that the scheduling process spends plenty of time in
dependency checking, we took the trigger rule dependency which calls the db for
each task instance, we made it call the db just once for each dag_run

[AIRFLOW-3607] fix scheduler bug related to concurrency and depends on past (apache#7402)

commit 50efda5 introduced a bug that
prevents scheduler from scheduling tasks with the following properties:

* has depends on past set to True
* has custom concurrency limit

[AIRFLOW-3607] Optimize dep checking when depends on past set and concurrency limit (apache#7503)
yuqian90 pushed a commit to yuqian90/airflow that referenced this pull request Oct 26, 2020
…che#4751)

This decreases scheduler delay between tasks by about 20% for larger DAGs,
sometimes more for larger or more complex DAGs.

The delay between tasks can be a major issue, especially when we have dags with
many subdags, figures out that the scheduling process spends plenty of time in
dependency checking, we took the trigger rule dependency which calls the db for
each task instance, we made it call the db just once for each dag_run

(cherry picked from commit 50efda5)
kaxil pushed a commit that referenced this pull request Oct 26, 2020
This decreases scheduler delay between tasks by about 20% for larger DAGs,
sometimes more for larger or more complex DAGs.

The delay between tasks can be a major issue, especially when we have dags with
many subdags, figures out that the scheduling process spends plenty of time in
dependency checking, we took the trigger rule dependency which calls the db for
each task instance, we made it call the db just once for each dag_run

(cherry picked from commit 50efda5)
kaxil pushed a commit that referenced this pull request Nov 12, 2020
This decreases scheduler delay between tasks by about 20% for larger DAGs,
sometimes more for larger or more complex DAGs.

The delay between tasks can be a major issue, especially when we have dags with
many subdags, figures out that the scheduling process spends plenty of time in
dependency checking, we took the trigger rule dependency which calls the db for
each task instance, we made it call the db just once for each dag_run

(cherry picked from commit 50efda5)
kaxil pushed a commit that referenced this pull request Nov 12, 2020
This decreases scheduler delay between tasks by about 20% for larger DAGs,
sometimes more for larger or more complex DAGs.

The delay between tasks can be a major issue, especially when we have dags with
many subdags, figures out that the scheduling process spends plenty of time in
dependency checking, we took the trigger rule dependency which calls the db for
each task instance, we made it call the db just once for each dag_run

(cherry picked from commit 50efda5)
@potiuk potiuk added this to the Airflow 1.10.13 milestone Nov 14, 2020
@potiuk potiuk added the type:improvement Changelog: Improvements label Nov 14, 2020
potiuk pushed a commit that referenced this pull request Nov 16, 2020
This decreases scheduler delay between tasks by about 20% for larger DAGs,
sometimes more for larger or more complex DAGs.

The delay between tasks can be a major issue, especially when we have dags with
many subdags, figures out that the scheduling process spends plenty of time in
dependency checking, we took the trigger rule dependency which calls the db for
each task instance, we made it call the db just once for each dag_run

(cherry picked from commit 50efda5)
kaxil pushed a commit that referenced this pull request Nov 18, 2020
This decreases scheduler delay between tasks by about 20% for larger DAGs,
sometimes more for larger or more complex DAGs.

The delay between tasks can be a major issue, especially when we have dags with
many subdags, figures out that the scheduling process spends plenty of time in
dependency checking, we took the trigger rule dependency which calls the db for
each task instance, we made it call the db just once for each dag_run

(cherry picked from commit 50efda5)
kaxil pushed a commit to astronomer/airflow that referenced this pull request Nov 24, 2020
…che#4751)

This decreases scheduler delay between tasks by about 20% for larger DAGs,
sometimes more for larger or more complex DAGs.

The delay between tasks can be a major issue, especially when we have dags with
many subdags, figures out that the scheduling process spends plenty of time in
dependency checking, we took the trigger rule dependency which calls the db for
each task instance, we made it call the db just once for each dag_run

(cherry picked from commit 50efda5)
(cherry picked from commit cb750c1)
kaxil pushed a commit to astronomer/airflow that referenced this pull request Nov 27, 2020
…che#4751)

This decreases scheduler delay between tasks by about 20% for larger DAGs,
sometimes more for larger or more complex DAGs.

The delay between tasks can be a major issue, especially when we have dags with
many subdags, figures out that the scheduling process spends plenty of time in
dependency checking, we took the trigger rule dependency which calls the db for
each task instance, we made it call the db just once for each dag_run

(cherry picked from commit 50efda5)
(cherry picked from commit cb750c1)
cfei18 pushed a commit to cfei18/incubator-airflow that referenced this pull request Mar 5, 2021
This decreases scheduler delay between tasks by about 20% for larger DAGs,
sometimes more for larger or more complex DAGs.

The delay between tasks can be a major issue, especially when we have dags with
many subdags, figures out that the scheduling process spends plenty of time in
dependency checking, we took the trigger rule dependency which calls the db for
each task instance, we made it call the db just once for each dag_run

(cherry picked from commit 50efda5)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:Scheduler including HA (high availability) scheduler type:improvement Changelog: Improvements
Projects
None yet
Development

Successfully merging this pull request may close these issues.