Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions operators/gcp_container_operator.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
import logging

import kubernetes.client as k8s
from airflow.providers.cncf.kubernetes.callbacks import KubernetesPodOperatorCallback
from airflow.providers.cncf.kubernetes.utils.pod_manager import OnFinishAction, PodPhase
from airflow.providers.google.cloud.links.kubernetes_engine import KubernetesEnginePodLink
from airflow.providers.google.cloud.operators.kubernetes_engine import (
GKEStartPodOperator as UpstreamGKEPodOperator,
)
from airflow.utils.context import Context


logger = logging.getLogger(__name__)


class GKEPodOperatorCallbacks(KubernetesPodOperatorCallback):
@staticmethod
def on_pod_completion(
Expand Down Expand Up @@ -92,3 +98,34 @@ def get_or_create_pod(self, pod_request_obj: k8s.V1Pod, context: Context) -> k8s
self.pod = pod
KubernetesEnginePodLink.persist(context=context, task_instance=self)
return pod

def process_pod_deletion(self, pod: k8s.V1Pod, *, reraise=True):
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would have preferred to use an on_pod_cleanup callback rather than overriding this internal-ish method, but unfortunately on_pod_cleanup callbacks aren't called when Airflow tasks fail (maybe a bug?).

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any idea why on_pod_cleanup callbacks aren't called in the event of a task failure? Did you look in the Airflow project to see if this is a known issue/bug?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need the * boundary?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any idea why on_pod_cleanup callbacks aren't called in the event of a task failure?

It's because the on_pod_cleanup callback is only called after the cleanup method completes, but the cleanup method raises an exception if the task failed.

Did you look in the Airflow project to see if this is a known issue/bug?

Yes. I didn't see an existing issue for this, and the problem still exists on the main branch so it hasn't been fixed yet (though it's possible this is an intentional design decision).

I did just submit this Airflow PR with a possible fix, so we'll so how that goes (though even if that's accepted it'd likely be a while before it's included in an apache-airflow-providers-cncf-kubernetes package release and we can upgrade to that).

Why do you need the * boundary?

This matches the argument signature of the original process_pod_deletion method that this is overriding, and the * makes it so that subsequent arguments can only be specified as keyword arguments.

if pod is None:
return

# Since we default to on_finish_action="keep_pod" the pod may be left running if the task
# was stopped for a reason other than the pod succeeding/failing, like a pod startup timeout
# or a task execution timeout (this could be considered a bug in the Kubernetes provider).
# As a workaround we delete the pod during cleanup if it's still running.
try:
remote_pod: k8s.V1Pod = self.client.read_namespaced_pod(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.remote_pod is indeed passed in as pod to this method, but in cases when the task execution stopped due to a pod startup timeout or task execution timeout that will only reflect the state of the pod immediately after starting, because that was the last time self.remote_pod would have been updated (self.remote_pod isn't updated again until after waiting for the pod to terminate).

So here I'm setting remote_pod to a more up-to-date copy of the pod's state to have more accurate information, so that this will log "Deleting running pod: ..." if the pod was in fact running at the time (without re-fetching the pod state it would always log "Deleting pending pod: ..." even for running pods).

While this isn't technically required for the core functionality of cleaning up unterminated pods, I figured it was worth doing for recording what actually happened. However, if you think it's overkill I'm not opposed to simplifying this and using the out-of-date self.remote_pod/pod value (in which case I'd change the logging to the more generic "Deleting pod: ..." since we don't really know its state).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok that makes sense. I guess you would need the updated state for the PodPhase.terminal_states check below anyway

pod.metadata.name, pod.metadata.namespace
)
if (
remote_pod.status.phase not in PodPhase.terminal_states
and self.on_finish_action != OnFinishAction.DELETE_POD
):
logger.info(
f"Deleting {remote_pod.status.phase.lower()} pod: {pod.metadata.name}"
)
self.pod_manager.delete_pod(remote_pod)
else:
super().process_pod_deletion(pod, reraise=reraise)
except Exception as e:
if isinstance(e, k8s.ApiException) and e.status == 404:
# Ignore "404 Not Found" errors.
Comment thread
sean-rose marked this conversation as resolved.
logger.warning(f'Pod "{pod.metadata.name}" not found.')
elif reraise:
raise
else:
logger.exception(e)