def process_pod_deletion(self, pod: k8s.V1Pod, *, reraise: bool = True) -> None:
    """Clean up ``pod`` after the task, deleting it if it is still running.

    The pod's current state is read back from the cluster. If its phase is
    not one of ``PodPhase.terminal_states`` and ``on_finish_action`` is not
    ``OnFinishAction.DELETE_POD``, the pod is deleted here; in every other
    case the decision is delegated to the parent implementation.

    Args:
        pod: The pod to clean up. Nothing is done when this is ``None``.
        reraise: When true, any error other than an API "404 Not Found" is
            re-raised; when false, such errors are logged and swallowed.

    Raises:
        Exception: Whatever the lookup or deletion raised, if ``reraise`` is
            true and the error is not an API 404.
    """
    if pod is None:
        return

    # Since we default to on_finish_action="keep_pod" the pod may be left running if the task
    # was stopped for a reason other than the pod succeeding/failing, like a pod startup timeout
    # or a task execution timeout (this could be considered a bug in the Kubernetes provider).
    # As a workaround we delete the pod during cleanup if it's still running.
    try:
        remote_pod: k8s.V1Pod = self.client.read_namespaced_pod(
            pod.metadata.name, pod.metadata.namespace
        )
        # `status` and `phase` are optional in the client model. A missing
        # phase still counts as "not terminal", but must not be dereferenced
        # when building the log message, or the deletion below never runs.
        status = remote_pod.status
        phase = status.phase if status is not None else None
        if (
            phase not in PodPhase.terminal_states
            and self.on_finish_action != OnFinishAction.DELETE_POD
        ):
            logger.info(
                "Deleting %s pod: %s",
                phase.lower() if phase else "unknown",
                pod.metadata.name,
            )
            self.pod_manager.delete_pod(remote_pod)
        else:
            super().process_pod_deletion(pod, reraise=reraise)
    except Exception as e:
        if isinstance(e, k8s.ApiException) and e.status == 404:
            # Ignore "404 Not Found" errors: the pod is already gone.
            logger.warning('Pod "%s" not found.', pod.metadata.name)
        elif reraise:
            raise
        else:
            logger.exception(e)