Skip to content
12 changes: 7 additions & 5 deletions airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.kubernetes import pod_generator
from airflow.kubernetes.pod_generator import PodGenerator
from airflow.kubernetes.pod_generator import PodDefaults, PodGenerator
from airflow.kubernetes.secret import Secret
from airflow.models import BaseOperator
from airflow.providers.cncf.kubernetes.backcompat.backwards_compat_converters import (
Expand All @@ -48,7 +48,7 @@
from airflow.providers.cncf.kubernetes.utils.pod_manager import (
PodLaunchFailedException,
PodManager,
PodPhase,
container_is_completed,
get_container_termination_message,
)
from airflow.settings import pod_mutation_hook
Expand Down Expand Up @@ -435,7 +435,10 @@ def execute(self, context: 'Context'):
if self.do_xcom_push:
self.pod_manager.await_xcom_sidecar_container_start(pod=self.pod)
result = self.extract_xcom(pod=self.pod)
remote_pod = self.pod_manager.await_pod_completion(self.pod)
self.pod_manager.await_container_completion(
pod=self.pod, container_name=PodDefaults.SIDECAR_CONTAINER_NAME
)
remote_pod = self.pod_manager.read_pod(pod=self.pod)
finally:
self.cleanup(
pod=self.pod or self.pod_request_obj,
Expand All @@ -448,11 +451,10 @@ def execute(self, context: 'Context'):
return result

def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod):
pod_phase = remote_pod.status.phase if hasattr(remote_pod, 'status') else None
if not self.is_delete_operator_pod:
with _suppress(Exception):
self.patch_already_checked(remote_pod)
if pod_phase != PodPhase.SUCCEEDED:
if not container_is_completed(remote_pod, self.BASE_CONTAINER_NAME):
if self.log_events_on_failure:
with _suppress(Exception):
for event in self.pod_manager.read_pod_events(pod).items:
Expand Down
34 changes: 29 additions & 5 deletions airflow/providers/cncf/kubernetes/utils/pod_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import pendulum
import tenacity
from kubernetes import client, watch
from kubernetes.client.models import V1ContainerState
from kubernetes.client.models.v1_pod import V1Pod
from kubernetes.client.rest import ApiException
from kubernetes.stream import stream as kubernetes_stream
Expand Down Expand Up @@ -68,18 +69,41 @@ class PodPhase:
terminal_states = {FAILED, SUCCEEDED}


def container_is_running(pod: V1Pod, container_name: str) -> bool:
def get_container_state(pod: V1Pod, container_name: str) -> V1ContainerState:
"""
Examines V1Pod ``pod`` to determine whether ``container_name`` is running.
If that container is present and running, returns True. Returns False otherwise.
Examines V1Pod ``pod`` to determine the ``container_name`` state.
If that container is present returns the state. Returns None otherwise.
"""
container_statuses = pod.status.container_statuses if pod and pod.status else None
if not container_statuses:
return False
return None
container_status = next(iter([x for x in container_statuses if x.name == container_name]), None)
if not container_status:
return None
return container_status.state


def container_is_running(pod: V1Pod, container_name: str) -> bool:
"""
Examines V1Pod ``pod`` to determine whether ``container_name`` is running.
If that container is present and running, returns True. Returns False otherwise.
"""
container_state = get_container_state(pod=pod, container_name=container_name)
return container_state.running is not None if container_state else False


def container_is_completed(pod: V1Pod, container_name: str) -> bool:
"""
Examines V1Pod ``pod`` to determine the ``container_name`` state.
If that container is terminated and the reason is Completed, returns True. Returns
False otherwise.
"""
if pod is None:
return False
container_state = get_container_state(pod=pod, container_name=container_name)
if container_state.terminated is None:
return False
return container_status.state.running is not None
return True if container_state.terminated.reason == 'Completed' else False


def get_container_termination_message(pod: V1Pod, container_name: str):
Expand Down