diff --git a/airflow/api_internal/endpoints/rpc_api_endpoint.py b/airflow/api_internal/endpoints/rpc_api_endpoint.py index 1a12ae5bf214a..eb7a0fbfa3061 100644 --- a/airflow/api_internal/endpoints/rpc_api_endpoint.py +++ b/airflow/api_internal/endpoints/rpc_api_endpoint.py @@ -39,6 +39,7 @@ def _initialize_map() -> dict[str, Callable]: functions: list[Callable] = [ DagFileProcessor.update_import_errors, + DagFileProcessor.manage_slas, DagModel.get_paused_dag_ids, DagFileProcessorManager.clear_nonexistent_import_errors, XCom.get_value, diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py index 5c9b7ebfd6e8d..a50ca933dc8e0 100644 --- a/airflow/dag_processing/processor.py +++ b/airflow/dag_processing/processor.py @@ -365,8 +365,10 @@ def __init__(self, dag_ids: list[str] | None, dag_directory: str, log: logging.L self._dag_directory = dag_directory self.dag_warnings: set[tuple[str, str]] = set() + @classmethod + @internal_api_call @provide_session - def manage_slas(self, dag: DAG, session: Session = None) -> None: + def manage_slas(cls, dag_folder, dag_id: str, session: Session = NEW_SESSION) -> None: """ Finding all tasks that have SLAs defined, and sending alert emails when needed. @@ -375,9 +377,11 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None: We are assuming that the scheduler runs often, so we only check for tasks that should have succeeded in the past hour. """ - self.log.info("Running SLA Checks for %s", dag.dag_id) + dagbag = DagFileProcessor._get_dagbag(dag_folder) + dag = dagbag.get_dag(dag_id) + cls.logger().info("Running SLA Checks for %s", dag.dag_id) if not any(isinstance(ti.sla, timedelta) for ti in dag.tasks): - self.log.info("Skipping SLA check for %s because no tasks in DAG have SLAs", dag) + cls.logger().info("Skipping SLA check for %s because no tasks in DAG have SLAs", dag) return qry = ( @@ -481,7 +485,7 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None: else [dag.sla_miss_callback] ) for callback in callbacks: - self.log.info("Calling SLA miss callback %s", callback) + cls.logger().info("Calling SLA miss callback %s", callback) try: callback(dag, task_list, blocking_task_list, slas, blocking_tis) notification_sent = True @@ -493,7 +497,7 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None: "func_name": callback.__name__, }, ) - self.log.exception( + cls.logger().exception( "Could not call sla_miss_callback(%s) for DAG %s", callback.__name__, dag.dag_id, @@ -512,7 +516,7 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None: task = dag.get_task(sla.task_id) except TaskNotFound: # task already deleted from DAG, skip it - self.log.warning( + cls.logger().warning( "Task %s doesn't exist in DAG anymore, skipping SLA miss notification.", sla.task_id ) continue @@ -532,7 +536,9 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None: notification_sent = True except Exception: Stats.incr("sla_email_notification_failure", tags={"dag_id": dag.dag_id}) - self.log.exception("Could not send SLA Miss email notification for DAG %s", dag.dag_id) + cls.logger().exception( + "Could not send SLA Miss email notification for DAG %s", dag.dag_id + ) # If we sent any notification, update the sla_miss table if notification_sent: for sla in slas: @@ -652,7 +658,7 @@ def execute_callbacks( if isinstance(request, TaskCallbackRequest): self._execute_task_callbacks(dagbag, request, session=session) elif isinstance(request, SlaCallbackRequest): - self.manage_slas(dagbag.get_dag(request.dag_id), session=session) + DagFileProcessor.manage_slas(dagbag.dag_folder, request.dag_id, session=session) elif isinstance(request, DagCallbackRequest): self._execute_dag_callbacks(dagbag, request, session) except Exception: @@ -736,6 +742,15 @@ def _execute_task_callbacks(self, dagbag: DagBag | None, request: TaskCallbackRe self.log.info("Executed failure callback for %s in state %s", ti, ti.state) session.flush() + @classmethod + def _get_dagbag(cls, file_path: str): + try: + return DagBag(file_path, include_examples=False) + except Exception: + cls.logger().exception("Failed at reloading the DAG file %s", file_path) + Stats.incr("dag_file_refresh_error", 1, 1) + raise + @provide_session def process_file( self, @@ -766,7 +781,7 @@ def process_file( self.log.info("Processing file %s for tasks to queue", file_path) try: - dagbag = DagBag(file_path, include_examples=False) + dagbag = DagFileProcessor._get_dagbag(file_path) except Exception: self.log.exception("Failed at reloading the DAG file %s", file_path) Stats.incr("dag_file_refresh_error", 1, 1, tags={"file_path": file_path}) diff --git a/airflow/utils/log/logging_mixin.py b/airflow/utils/log/logging_mixin.py index 85ff71a94f1bf..79746c8492396 100644 --- a/airflow/utils/log/logging_mixin.py +++ b/airflow/utils/log/logging_mixin.py @@ -24,7 +24,7 @@ import sys from io import IOBase from logging import Handler, Logger, StreamHandler -from typing import IO, cast +from typing import IO, Any, TypeVar, cast from airflow.settings import IS_K8S_EXECUTOR_POD @@ -59,6 +59,9 @@ def remove_escape_codes(text: str) -> str: return ANSI_ESCAPE.sub("", text) +_T = TypeVar("_T") + + class LoggingMixin: """Convenience super-class to have a logger configured with the class name""" @@ -67,12 +70,21 @@ class LoggingMixin: def __init__(self, context=None): self._set_context(context) + @staticmethod + def _get_log(obj: Any, clazz: type[_T]) -> Logger: + if obj._log is None: + obj._log = logging.getLogger(f"{clazz.__module__}.{clazz.__name__}") + return obj._log + + @classmethod + def logger(cls) -> Logger: + """Returns a logger.""" + return LoggingMixin._get_log(cls, cls) + @property def log(self) -> Logger: """Returns a logger.""" - if self._log is None: - self._log = logging.getLogger(self.__class__.__module__ + "." + self.__class__.__name__) - return self._log + return LoggingMixin._get_log(self, self.__class__) def _set_context(self, context): if context is not None: diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py index 8bad4c8cb85ac..12528ae462c93 100644 --- a/tests/dag_processing/test_processor.py +++ b/tests/dag_processing/test_processor.py @@ -105,16 +105,15 @@ def _process_file(self, file_path, dag_directory, session): dag_file_processor.process_file(file_path, [], False, session) - def test_dag_file_processor_sla_miss_callback(self, create_dummy_dag): + @mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag") + def test_dag_file_processor_sla_miss_callback(self, mock_get_dagbag, create_dummy_dag, get_test_dag): """ Test that the dag file processor calls the sla miss callback """ session = settings.Session() - sla_callback = MagicMock() - # Create dag with a start of 1 day ago, but an sla of 0 - # so we'll already have an sla_miss on the books. + # Create dag with a start of 1 day ago, but a sla of 0, so we'll already have a sla_miss on the books. test_start_date = timezone.utcnow() - datetime.timedelta(days=1) dag, task = create_dummy_dag( dag_id="test_sla_miss", @@ -124,17 +123,18 @@ def test_dag_file_processor_sla_miss_callback(self, create_dummy_dag): ) session.merge(TaskInstance(task=task, execution_date=test_start_date, state="success")) - session.merge(SlaMiss(task_id="dummy", dag_id="test_sla_miss", execution_date=test_start_date)) - dag_file_processor = DagFileProcessor( - dag_ids=[], dag_directory=TEST_DAGS_FOLDER, log=mock.MagicMock() - ) - dag_file_processor.manage_slas(dag=dag, session=session) + mock_dagbag = mock.Mock() + mock_dagbag.get_dag.return_value = dag + mock_get_dagbag.return_value = mock_dagbag + + DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) assert sla_callback.called - def test_dag_file_processor_sla_miss_callback_invalid_sla(self, create_dummy_dag): + @mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag") + def test_dag_file_processor_sla_miss_callback_invalid_sla(self, mock_get_dagbag, create_dummy_dag): """ Test that the dag file processor does not call the sla miss callback when given an invalid sla @@ -155,16 +155,17 @@ def test_dag_file_processor_sla_miss_callback_invalid_sla(self, create_dummy_dag ) session.merge(TaskInstance(task=task, execution_date=test_start_date, state="success")) - session.merge(SlaMiss(task_id="dummy", dag_id="test_sla_miss", execution_date=test_start_date)) - dag_file_processor = DagFileProcessor( - dag_ids=[], dag_directory=TEST_DAGS_FOLDER, log=mock.MagicMock() - ) - dag_file_processor.manage_slas(dag=dag, session=session) + mock_dagbag = mock.Mock() + mock_dagbag.get_dag.return_value = dag + mock_get_dagbag.return_value = mock_dagbag + + DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) sla_callback.assert_not_called() - def test_dag_file_processor_sla_miss_callback_sent_notification(self, create_dummy_dag): + @mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag") + def test_dag_file_processor_sla_miss_callback_sent_notification(self, mock_get_dagbag, create_dummy_dag): """ Test that the dag file processor does not call the sla_miss_callback when a notification has already been sent @@ -198,16 +199,20 @@ def test_dag_file_processor_sla_miss_callback_sent_notification(self, create_dum ) ) + mock_dagbag = mock.Mock() + mock_dagbag.get_dag.return_value = dag + mock_get_dagbag.return_value = mock_dagbag + # Now call manage_slas and see if the sla_miss callback gets called - dag_file_processor = DagFileProcessor( - dag_ids=[], dag_directory=TEST_DAGS_FOLDER, log=mock.MagicMock() - ) - dag_file_processor.manage_slas(dag=dag, session=session) + DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) sla_callback.assert_not_called() @mock.patch("airflow.dag_processing.processor.Stats.incr") - def test_dag_file_processor_sla_miss_doesnot_raise_integrity_error(self, mock_stats_incr, dag_maker): + @mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag") + def test_dag_file_processor_sla_miss_doesnot_raise_integrity_error( + self, mock_get_dagbag, mock_stats_incr, dag_maker + ): """ Test that the dag file processor does not try to insert already existing item into the database """ @@ -229,10 +234,11 @@ def test_dag_file_processor_sla_miss_doesnot_raise_integrity_error(self, mock_st session.merge(ti) session.flush() - dag_file_processor = DagFileProcessor( - dag_ids=[], dag_directory=TEST_DAGS_FOLDER, log=mock.MagicMock() - ) - dag_file_processor.manage_slas(dag=dag, session=session) + mock_dagbag = mock.Mock() + mock_dagbag.get_dag.return_value = dag + mock_get_dagbag.return_value = mock_dagbag + + DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) sla_miss_count = ( session.query(SlaMiss) .filter( @@ -249,11 +255,12 @@ def test_dag_file_processor_sla_miss_doesnot_raise_integrity_error(self, mock_st # because of existing SlaMiss above. # Since this is run often, it's possible that it runs before another # ti is successful thereby trying to insert a duplicate record. - dag_file_processor.manage_slas(dag=dag, session=session) + DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) @mock.patch("airflow.dag_processing.processor.Stats.incr") + @mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag") def test_dag_file_processor_sla_miss_continue_checking_the_task_instances_after_recording_missing_sla( - self, mock_stats_incr, dag_maker + self, mock_get_dagbag, mock_stats_incr, dag_maker ): """ Test that the dag file processor continue checking subsequent task instances @@ -279,10 +286,11 @@ def test_dag_file_processor_sla_miss_continue_checking_the_task_instances_after_ ) session.flush() - dag_file_processor = DagFileProcessor( - dag_ids=[], dag_directory=TEST_DAGS_FOLDER, log=mock.MagicMock() - ) - dag_file_processor.manage_slas(dag=dag, session=session) + mock_dagbag = mock.Mock() + mock_dagbag.get_dag.return_value = dag + mock_get_dagbag.return_value = mock_dagbag + + DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) sla_miss_count = ( session.query(SlaMiss) .filter( @@ -296,8 +304,12 @@ def test_dag_file_processor_sla_miss_continue_checking_the_task_instances_after_ "sla_missed", tags={"dag_id": "test_sla_miss", "run_id": "test", "task_id": "dummy"} ) + @patch.object(DagFileProcessor, "logger") @mock.patch("airflow.dag_processing.processor.Stats.incr") - def test_dag_file_processor_sla_miss_callback_exception(self, mock_stats_incr, create_dummy_dag): + @mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag") + def test_dag_file_processor_sla_miss_callback_exception( + self, mock_get_dagbag, mock_stats_incr, mock_get_log, create_dummy_dag + ): """ Test that the dag file processor gracefully logs an exception if there is a problem calling the sla_miss_callback @@ -327,9 +339,13 @@ def test_dag_file_processor_sla_miss_callback_exception(self, mock_stats_incr, c ) # Now call manage_slas and see if the sla_miss callback gets called - mock_log = mock.MagicMock() - dag_file_processor = DagFileProcessor(dag_ids=[], dag_directory=TEST_DAGS_FOLDER, log=mock_log) - dag_file_processor.manage_slas(dag=dag, session=session) + mock_log = mock.Mock() + mock_get_log.return_value = mock_log + mock_dagbag = mock.Mock() + mock_dagbag.get_dag.return_value = dag + mock_get_dagbag.return_value = mock_dagbag + + DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) assert sla_callback.called mock_log.exception.assert_called_once_with( "Could not call sla_miss_callback(%s) for DAG %s", @@ -342,8 +358,9 @@ def test_dag_file_processor_sla_miss_callback_exception(self, mock_stats_incr, c ) @mock.patch("airflow.dag_processing.processor.send_email") + @mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag") def test_dag_file_processor_only_collect_emails_from_sla_missed_tasks( - self, mock_send_email, create_dummy_dag + self, mock_get_dagbag, mock_send_email, create_dummy_dag ): session = settings.Session() @@ -363,11 +380,11 @@ def test_dag_file_processor_only_collect_emails_from_sla_missed_tasks( session.merge(SlaMiss(task_id="sla_missed", dag_id="test_sla_miss", execution_date=test_start_date)) - dag_file_processor = DagFileProcessor( - dag_ids=[], dag_directory=TEST_DAGS_FOLDER, log=mock.MagicMock() - ) + mock_dagbag = mock.Mock() + mock_dagbag.get_dag.return_value = dag + mock_get_dagbag.return_value = mock_dagbag - dag_file_processor.manage_slas(dag=dag, session=session) + DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) assert len(mock_send_email.call_args_list) == 1 @@ -375,10 +392,12 @@ def test_dag_file_processor_only_collect_emails_from_sla_missed_tasks( assert email1 in send_email_to assert email2 not in send_email_to + @patch.object(DagFileProcessor, "logger") @mock.patch("airflow.dag_processing.processor.Stats.incr") @mock.patch("airflow.utils.email.send_email") + @mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag") def test_dag_file_processor_sla_miss_email_exception( - self, mock_send_email, mock_stats_incr, create_dummy_dag + self, mock_get_dagbag, mock_send_email, mock_stats_incr, mock_get_log, create_dummy_dag ): """ Test that the dag file processor gracefully logs an exception if there is a problem @@ -403,10 +422,13 @@ def test_dag_file_processor_sla_miss_email_exception( # Create an SlaMiss where notification was sent, but email was not session.merge(SlaMiss(task_id="dummy", dag_id="test_sla_miss", execution_date=test_start_date)) - mock_log = mock.MagicMock() - dag_file_processor = DagFileProcessor(dag_ids=[], dag_directory=TEST_DAGS_FOLDER, log=mock_log) + mock_log = mock.Mock() + mock_get_log.return_value = mock_log + mock_dagbag = mock.Mock() + mock_dagbag.get_dag.return_value = dag + mock_get_dagbag.return_value = mock_dagbag - dag_file_processor.manage_slas(dag=dag, session=session) + DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) mock_log.exception.assert_called_once_with( "Could not send SLA Miss email notification for DAG %s", "test_sla_miss" ) @@ -414,7 +436,8 @@ def test_dag_file_processor_sla_miss_email_exception( "sla_email_notification_failure", tags={"dag_id": "test_sla_miss"} ) - def test_dag_file_processor_sla_miss_deleted_task(self, create_dummy_dag): + @mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag") + def test_dag_file_processor_sla_miss_deleted_task(self, mock_get_dagbag, create_dummy_dag): """ Test that the dag file processor will not crash when trying to send sla miss notification for a deleted task @@ -436,9 +459,11 @@ def test_dag_file_processor_sla_miss_deleted_task(self, create_dummy_dag): SlaMiss(task_id="dummy_deleted", dag_id="test_sla_miss", execution_date=test_start_date) ) - mock_log = mock.MagicMock() - dag_file_processor = DagFileProcessor(dag_ids=[], dag_directory=TEST_DAGS_FOLDER, log=mock_log) - dag_file_processor.manage_slas(dag=dag, session=session) + mock_dagbag = mock.Mock() + mock_dagbag.get_dag.return_value = dag + mock_get_dagbag.return_value = mock_dagbag + + DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) @patch.object(TaskInstance, "handle_failure") def test_execute_on_failure_callbacks(self, mock_ti_handle_failure):