From 4a02e0a287f880eab98979de565c061747b35f27 Mon Sep 17 00:00:00 2001 From: Ash Berlin-Taylor Date: Sat, 5 Dec 2020 21:56:51 +0000 Subject: [PATCH] Don't emit first_task_scheduling_delay metric for only-once dags (#12835) Dags with a schedule interval of None, or `@once` don't have a following schedule, so we can't realistically calculate this metric. Additionally, this changes the emitted metric from seconds to milliseconds -- all timers to statsd should be in milliseconds -- this is what Statsd and apps that consume data from there expect. See #10629 for more details. This will be a "breaking" change from 1.10.14, where the metric was back-ported to, but was (incorrectly) emitting seconds. --- UPDATING.md | 5 ++ airflow/models/dagrun.py | 22 ++++-- .../logging-monitoring/metrics.rst | 2 +- tests/models/test_dagrun.py | 75 ++++++++++++------- 4 files changed, 67 insertions(+), 37 deletions(-) diff --git a/UPDATING.md b/UPDATING.md index 7ef41b37445ae..b9f02a97865cd 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -109,6 +109,11 @@ This is to align the name with the actual code where the Scheduler launches the `[scheduler] parsing_processes` to Parse DAG files, calculates next DagRun date for each DAG, serialize them and store them in the DB. +### `dagrun.*.first_task_scheduling_delay` metric is now in milliseconds + +This metric was added in 1.10.14 emitting a value as seconds, but for consistency with Statsd generally this +metric has been changed to emit a value in milliseconds + ## Airflow 2.0.0b1 ### Rename policy to task_policy diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 307b7265c72fb..5979237a1ad45 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -573,14 +573,20 @@ def _emit_true_scheduling_delay_stats_for_finished_state(self, finished_tis): Note, the stat will only be emitted if the DagRun is a scheduler triggered one (i.e. external_trigger is False). """ + if self.state == State.RUNNING: + return + if self.external_trigger: + return + if not finished_tis: + return + try: - if self.state == State.RUNNING: - return - if self.external_trigger: - return - if not finished_tis: - return dag = self.get_dag() + + if not self.dag.schedule_interval or self.dag.schedule_interval == "@once": + # We can't emit this metric if there is no following schedule to cacluate from! + return + ordered_tis_by_start_date = [ti for ti in finished_tis if ti.start_date] ordered_tis_by_start_date.sort(key=lambda ti: ti.start_date, reverse=False) first_start_date = ordered_tis_by_start_date[0].start_date @@ -588,8 +594,8 @@ def _emit_true_scheduling_delay_stats_for_finished_state(self, finished_tis): # dag.following_schedule calculates the expected start datetime for a scheduled dagrun # i.e. a daily flow for execution date 1/1/20 actually runs on 1/2/20 hh:mm:ss, # and ti.start_date will be 1/2/20 hh:mm:ss so the following schedule is comparison - true_delay = (first_start_date - dag.following_schedule(self.execution_date)).total_seconds() - if true_delay >= 0: + true_delay = first_start_date - dag.following_schedule(self.execution_date) + if true_delay.total_seconds() > 0: Stats.timing(f'dagrun.{dag.dag_id}.first_task_scheduling_delay', true_delay) except Exception as e: self.log.warning(f'Failed to record first_task_scheduling_delay metric:\n{e}') diff --git a/docs/apache-airflow/logging-monitoring/metrics.rst b/docs/apache-airflow/logging-monitoring/metrics.rst index 043777f8dd040..ff5374900f12c 100644 --- a/docs/apache-airflow/logging-monitoring/metrics.rst +++ b/docs/apache-airflow/logging-monitoring/metrics.rst @@ -145,5 +145,5 @@ Name Description start date and the actual DagRun start date ``scheduler.critical_section_duration`` Milliseconds spent in the critical section of scheduler loop -- only a single scheduler can enter this loop at a time -``dagrun..first_task_scheduling_delay`` Seconds elapsed between first task start_date and dagrun expected start +``dagrun..first_task_scheduling_delay`` Milliseconds elapsed between first task start_date and dagrun expected start =================================================== ======================================================================== diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py index f5a0cbdb392e2..b0c1046cf45bd 100644 --- a/tests/models/test_dagrun.py +++ b/tests/models/test_dagrun.py @@ -739,41 +739,60 @@ def test_no_scheduling_delay_for_nonscheduled_runs(self, stats_mock): dag_run.update_state() self.assertNotIn(call(f'dagrun.{dag.dag_id}.first_task_scheduling_delay'), stats_mock.mock_calls) - @mock.patch.object(Stats, 'timing') - def test_emit_scheduling_delay(self, stats_mock): + @parameterized.expand( + [ + ("*/5 * * * *", True), + (None, False), + ("@once", False), + ] + ) + def test_emit_scheduling_delay(self, schedule_interval, expected): """ Tests that dag scheduling delay stat is set properly once running scheduled dag. dag_run.update_state() invokes the _emit_true_scheduling_delay_stats_for_finished_state method. """ - dag = DAG(dag_id='test_emit_dag_stats', start_date=days_ago(1)) + dag = DAG(dag_id='test_emit_dag_stats', start_date=days_ago(1), schedule_interval=schedule_interval) dag_task = DummyOperator(task_id='dummy', dag=dag, owner='airflow') session = settings.Session() - orm_dag = DagModel( - dag_id=dag.dag_id, - has_task_concurrency_limits=False, - next_dagrun=dag.start_date, - next_dagrun_create_after=dag.following_schedule(dag.start_date), - is_active=True, - ) - session.add(orm_dag) - session.flush() - dag_run = dag.create_dagrun( - run_type=DagRunType.SCHEDULED, - state=State.SUCCESS, - execution_date=dag.start_date, - start_date=dag.start_date, - session=session, - ) - ti = dag_run.get_task_instance(dag_task.task_id) - ti.set_state(State.SUCCESS, session) - session.commit() - session.close() - dag_run.update_state() - true_delay = (ti.start_date - dag.following_schedule(dag_run.execution_date)).total_seconds() - stats_mock.assert_called() - sched_delay_stat_call = call(f'dagrun.{dag.dag_id}.first_task_scheduling_delay', true_delay) - self.assertIn(sched_delay_stat_call, stats_mock.mock_calls) + try: + orm_dag = DagModel( + dag_id=dag.dag_id, + has_task_concurrency_limits=False, + next_dagrun=dag.start_date, + next_dagrun_create_after=dag.following_schedule(dag.start_date), + is_active=True, + ) + session.add(orm_dag) + session.flush() + dag_run = dag.create_dagrun( + run_type=DagRunType.SCHEDULED, + state=State.SUCCESS, + execution_date=dag.start_date, + start_date=dag.start_date, + session=session, + ) + ti = dag_run.get_task_instance(dag_task.task_id, session) + ti.set_state(State.SUCCESS, session) + session.flush() + + with mock.patch.object(Stats, 'timing') as stats_mock: + dag_run.update_state(session) + + metric_name = f'dagrun.{dag.dag_id}.first_task_scheduling_delay' + + if expected: + true_delay = ti.start_date - dag.following_schedule(dag_run.execution_date) + sched_delay_stat_call = call(metric_name, true_delay) + assert sched_delay_stat_call in stats_mock.mock_calls + else: + # Assert that we never passed the metric + sched_delay_stat_call = call(metric_name, mock.ANY) + assert sched_delay_stat_call not in stats_mock.mock_calls + finally: + # Don't write anything to the DB + session.rollback() + session.close() def test_states_sets(self): """