fixes for the code-review
howardyoo authored and potiuk committed Jul 19, 2024
1 parent 11165b8 commit d33fe10
Showing 7 changed files with 31 additions and 28 deletions.
32 changes: 16 additions & 16 deletions airflow/dag_processing/manager.py
@@ -571,22 +571,22 @@ def _run_parsing_loop(self):
             with Trace.start_span(span_name="dag_parsing_loop", component="DagFileProcessorManager") as span:
                 loop_start_time = time.monotonic()
                 ready = multiprocessing.connection.wait(self.waitables.keys(), timeout=poll_time)
-                self.heartbeat()
                 if span.is_recording():
-                    span.add_event(name="heartbeat()")
+                    span.add_event(name="heartbeat")
+                self.heartbeat()
                 if self._direct_scheduler_conn is not None and self._direct_scheduler_conn in ready:
                     agent_signal = self._direct_scheduler_conn.recv()

                     self.log.debug("Received %s signal from DagFileProcessorAgent", agent_signal)
                     if agent_signal == DagParsingSignal.TERMINATE_MANAGER:
-                        self.terminate()
                         if span.is_recording():
-                            span.add_event(name="terminate()")
+                            span.add_event(name="terminate")
+                        self.terminate()
                         break
                     elif agent_signal == DagParsingSignal.END_MANAGER:
-                        self.end()
                         if span.is_recording():
-                            span.add_event(name="end()")
+                            span.add_event(name="end")
+                        self.end()
                         sys.exit(os.EX_OK)
                     elif agent_signal == DagParsingSignal.AGENT_RUN_ONCE:
                         # continue the loop to parse dags
@@ -623,28 +623,28 @@ def _run_parsing_loop(self):
                 DagWarning.purge_inactive_dag_warnings()
                 refreshed_dag_dir = self._refresh_dag_dir()

-                self._kill_timed_out_processors()
                 if span.is_recording():
-                    span.add_event(name="_kill_timed_out_processors()")
+                    span.add_event(name="_kill_timed_out_processors")
+                self._kill_timed_out_processors()

                 # Generate more file paths to process if we processed all the files already. Note for this
                 # to clear down, we must have cleared all files found from scanning the dags dir _and_ have
                 # cleared all files added as a result of callbacks
                 if not self._file_path_queue:
-                    if span.is_recording():
-                        span.add_event(name="prepare_file_path_queue()")
                     self.emit_metrics()
+                    if span.is_recording():
+                        span.add_event(name="prepare_file_path_queue")
                     self.prepare_file_path_queue()

                 # if new files found in dag dir, add them
                 elif refreshed_dag_dir:
                     if span.is_recording():
-                        span.add_event(name="add_new_file_path_to_queue()")
+                        span.add_event(name="add_new_file_path_to_queue")
                     self.add_new_file_path_to_queue()

                 self._refresh_requested_filelocs()
                 if span.is_recording():
-                    span.add_event(name="start_new_processes()")
+                    span.add_event(name="start_new_processes")
                 self.start_new_processes()

                 # Update number of loop iteration.
@@ -659,13 +659,13 @@ def _run_parsing_loop(self):
                     self.wait_until_finished()

                 # Collect anything else that has finished, but don't kick off any more processors
-                self.collect_results()
                 if span.is_recording():
-                    span.add_event(name="collect_results()")
+                    span.add_event(name="collect_results")
+                self.collect_results()

-                self._print_stat()
                 if span.is_recording():
-                    span.add_event(name="print_stat()")
+                    span.add_event(name="print_stat")
+                self._print_stat()

                 all_files_processed = all(self.get_last_finish_time(x) is not None for x in self.file_paths)
                 max_runs_reached = self.max_runs_reached()
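
Two small conventions recur throughout this file: the event name drops the "()" suffix, and the event is now recorded before the traced call runs, with add_event still guarded by span.is_recording() so non-recording spans skip the bookkeeping (and, plausibly, so the event lands on the span even if the call raises). A self-contained sketch of the pattern; the toy Span class below is hypothetical, and only the is_recording()/add_event() shape mirrors the diff:

    # Minimal sketch of the guarded event-before-call pattern (hypothetical Span).
    class Span:
        def __init__(self, recording: bool):
            self._recording = recording
            self.events: list[str] = []

        def is_recording(self) -> bool:
            return self._recording

        def add_event(self, name: str) -> None:
            self.events.append(name)

    def heartbeat() -> None:
        print("heartbeat ran")

    span = Span(recording=True)
    if span.is_recording():               # skip bookkeeping on no-op spans
        span.add_event(name="heartbeat")  # renamed: no "()" suffix
    heartbeat()                           # call happens after the event is recorded
    assert span.events == ["heartbeat"]
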
5 changes: 3 additions & 2 deletions airflow/executors/base_executor.py
@@ -32,6 +32,7 @@
 from airflow.configuration import conf
 from airflow.exceptions import RemovedInAirflow3Warning
 from airflow.stats import Stats
+from airflow.traces import NO_TRACE_ID
 from airflow.traces.tracer import Trace, gen_context, span
 from airflow.traces.utils import gen_span_id_from_ti_key, gen_trace_id
 from airflow.utils.log.logging_mixin import LoggingMixin
@@ -380,7 +381,7 @@ def fail(self, key: TaskInstanceKey, info=None) -> None:
         :param key: Unique key for the task instance
         """
         trace_id = Trace.get_current_span().get_span_context().trace_id
-        if trace_id != 1:
+        if trace_id != NO_TRACE_ID:
             span_id = int(gen_span_id_from_ti_key(key, as_int=True))
             with Trace.start_span(
                 span_name="fail",
@@ -403,7 +404,7 @@ def success(self, key: TaskInstanceKey, info=None) -> None:
         :param key: Unique key for the task instance
         """
         trace_id = Trace.get_current_span().get_span_context().trace_id
-        if trace_id != 1:
+        if trace_id != NO_TRACE_ID:
             span_id = int(gen_span_id_from_ti_key(key, as_int=True))
             with Trace.start_span(
                 span_name="success",
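
fail() and success() now compare the current trace id against the named sentinel NO_TRACE_ID rather than the magic number 1. gen_trace_id() (see airflow/traces/utils.py below) returns that same sentinel when a DagRun has no start_date yet, so the guard reads as "only attach a task span when a real trace exists". A standalone sketch of the guard; emit_task_span is a hypothetical helper, not an Airflow API:

    NO_TRACE_ID = 1  # sentinel: no real trace id could be generated

    def emit_task_span(trace_id: int, name: str) -> bool:
        """Hypothetical helper: create a span only under a real trace."""
        if trace_id == NO_TRACE_ID:
            return False  # nothing to attach to; skip span creation
        print(f"span {name!r} attached to trace {trace_id:032x}")
        return True

    assert emit_task_span(NO_TRACE_ID, "fail") is False  # guard trips, span skipped
    assert emit_task_span(0xABC123, "success") is True
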
2 changes: 1 addition & 1 deletion airflow/jobs/job.py
@@ -218,7 +218,7 @@ def heartbeat(
             )
             sleep_for = max(0, seconds_remaining)
             if span.is_recording():
-                span.add_event(name="sleep()", attributes={"sleep_for": sleep_for})
+                span.add_event(name="sleep", attributes={"sleep_for": sleep_for})
             sleep(sleep_for)

             job = Job._update_heartbeat(job=self, session=session)
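
The "sleep" event also carries an attributes payload. A tiny sketch of the call shape; add_event here is a hypothetical stand-in, and only the name/attributes arguments mirror the hunk above:

    import time

    def add_event(name: str, attributes: dict | None = None) -> None:
        # hypothetical stand-in for span.add_event with an attributes payload
        print(f"event={name} attributes={attributes or {}}")

    seconds_remaining = 0.25
    sleep_for = max(0, seconds_remaining)  # clamp negatives, as in the hunk above
    add_event(name="sleep", attributes={"sleep_for": sleep_for})
    time.sleep(sleep_for)
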
2 changes: 1 addition & 1 deletion airflow/jobs/local_task_job_runner.py
@@ -207,7 +207,7 @@ def sigusr2_debug_handler(signum, frame):
                     return return_code

                 if span.is_recording():
-                    span.add_event(name="perform_heartbeat()")
+                    span.add_event(name="perform_heartbeat")
                 perform_heartbeat(
                     job=self.job, heartbeat_callback=self.heartbeat_callback, only_if_necessary=False
                 )
12 changes: 6 additions & 6 deletions airflow/jobs/triggerer_job_runner.py
@@ -366,28 +366,28 @@ def _run_trigger_loop(self) -> None:
with Trace.start_span(span_name="triggerer_job_loop", component="TriggererJobRunner") as span:
# Clean out unused triggers
if span.is_recording():
span.add_event(name="Trigger.clean_unused()")
span.add_event(name="Trigger.clean_unused")
Trigger.clean_unused()
# Load/delete triggers
if span.is_recording():
span.add_event(name="load_triggers()")
span.add_event(name="load_triggers")
self.load_triggers()
# Handle events
if span.is_recording():
span.add_event(name="handle_events()")
span.add_event(name="handle_events")
self.handle_events()
# Handle failed triggers
if span.is_recording():
span.add_event(name="handle_failed_triggers()")
span.add_event(name="handle_failed_triggers")
self.handle_failed_triggers()
if span.is_recording():
span.add_event(name="perform_heartbeat()")
span.add_event(name="perform_heartbeat")
perform_heartbeat(
self.job, heartbeat_callback=self.heartbeat_callback, only_if_necessary=True
)
# Collect stats
if span.is_recording():
span.add_event(name="emit_metrics()")
span.add_event(name="emit_metrics")
self.emit_metrics()
# Idle sleep
time.sleep(1)
1 change: 1 addition & 0 deletions airflow/traces/__init__.py
@@ -18,3 +18,4 @@

 TRACEPARENT = "traceparent"
 TRACESTATE = "tracestate"
+NO_TRACE_ID = 1
5 changes: 3 additions & 2 deletions airflow/traces/utils.py
@@ -20,6 +20,7 @@
 import logging
 from typing import TYPE_CHECKING

+from airflow.traces import NO_TRACE_ID
 from airflow.utils.hashlib_wrapper import md5
 from airflow.utils.state import TaskInstanceState

@@ -41,7 +42,7 @@ def _gen_id(seeds: list[str], as_int: bool = False, type: int = TRACE_ID) -> str

 def gen_trace_id(dag_run: DagRun, as_int: bool = False) -> str | int:
     if dag_run.start_date is None:
-        return 1
+        return NO_TRACE_ID

     """Generate trace id from DagRun."""
     return _gen_id(
@@ -62,7 +63,7 @@ def gen_span_id_from_ti_key(ti_key: TaskInstanceKey, as_int: bool = False) -> st
 def gen_dag_span_id(dag_run: DagRun, as_int: bool = False) -> str | int:
     """Generate dag's root span id using dag_run."""
     if dag_run.start_date is None:
-        return 1
+        return NO_TRACE_ID

     return _gen_id(
         [dag_run.dag_id, str(dag_run.run_id), str(dag_run.start_date.timestamp())],
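
_gen_id itself sits outside the diff, but the md5 import above hints at the scheme: hash the seed strings deterministically and truncate to the OpenTelemetry ID width (128-bit trace ids, 64-bit span ids). The reconstruction below is hypothetical, intended only to show why these helpers can return either hex strings or ints via as_int:

    from __future__ import annotations

    import hashlib

    TRACE_ID, SPAN_ID = 0, 1  # hypothetical tags mirroring _gen_id's `type` parameter

    def _gen_id_sketch(seeds: list[str], as_int: bool = False, type: int = TRACE_ID) -> str | int:
        width = 32 if type == TRACE_ID else 16  # 128-bit vs 64-bit, in hex chars
        digest = hashlib.md5("-".join(seeds).encode("utf-8")).hexdigest()[:width]
        return int(digest, 16) if as_int else digest

    # Same seeds -> same id, which is what lets a DagRun's trace id be regenerated:
    print(_gen_id_sketch(["mydag", "manual__2024-07-19", "1721368800.0"], as_int=True))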
