Skip to content

Commit

Permalink
Don't emit first_task_scheduling_delay metric for only-once dags (#12835)
Browse files Browse the repository at this point in the history

Dags with a schedule interval of None, or `@once` don't have a following
schedule, so we can't realistically calculate this metric.

Additionally, this changes the emitted metric from seconds to
milliseconds -- all timers to statsd should be in milliseconds -- this
is what Statsd and apps that consume data from there expect. See #10629
for more details.

This will be a "breaking" change from 1.10.14, where the metric was
back-ported to, but was (incorrectly) emitting seconds.
  • Loading branch information
ashb authored Dec 5, 2020
1 parent 37b2679 commit 4a02e0a
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 37 deletions.
5 changes: 5 additions & 0 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ This is to align the name with the actual code where the Scheduler launches the
`[scheduler] parsing_processes` to Parse DAG files, calculates next DagRun date for each DAG,
serialize them and store them in the DB.

### `dagrun.*.first_task_scheduling_delay` metric is now in milliseconds

This metric was added in 1.10.14 emitting a value in seconds, but for consistency with Statsd generally this
metric has been changed to emit a value in milliseconds.

## Airflow 2.0.0b1

### Rename policy to task_policy
Expand Down
22 changes: 14 additions & 8 deletions airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -573,23 +573,29 @@ def _emit_true_scheduling_delay_stats_for_finished_state(self, finished_tis):
Note, the stat will only be emitted if the DagRun is a scheduler triggered one
(i.e. external_trigger is False).
"""
if self.state == State.RUNNING:
return
if self.external_trigger:
return
if not finished_tis:
return

try:
if self.state == State.RUNNING:
return
if self.external_trigger:
return
if not finished_tis:
return
dag = self.get_dag()

if not self.dag.schedule_interval or self.dag.schedule_interval == "@once":
# We can't emit this metric if there is no following schedule to calculate from!
return

ordered_tis_by_start_date = [ti for ti in finished_tis if ti.start_date]
ordered_tis_by_start_date.sort(key=lambda ti: ti.start_date, reverse=False)
first_start_date = ordered_tis_by_start_date[0].start_date
if first_start_date:
# dag.following_schedule calculates the expected start datetime for a scheduled dagrun
# i.e. a daily flow for execution date 1/1/20 actually runs on 1/2/20 hh:mm:ss,
# and ti.start_date will be 1/2/20 hh:mm:ss, so the following schedule is the point of comparison
true_delay = (first_start_date - dag.following_schedule(self.execution_date)).total_seconds()
if true_delay >= 0:
true_delay = first_start_date - dag.following_schedule(self.execution_date)
if true_delay.total_seconds() > 0:
Stats.timing(f'dagrun.{dag.dag_id}.first_task_scheduling_delay', true_delay)
except Exception as e:
self.log.warning(f'Failed to record first_task_scheduling_delay metric:\n{e}')
Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/logging-monitoring/metrics.rst
Original file line number Diff line number Diff line change
Expand Up @@ -145,5 +145,5 @@ Name Description
start date and the actual DagRun start date
``scheduler.critical_section_duration`` Milliseconds spent in the critical section of scheduler loop --
only a single scheduler can enter this loop at a time
``dagrun.<dag_id>.first_task_scheduling_delay`` Seconds elapsed between first task start_date and dagrun expected start
``dagrun.<dag_id>.first_task_scheduling_delay`` Milliseconds elapsed between first task start_date and dagrun expected start
=================================================== ========================================================================
75 changes: 47 additions & 28 deletions tests/models/test_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -739,41 +739,60 @@ def test_no_scheduling_delay_for_nonscheduled_runs(self, stats_mock):
dag_run.update_state()
self.assertNotIn(call(f'dagrun.{dag.dag_id}.first_task_scheduling_delay'), stats_mock.mock_calls)

@mock.patch.object(Stats, 'timing')
def test_emit_scheduling_delay(self, stats_mock):
@parameterized.expand(
[
("*/5 * * * *", True),
(None, False),
("@once", False),
]
)
def test_emit_scheduling_delay(self, schedule_interval, expected):
"""
Tests that dag scheduling delay stat is set properly once running scheduled dag.
dag_run.update_state() invokes the _emit_true_scheduling_delay_stats_for_finished_state method.
"""
dag = DAG(dag_id='test_emit_dag_stats', start_date=days_ago(1))
dag = DAG(dag_id='test_emit_dag_stats', start_date=days_ago(1), schedule_interval=schedule_interval)
dag_task = DummyOperator(task_id='dummy', dag=dag, owner='airflow')

session = settings.Session()
orm_dag = DagModel(
dag_id=dag.dag_id,
has_task_concurrency_limits=False,
next_dagrun=dag.start_date,
next_dagrun_create_after=dag.following_schedule(dag.start_date),
is_active=True,
)
session.add(orm_dag)
session.flush()
dag_run = dag.create_dagrun(
run_type=DagRunType.SCHEDULED,
state=State.SUCCESS,
execution_date=dag.start_date,
start_date=dag.start_date,
session=session,
)
ti = dag_run.get_task_instance(dag_task.task_id)
ti.set_state(State.SUCCESS, session)
session.commit()
session.close()
dag_run.update_state()
true_delay = (ti.start_date - dag.following_schedule(dag_run.execution_date)).total_seconds()
stats_mock.assert_called()
sched_delay_stat_call = call(f'dagrun.{dag.dag_id}.first_task_scheduling_delay', true_delay)
self.assertIn(sched_delay_stat_call, stats_mock.mock_calls)
try:
orm_dag = DagModel(
dag_id=dag.dag_id,
has_task_concurrency_limits=False,
next_dagrun=dag.start_date,
next_dagrun_create_after=dag.following_schedule(dag.start_date),
is_active=True,
)
session.add(orm_dag)
session.flush()
dag_run = dag.create_dagrun(
run_type=DagRunType.SCHEDULED,
state=State.SUCCESS,
execution_date=dag.start_date,
start_date=dag.start_date,
session=session,
)
ti = dag_run.get_task_instance(dag_task.task_id, session)
ti.set_state(State.SUCCESS, session)
session.flush()

with mock.patch.object(Stats, 'timing') as stats_mock:
dag_run.update_state(session)

metric_name = f'dagrun.{dag.dag_id}.first_task_scheduling_delay'

if expected:
true_delay = ti.start_date - dag.following_schedule(dag_run.execution_date)
sched_delay_stat_call = call(metric_name, true_delay)
assert sched_delay_stat_call in stats_mock.mock_calls
else:
# Assert that we never passed the metric
sched_delay_stat_call = call(metric_name, mock.ANY)
assert sched_delay_stat_call not in stats_mock.mock_calls
finally:
# Don't write anything to the DB
session.rollback()
session.close()

def test_states_sets(self):
"""
Expand Down

0 comments on commit 4a02e0a

Please sign in to comment.