Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -688,15 +688,8 @@ def execute_sync(self, context: Context):
self.cleanup(
pod=pod_to_clean,
remote_pod=self.remote_pod,
context=context,
)
for callback in self.callbacks:
callback.on_pod_cleanup(
pod=pod_to_clean,
client=self.client,
mode=ExecutionMode.SYNC,
context=context,
operator=self,
)

if self.do_xcom_push:
return result
Expand Down Expand Up @@ -952,13 +945,10 @@ def post_complete_action(self, *, pod, remote_pod, context: Context, **kwargs) -
self.cleanup(
pod=pod,
remote_pod=remote_pod,
context=context,
)
for callback in self.callbacks:
callback.on_pod_cleanup(
pod=pod, client=self.client, mode=ExecutionMode.SYNC, operator=self, context=context
)

def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod):
def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod, context: Context):
# Skip cleaning the pod in the following scenarios.
# 1. If a task got marked as failed, "on_kill" method would be called and the pod will be cleaned up
# there. Cleaning it up again will raise an exception (which might cause retry).
Expand All @@ -983,6 +973,15 @@ def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod):

self.process_pod_deletion(remote_pod, reraise=False)

for callback in self.callbacks:
callback.on_pod_cleanup(
pod=remote_pod,
client=self.client,
mode=ExecutionMode.SYNC,
operator=self,
context=context,
)

if self.skip_on_exit_code:
container_statuses = (
remote_pod.status.container_statuses if remote_pod and remote_pod.status else None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1657,7 +1657,7 @@ def test_execute_sync_callbacks(self, find_pod_mock):
assert mock_callbacks.on_pod_cleanup.call_args.kwargs == {
"client": k.client,
"mode": ExecutionMode.SYNC,
"pod": k.pod,
"pod": self.await_pod_mock.return_value,
"operator": k,
"context": context,
}
Expand Down Expand Up @@ -1744,7 +1744,7 @@ def test_execute_sync_multiple_callbacks(self, find_pod_mock):
assert mock_callbacks.on_pod_cleanup.call_args.kwargs == {
"client": k.client,
"mode": ExecutionMode.SYNC,
"pod": k.pod,
"pod": self.await_pod_mock.return_value,
"operator": k,
"context": context,
}
Expand Down Expand Up @@ -2396,10 +2396,11 @@ def test_async_write_logs_handler_api_exception(
)
def test_cleanup_log_pod_spec_on_failure(self, log_pod_spec_on_failure, expect_match):
k = KubernetesPodOperator(task_id="task", log_pod_spec_on_failure=log_pod_spec_on_failure)
pod = k.build_pod_request_obj(create_context(k))
context = create_context(k)
pod = k.build_pod_request_obj(context)
pod.status = V1PodStatus(phase=PodPhase.FAILED)
with pytest.raises(AirflowException, match=expect_match):
k.cleanup(pod, pod)
k.cleanup(pod, pod, context)

@patch(f"{HOOK_CLASS}.get_pod")
@patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.await_pod_completion")
Expand Down
Loading