diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py index 8706e444c4bab..03ee0a038faf4 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py @@ -688,15 +688,8 @@ def execute_sync(self, context: Context): self.cleanup( pod=pod_to_clean, remote_pod=self.remote_pod, + context=context, ) - for callback in self.callbacks: - callback.on_pod_cleanup( - pod=pod_to_clean, - client=self.client, - mode=ExecutionMode.SYNC, - context=context, - operator=self, - ) if self.do_xcom_push: return result @@ -952,13 +945,10 @@ def post_complete_action(self, *, pod, remote_pod, context: Context, **kwargs) - self.cleanup( pod=pod, remote_pod=remote_pod, + context=context, ) - for callback in self.callbacks: - callback.on_pod_cleanup( - pod=pod, client=self.client, mode=ExecutionMode.SYNC, operator=self, context=context - ) - def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod): + def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod, context: Context): # Skip cleaning the pod in the following scenarios. # 1. If a task got marked as failed, "on_kill" method would be called and the pod will be cleaned up # there. Cleaning it up again will raise an exception (which might cause retry). @@ -983,6 +973,15 @@ def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod): self.process_pod_deletion(remote_pod, reraise=False) + for callback in self.callbacks: + callback.on_pod_cleanup( + pod=remote_pod, + client=self.client, + mode=ExecutionMode.SYNC, + operator=self, + context=context, + ) + if self.skip_on_exit_code: container_statuses = ( remote_pod.status.container_statuses if remote_pod and remote_pod.status else None diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py index 2afb31a2ae17c..b5f84cd362107 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py @@ -1657,7 +1657,7 @@ def test_execute_sync_callbacks(self, find_pod_mock): assert mock_callbacks.on_pod_cleanup.call_args.kwargs == { "client": k.client, "mode": ExecutionMode.SYNC, - "pod": k.pod, + "pod": self.await_pod_mock.return_value, "operator": k, "context": context, } @@ -1744,7 +1744,7 @@ def test_execute_sync_multiple_callbacks(self, find_pod_mock): assert mock_callbacks.on_pod_cleanup.call_args.kwargs == { "client": k.client, "mode": ExecutionMode.SYNC, - "pod": k.pod, + "pod": self.await_pod_mock.return_value, "operator": k, "context": context, } @@ -2396,10 +2396,11 @@ def test_async_write_logs_handler_api_exception( ) def test_cleanup_log_pod_spec_on_failure(self, log_pod_spec_on_failure, expect_match): k = KubernetesPodOperator(task_id="task", log_pod_spec_on_failure=log_pod_spec_on_failure) - pod = k.build_pod_request_obj(create_context(k)) + context = create_context(k) + pod = k.build_pod_request_obj(context) pod.status = V1PodStatus(phase=PodPhase.FAILED) with pytest.raises(AirflowException, match=expect_match): - k.cleanup(pod, pod) + k.cleanup(pod, pod, context) @patch(f"{HOOK_CLASS}.get_pod") @patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.await_pod_completion")