From 19380b33769fe0f744cd1308a90c39885e7ccdfa Mon Sep 17 00:00:00 2001 From: Daniel Date: Tue, 3 May 2022 16:46:19 +0200 Subject: [PATCH 1/7] Add await all containers option to kubernetes pod operator --- .../kubernetes/operators/kubernetes_pod.py | 9 ++- .../cncf/kubernetes/utils/pod_manager.py | 38 +++++++-- .../cncf/kubernetes/utils/test_pod_manager.py | 79 ++++++++++++++----- 3 files changed, 100 insertions(+), 26 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py index 4eb6282162cc7..cf967ecc0b59c 100644 --- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py +++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py @@ -139,6 +139,9 @@ class KubernetesPodOperator(BaseOperator): :param priority_class_name: priority class name for the launched Pod :param termination_grace_period: Termination grace period if task killed in UI, defaults to kubernetes default + :param await_all_containers: if there are multiple containers, wait until all the + containers are in a terminated state. If False, the task will ends once a container + is in a terminated state. """ BASE_CONTAINER_NAME = 'base' @@ -200,6 +203,7 @@ def __init__( pod_runtime_info_envs: Optional[List[k8s.V1EnvVar]] = None, termination_grace_period: Optional[int] = None, configmaps: Optional[List[str]] = None, + await_all_containers: bool = True, **kwargs, ) -> None: if kwargs.get('xcom_push') is not None: @@ -262,6 +266,7 @@ def __init__( self.termination_grace_period = termination_grace_period self.pod_request_obj: Optional[k8s.V1Pod] = None self.pod: Optional[k8s.V1Pod] = None + self.await_all_containers = await_all_containers def _render_nested_template_fields( self, @@ -396,7 +401,9 @@ def execute(self, context: 'Context'): if self.do_xcom_push: result = self.extract_xcom(pod=self.pod) - remote_pod = self.pod_manager.await_pod_completion(self.pod) + remote_pod = self.pod_manager.await_pod_completion( + pod=self.pod, await_all_containers=self.await_all_containers + ) finally: self.cleanup( pod=self.pod or self.pod_request_obj, diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 993ba12e313fe..856f4cc798d71 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -27,6 +27,7 @@ import pendulum import tenacity from kubernetes import client, watch +from kubernetes.client.models import V1ContainerState from kubernetes.client.models.v1_pod import V1Pod from kubernetes.client.rest import ApiException from kubernetes.stream import stream as kubernetes_stream @@ -68,18 +69,36 @@ class PodPhase: terminal_states = {FAILED, SUCCEEDED} -def container_is_running(pod: V1Pod, container_name: str) -> bool: +def get_container_state(pod: V1Pod, container_name: str) -> V1ContainerState: """ - Examines V1Pod ``pod`` to determine whether ``container_name`` is running. - If that container is present and running, returns True. Returns False otherwise. + Examines V1Pod ``pod`` to determine the ``container_name`` state. + If that container is present returns the state. Returns None otherwise. """ container_statuses = pod.status.container_statuses if pod and pod.status else None if not container_statuses: - return False + return None container_status = next(iter([x for x in container_statuses if x.name == container_name]), None) if not container_status: - return False - return container_status.state.running is not None + return None + return container_status.state + + +def container_is_running(pod: V1Pod, container_name: str) -> bool: + """ + Examines V1Pod ``pod`` to determine whether ``container_name`` is running. + If that container is present and running, returns True. Returns False otherwise. + """ + container_state = get_container_state(pod=pod, container_name=container_name) + return container_state.running is not None if container_state else False + + +def container_is_terminated(pod: V1Pod, container_name: str) -> bool: + """ + Examines V1Pod ``pod`` to determine whether ``container_name`` is terminated. + If that container is present and terminated, returns True. Returns False otherwise. + """ + container_state = get_container_state(pod=pod, container_name=container_name) + return container_state.terminated is not None if container_state else False def get_container_termination_message(pod: V1Pod, container_name: str): @@ -258,7 +277,7 @@ def await_container_completion(self, pod: V1Pod, container_name: str) -> None: while not self.container_is_running(pod=pod, container_name=container_name): time.sleep(1) - def await_pod_completion(self, pod: V1Pod) -> V1Pod: + def await_pod_completion(self, pod: V1Pod, await_all_containers: bool) -> V1Pod: """ Monitors a pod and returns the final state @@ -269,6 +288,11 @@ def await_pod_completion(self, pod: V1Pod) -> V1Pod: remote_pod = self.read_pod(pod) if remote_pod.status.phase in PodPhase.terminal_states: break + if not await_all_containers: + for container in remote_pod.spec.containers: + if container_is_terminated(remote_pod, container.name): + self.log.info('Container %s is in a terminated state', container.name) + break self.log.info('Pod %s has phase %s', pod.metadata.name, remote_pod.status.phase) time.sleep(2) return remote_pod diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py index 8070c3c3532b5..ece34ac91c243 100644 --- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py +++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py @@ -26,7 +26,13 @@ from urllib3.exceptions import HTTPError as BaseHTTPError from airflow.exceptions import AirflowException -from airflow.providers.cncf.kubernetes.utils.pod_manager import PodManager, PodPhase, container_is_running +from airflow.providers.cncf.kubernetes.utils.pod_manager import ( + PodManager, + PodPhase, + container_is_running, + container_is_terminated, + get_container_state, +) class TestPodManager: @@ -330,11 +336,49 @@ def test_fetch_container_running_follow( assert ret.running is exp_running -def params_for_test_container_is_running(): - """The `container_is_running` method is designed to handle an assortment of bad objects +@pytest.mark.parametrize('is_running', [True, False]) +@mock.patch('airflow.providers.cncf.kubernetes.utils.pod_manager.get_container_state') +def test_container_is_running(get_container_state_mock, is_running): + mock_pod = MagicMock() + c = MagicMock() + c.running = {'a': 'b'} if is_running else None + get_container_state_mock.return_value = c + assert container_is_running(mock_pod, 'base') is is_running + get_container_state_mock.assert_called_with(pod=mock_pod, container_name='base') + + +@mock.patch('airflow.providers.cncf.kubernetes.utils.pod_manager.get_container_state') +def test_container_is_running_returned_none(get_container_state_mock): + mock_pod = MagicMock() + get_container_state_mock.return_value = None + assert container_is_running(mock_pod, 'base') is False + get_container_state_mock.assert_called_with(pod=mock_pod, container_name='base') + + +@pytest.mark.parametrize('is_terminated', [True, False]) +@mock.patch('airflow.providers.cncf.kubernetes.utils.pod_manager.get_container_state') +def test_container_is_terminated(get_container_state_mock, is_terminated): + mock_pod = MagicMock() + c = MagicMock() + c.terminated = {'a': 'b'} if is_terminated else None + get_container_state_mock.return_value = c + assert container_is_terminated(mock_pod, 'base') is is_terminated + get_container_state_mock.assert_called_with(pod=mock_pod, container_name='base') + + +@mock.patch('airflow.providers.cncf.kubernetes.utils.pod_manager.get_container_state') +def test_container_is_terminated_returned_none(get_container_state_mock): + mock_pod = MagicMock() + get_container_state_mock.return_value = None + assert container_is_terminated(mock_pod, 'base') is False + get_container_state_mock.assert_called_with(pod=mock_pod, container_name='base') + + +def params_for_get_container_state(): + """The `get_container_state` method is designed to handle an assortment of bad objects returned from `read_pod`. E.g. a None object, an object `e` such that `e.status` is None, an object `e` such that `e.status.container_statuses` is None, and so on. This function - emits params used in `test_container_is_running` to verify this behavior. + emits params used in `test_get_container_state` to verify this behavior. We create mock classes not derived from MagicMock because with an instance `e` of MagicMock, tests like `e.hello is not None` are always True. @@ -347,20 +391,17 @@ class ContainerStatusMock: def __init__(self, name): self.name = name - def remote_pod(running=None, not_running=None): + def remote_pod(container_list=None): e = RemotePodMock() e.status = RemotePodMock() e.status.container_statuses = [] - for r in not_running or []: - e.status.container_statuses.append(container(r, False)) - for r in running or []: - e.status.container_statuses.append(container(r, True)) + for r in container_list or []: + e.status.container_statuses.append(container(r)) return e - def container(name, running): + def container(name): c = ContainerStatusMock(name) c.state = RemotePodMock() - c.state.running = {'a': 'b'} if running else None return c pod_mock_list = [] @@ -373,16 +414,18 @@ def container(name, running): p.status.container_statuses = [] pod_mock_list.append(pytest.param(p, False, id='empty remote_pod.status.container_statuses')) pod_mock_list.append(pytest.param(remote_pod(), False, id='filter empty')) - pod_mock_list.append(pytest.param(remote_pod(None, ['base']), False, id='filter 0 running')) - pod_mock_list.append(pytest.param(remote_pod(['hello'], ['base']), False, id='filter 1 not running')) - pod_mock_list.append(pytest.param(remote_pod(['base'], ['hello']), True, id='filter 1 running')) + pod_mock_list.append(pytest.param(remote_pod(None), False, id='filter 0 running')) + pod_mock_list.append(pytest.param(remote_pod(['base', 'hello']), True, id='filter 1 running')) return pod_mock_list -@pytest.mark.parametrize('remote_pod, result', params_for_test_container_is_running()) -def test_container_is_running(remote_pod, result): - """The `container_is_running` function is designed to handle an assortment of bad objects +@pytest.mark.parametrize('remote_pod, state', params_for_get_container_state()) +def test_get_container_state(remote_pod, state): + """The `container_is_terminated` function is designed to handle an assortment of bad objects returned from `read_pod`. E.g. a None object, an object `e` such that `e.status` is None, an object `e` such that `e.status.container_statuses` is None, and so on. This test verifies the expected behavior.""" - assert container_is_running(remote_pod, 'base') is result + if state: + assert get_container_state(remote_pod, 'base') is not None + else: + assert get_container_state(remote_pod, 'base') is None From 1693940d326cc91563e359ee6814877f0961646a Mon Sep 17 00:00:00 2001 From: Daniel Date: Tue, 10 May 2022 12:11:30 +0200 Subject: [PATCH 2/7] Await pod completion based on base container state --- .../kubernetes/operators/kubernetes_pod.py | 7 +- .../cncf/kubernetes/utils/pod_manager.py | 42 +++------- .../cncf/kubernetes/utils/test_pod_manager.py | 79 +++++-------------- 3 files changed, 28 insertions(+), 100 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py index cf967ecc0b59c..10ee09ff946f8 100644 --- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py +++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py @@ -139,9 +139,6 @@ class KubernetesPodOperator(BaseOperator): :param priority_class_name: priority class name for the launched Pod :param termination_grace_period: Termination grace period if task killed in UI, defaults to kubernetes default - :param await_all_containers: if there are multiple containers, wait until all the - containers are in a terminated state. If False, the task will ends once a container - is in a terminated state. """ BASE_CONTAINER_NAME = 'base' @@ -203,7 +200,6 @@ def __init__( pod_runtime_info_envs: Optional[List[k8s.V1EnvVar]] = None, termination_grace_period: Optional[int] = None, configmaps: Optional[List[str]] = None, - await_all_containers: bool = True, **kwargs, ) -> None: if kwargs.get('xcom_push') is not None: @@ -266,7 +262,6 @@ def __init__( self.termination_grace_period = termination_grace_period self.pod_request_obj: Optional[k8s.V1Pod] = None self.pod: Optional[k8s.V1Pod] = None - self.await_all_containers = await_all_containers def _render_nested_template_fields( self, @@ -402,7 +397,7 @@ def execute(self, context: 'Context'): if self.do_xcom_push: result = self.extract_xcom(pod=self.pod) remote_pod = self.pod_manager.await_pod_completion( - pod=self.pod, await_all_containers=self.await_all_containers + pod=self.pod, base_container=self.BASE_CONTAINER_NAME ) finally: self.cleanup( diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 856f4cc798d71..2443f5a411d98 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -27,7 +27,6 @@ import pendulum import tenacity from kubernetes import client, watch -from kubernetes.client.models import V1ContainerState from kubernetes.client.models.v1_pod import V1Pod from kubernetes.client.rest import ApiException from kubernetes.stream import stream as kubernetes_stream @@ -69,36 +68,18 @@ class PodPhase: terminal_states = {FAILED, SUCCEEDED} -def get_container_state(pod: V1Pod, container_name: str) -> V1ContainerState: +def container_is_running(pod: V1Pod, container_name: str) -> bool: """ - Examines V1Pod ``pod`` to determine the ``container_name`` state. - If that container is present returns the state. Returns None otherwise. + Examines V1Pod ``pod`` to determine whether ``container_name`` is running. + If that container is present and running, returns True. Returns False otherwise. """ container_statuses = pod.status.container_statuses if pod and pod.status else None if not container_statuses: - return None + return False container_status = next(iter([x for x in container_statuses if x.name == container_name]), None) if not container_status: - return None - return container_status.state - - -def container_is_running(pod: V1Pod, container_name: str) -> bool: - """ - Examines V1Pod ``pod`` to determine whether ``container_name`` is running. - If that container is present and running, returns True. Returns False otherwise. - """ - container_state = get_container_state(pod=pod, container_name=container_name) - return container_state.running is not None if container_state else False - - -def container_is_terminated(pod: V1Pod, container_name: str) -> bool: - """ - Examines V1Pod ``pod`` to determine whether ``container_name`` is terminated. - If that container is present and terminated, returns True. Returns False otherwise. - """ - container_state = get_container_state(pod=pod, container_name=container_name) - return container_state.terminated is not None if container_state else False + return False + return container_status.state.running is not None def get_container_termination_message(pod: V1Pod, container_name: str): @@ -277,22 +258,17 @@ def await_container_completion(self, pod: V1Pod, container_name: str) -> None: while not self.container_is_running(pod=pod, container_name=container_name): time.sleep(1) - def await_pod_completion(self, pod: V1Pod, await_all_containers: bool) -> V1Pod: + def await_pod_completion(self, pod: V1Pod, base_container: str) -> V1Pod: """ - Monitors a pod and returns the final state + Monitors the base container in a pod and returns the final state :param pod: pod spec that will be monitored :return: Tuple[State, Optional[str]] """ while True: remote_pod = self.read_pod(pod) - if remote_pod.status.phase in PodPhase.terminal_states: + if not self.container_is_running(pod=pod, container_name=base_container): break - if not await_all_containers: - for container in remote_pod.spec.containers: - if container_is_terminated(remote_pod, container.name): - self.log.info('Container %s is in a terminated state', container.name) - break self.log.info('Pod %s has phase %s', pod.metadata.name, remote_pod.status.phase) time.sleep(2) return remote_pod diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py index ece34ac91c243..8070c3c3532b5 100644 --- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py +++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py @@ -26,13 +26,7 @@ from urllib3.exceptions import HTTPError as BaseHTTPError from airflow.exceptions import AirflowException -from airflow.providers.cncf.kubernetes.utils.pod_manager import ( - PodManager, - PodPhase, - container_is_running, - container_is_terminated, - get_container_state, -) +from airflow.providers.cncf.kubernetes.utils.pod_manager import PodManager, PodPhase, container_is_running class TestPodManager: @@ -336,49 +330,11 @@ def test_fetch_container_running_follow( assert ret.running is exp_running -@pytest.mark.parametrize('is_running', [True, False]) -@mock.patch('airflow.providers.cncf.kubernetes.utils.pod_manager.get_container_state') -def test_container_is_running(get_container_state_mock, is_running): - mock_pod = MagicMock() - c = MagicMock() - c.running = {'a': 'b'} if is_running else None - get_container_state_mock.return_value = c - assert container_is_running(mock_pod, 'base') is is_running - get_container_state_mock.assert_called_with(pod=mock_pod, container_name='base') - - -@mock.patch('airflow.providers.cncf.kubernetes.utils.pod_manager.get_container_state') -def test_container_is_running_returned_none(get_container_state_mock): - mock_pod = MagicMock() - get_container_state_mock.return_value = None - assert container_is_running(mock_pod, 'base') is False - get_container_state_mock.assert_called_with(pod=mock_pod, container_name='base') - - -@pytest.mark.parametrize('is_terminated', [True, False]) -@mock.patch('airflow.providers.cncf.kubernetes.utils.pod_manager.get_container_state') -def test_container_is_terminated(get_container_state_mock, is_terminated): - mock_pod = MagicMock() - c = MagicMock() - c.terminated = {'a': 'b'} if is_terminated else None - get_container_state_mock.return_value = c - assert container_is_terminated(mock_pod, 'base') is is_terminated - get_container_state_mock.assert_called_with(pod=mock_pod, container_name='base') - - -@mock.patch('airflow.providers.cncf.kubernetes.utils.pod_manager.get_container_state') -def test_container_is_terminated_returned_none(get_container_state_mock): - mock_pod = MagicMock() - get_container_state_mock.return_value = None - assert container_is_terminated(mock_pod, 'base') is False - get_container_state_mock.assert_called_with(pod=mock_pod, container_name='base') - - -def params_for_get_container_state(): - """The `get_container_state` method is designed to handle an assortment of bad objects +def params_for_test_container_is_running(): + """The `container_is_running` method is designed to handle an assortment of bad objects returned from `read_pod`. E.g. a None object, an object `e` such that `e.status` is None, an object `e` such that `e.status.container_statuses` is None, and so on. This function - emits params used in `test_get_container_state` to verify this behavior. + emits params used in `test_container_is_running` to verify this behavior. We create mock classes not derived from MagicMock because with an instance `e` of MagicMock, tests like `e.hello is not None` are always True. @@ -391,17 +347,20 @@ class ContainerStatusMock: def __init__(self, name): self.name = name - def remote_pod(container_list=None): + def remote_pod(running=None, not_running=None): e = RemotePodMock() e.status = RemotePodMock() e.status.container_statuses = [] - for r in container_list or []: - e.status.container_statuses.append(container(r)) + for r in not_running or []: + e.status.container_statuses.append(container(r, False)) + for r in running or []: + e.status.container_statuses.append(container(r, True)) return e - def container(name): + def container(name, running): c = ContainerStatusMock(name) c.state = RemotePodMock() + c.state.running = {'a': 'b'} if running else None return c pod_mock_list = [] @@ -414,18 +373,16 @@ def container(name): p.status.container_statuses = [] pod_mock_list.append(pytest.param(p, False, id='empty remote_pod.status.container_statuses')) pod_mock_list.append(pytest.param(remote_pod(), False, id='filter empty')) - pod_mock_list.append(pytest.param(remote_pod(None), False, id='filter 0 running')) - pod_mock_list.append(pytest.param(remote_pod(['base', 'hello']), True, id='filter 1 running')) + pod_mock_list.append(pytest.param(remote_pod(None, ['base']), False, id='filter 0 running')) + pod_mock_list.append(pytest.param(remote_pod(['hello'], ['base']), False, id='filter 1 not running')) + pod_mock_list.append(pytest.param(remote_pod(['base'], ['hello']), True, id='filter 1 running')) return pod_mock_list -@pytest.mark.parametrize('remote_pod, state', params_for_get_container_state()) -def test_get_container_state(remote_pod, state): - """The `container_is_terminated` function is designed to handle an assortment of bad objects +@pytest.mark.parametrize('remote_pod, result', params_for_test_container_is_running()) +def test_container_is_running(remote_pod, result): + """The `container_is_running` function is designed to handle an assortment of bad objects returned from `read_pod`. E.g. a None object, an object `e` such that `e.status` is None, an object `e` such that `e.status.container_statuses` is None, and so on. This test verifies the expected behavior.""" - if state: - assert get_container_state(remote_pod, 'base') is not None - else: - assert get_container_state(remote_pod, 'base') is None + assert container_is_running(remote_pod, 'base') is result From c3c5cad99ebd658efcd2b113661803e2f20c432e Mon Sep 17 00:00:00 2001 From: Daniel Date: Wed, 25 May 2022 17:02:38 +0200 Subject: [PATCH 3/7] Check the remote pod fetched in the step before --- airflow/providers/cncf/kubernetes/utils/pod_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 4cea38b15f9d7..9a1c2b022dcd2 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -268,7 +268,7 @@ def await_pod_completion(self, pod: V1Pod, base_container: str) -> V1Pod: """ while True: remote_pod = self.read_pod(pod) - if not self.container_is_running(pod=pod, container_name=base_container): + if not self.container_is_running(pod=remote_pod, container_name=base_container): break self.log.info('Pod %s has phase %s', pod.metadata.name, remote_pod.status.phase) time.sleep(2) From c5f48f8e3bdc30a2eee59a00fce542481028cfa6 Mon Sep 17 00:00:00 2001 From: Daniel Date: Tue, 14 Jun 2022 16:36:52 +0200 Subject: [PATCH 4/7] Change cleanup and exec k8s operator --- .../kubernetes/operators/kubernetes_pod.py | 94 ++++++++++++++----- .../cncf/kubernetes/utils/pod_manager.py | 39 ++++++-- 2 files changed, 103 insertions(+), 30 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py index becd27a630e13..b39215bebf2e9 100644 --- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py +++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py @@ -25,8 +25,9 @@ from kubernetes.client import CoreV1Api, models as k8s +from airflow.configuration import conf from airflow.exceptions import AirflowException -from airflow.kubernetes import kube_client, pod_generator +from airflow.kubernetes import pod_generator from airflow.kubernetes.pod_generator import PodGenerator from airflow.kubernetes.secret import Secret from airflow.models import BaseOperator @@ -42,11 +43,12 @@ convert_volume, convert_volume_mount, ) +from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook from airflow.providers.cncf.kubernetes.utils import xcom_sidecar # type: ignore[attr-defined] from airflow.providers.cncf.kubernetes.utils.pod_manager import ( PodLaunchFailedException, PodManager, - PodPhase, + container_is_terminated, get_container_termination_message, ) from airflow.settings import pod_mutation_hook @@ -83,6 +85,8 @@ class KubernetesPodOperator(BaseOperator): :class:`~airflow.providers.google.cloud.operators.kubernetes_engine.GKEStartPodOperator`, which simplifies the authorization process. + :param kubernetes_conn_id: The :ref:`kubernetes connection id ` + for the Kubernetes cluster. :param namespace: the namespace to run within kubernetes. :param image: Docker image you wish to launch. Defaults to hub.docker.com, but fully qualified URLS will point to custom repositories. (templated) @@ -98,6 +102,7 @@ class KubernetesPodOperator(BaseOperator): :param volume_mounts: volumeMounts for the launched pod. :param volumes: volumes for the launched pod. Includes ConfigMaps and PersistentVolumes. :param env_vars: Environment variables initialized in the container. (templated) + :param env_from: (Optional) List of sources to populate environment variables in the container. :param secrets: Kubernetes secrets to inject in the container. They can be exposed as environment vars or files in a volume. :param in_cluster: run kubernetes client with in_cluster configuration. @@ -116,6 +121,8 @@ class KubernetesPodOperator(BaseOperator): :param affinity: affinity scheduling rules for the launched pod. :param config_file: The path to the Kubernetes config file. (templated) If not specified, default value is ``~/.kube/config`` + :param node_selectors: (Deprecated) A dict containing a group of scheduling rules. + Please use node_selector instead. :param node_selector: A dict containing a group of scheduling rules. :param image_pull_secrets: Any image pull secrets to be given to the pod. If more than one secret is required, provide a @@ -137,8 +144,15 @@ class KubernetesPodOperator(BaseOperator): XCom when the container completes. :param pod_template_file: path to pod template file (templated) :param priority_class_name: priority class name for the launched Pod + :param pod_runtime_info_envs: (Optional) A list of environment variables, + to be set in the container. :param termination_grace_period: Termination grace period if task killed in UI, defaults to kubernetes default + :param configmaps: (Optional) A list of names of config maps from which it collects ConfigMaps + to populate the environment variables with. The contents of the target + ConfigMap's Data field will represent the key-value pairs as environment variables. + Extends env_from. + :param: kubernetes_conn_id: To retrieve credentials for your k8s cluster from an Airflow connection """ BASE_CONTAINER_NAME = 'base' @@ -158,6 +172,7 @@ class KubernetesPodOperator(BaseOperator): def __init__( self, *, + kubernetes_conn_id: Optional[str] = None, # 'kubernetes_default', namespace: Optional[str] = None, image: Optional[str] = None, name: Optional[str] = None, @@ -205,7 +220,7 @@ def __init__( if kwargs.get('xcom_push') is not None: raise AirflowException("'xcom_push' was deprecated, use 'do_xcom_push' instead") super().__init__(resources=None, **kwargs) - + self.kubernetes_conn_id = kubernetes_conn_id self.do_xcom_push = do_xcom_push self.image = image self.namespace = namespace @@ -319,20 +334,24 @@ def _get_ti_pod_labels(context: Optional[dict] = None, include_try_number: bool def pod_manager(self) -> PodManager: return PodManager(kube_client=self.client) - @cached_property - def client(self) -> CoreV1Api: - # todo: use airflow Connection / hook to authenticate to the cluster - kwargs: Dict[str, Any] = dict( - cluster_context=self.cluster_context, + def get_hook(self): + hook = KubernetesHook( + conn_id=self.kubernetes_conn_id, + in_cluster=self.in_cluster, config_file=self.config_file, + cluster_context=self.cluster_context, ) - if self.in_cluster is not None: - kwargs.update(in_cluster=self.in_cluster) - return kube_client.get_kube_client(**kwargs) + self._patch_deprecated_k8s_settings(hook) + return hook - def find_pod(self, namespace, context) -> Optional[k8s.V1Pod]: + @cached_property + def client(self) -> CoreV1Api: + hook = self.get_hook() + return hook.core_v1_client + + def find_pod(self, namespace, context, *, exclude_checked=True) -> Optional[k8s.V1Pod]: """Returns an already-running pod for this task instance if one exists.""" - label_selector = self._build_find_pod_label_selector(context) + label_selector = self._build_find_pod_label_selector(context, exclude_checked=exclude_checked) pod_list = self.client.list_namespaced_pod( namespace=namespace, label_selector=label_selector, @@ -396,9 +415,7 @@ def execute(self, context: 'Context'): if self.do_xcom_push: result = self.extract_xcom(pod=self.pod) - remote_pod = self.pod_manager.await_pod_completion( - pod=self.pod, base_container=self.BASE_CONTAINER_NAME - ) + remote_pod = self.pod_manager.read_pod(pod=self.pod) finally: self.cleanup( pod=self.pod or self.pod_request_obj, @@ -411,11 +428,10 @@ def execute(self, context: 'Context'): return result def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod): - pod_phase = remote_pod.status.phase if hasattr(remote_pod, 'status') else None if not self.is_delete_operator_pod: with _suppress(Exception): - self.patch_already_checked(pod) - if pod_phase != PodPhase.SUCCEEDED: + self.patch_already_checked(remote_pod) + if not container_is_terminated(remote_pod, self.BASE_CONTAINER_NAME): if self.log_events_on_failure: with _suppress(Exception): for event in self.pod_manager.read_pod_events(pod).items: @@ -438,10 +454,14 @@ def process_pod_deletion(self, pod): else: self.log.info("skipping deleting pod: %s", pod.metadata.name) - def _build_find_pod_label_selector(self, context: Optional[dict] = None) -> str: + def _build_find_pod_label_selector(self, context: Optional[dict] = None, *, exclude_checked=True) -> str: labels = self._get_ti_pod_labels(context, include_try_number=False) label_strings = [f'{label_id}={label}' for label_id, label in sorted(labels.items())] - return ','.join(label_strings) + f',{self.POD_CHECKED_KEY}!=True,!airflow-worker' + labels_value = ','.join(label_strings) + if exclude_checked: + labels_value += f',{self.POD_CHECKED_KEY}!=True' + labels_value += ',!airflow-worker' + return labels_value def _set_name(self, name): if name is None: @@ -563,6 +583,38 @@ def dry_run(self) -> None: pod = self.build_pod_request_obj() print(yaml.dump(prune_dict(pod.to_dict(), mode='strict'))) + def _patch_deprecated_k8s_settings(self, hook: KubernetesHook): + """ + Here we read config from core Airflow config [kubernetes] section. + In a future release we will stop looking at this section and require users + to use Airflow connections to configure KPO. + + When we find values there that we need to apply on the hook, we patch special + hook attributes here. + """ + # default for enable_tcp_keepalive is True; patch if False + if conf.getboolean('kubernetes', 'enable_tcp_keepalive') is False: + hook._deprecated_core_disable_tcp_keepalive = True + + # default verify_ssl is True; patch if False. + if conf.getboolean('kubernetes', 'verify_ssl') is False: + hook._deprecated_core_disable_verify_ssl = True + + # default for in_cluster is True; patch if False and no KPO param. + conf_in_cluster = conf.getboolean('kubernetes', 'in_cluster') + if self.in_cluster is None and conf_in_cluster is False: + hook._deprecated_core_in_cluster = conf_in_cluster + + # there's no default for cluster context; if we get something (and no KPO param) patch it. + conf_cluster_context = conf.get('kubernetes', 'cluster_context', fallback=None) + if not self.cluster_context and conf_cluster_context: + hook._deprecated_core_cluster_context = conf_cluster_context + + # there's no default for config_file; if we get something (and no KPO param) patch it. + conf_config_file = conf.get('kubernetes', 'config_file', fallback=None) + if not self.config_file and conf_config_file: + hook._deprecated_core_config_file = conf_config_file + class _suppress(AbstractContextManager): """ diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 9a1c2b022dcd2..859eb9c786614 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -68,18 +68,39 @@ class PodPhase: terminal_states = {FAILED, SUCCEEDED} -def container_is_running(pod: V1Pod, container_name: str) -> bool: +def get_container_state(pod: V1Pod, container_name: str) -> V1ContainerState: """ - Examines V1Pod ``pod`` to determine whether ``container_name`` is running. - If that container is present and running, returns True. Returns False otherwise. + Examines V1Pod ``pod`` to determine the ``container_name`` state. + If that container is present returns the state. Returns None otherwise. """ container_statuses = pod.status.container_statuses if pod and pod.status else None if not container_statuses: - return False + return None container_status = next(iter([x for x in container_statuses if x.name == container_name]), None) if not container_status: + return None + return container_status.state + + +def container_is_running(pod: V1Pod, container_name: str) -> bool: + """ + Examines V1Pod ``pod`` to determine whether ``container_name`` is running. + If that container is present and running, returns True. Returns False otherwise. + """ + container_state = get_container_state(pod=pod, container_name=container_name) + return container_state.running is not None if container_state else False + + +def container_is_completed(pod: V1Pod, container_name: str) -> bool: + """ + Examines V1Pod ``pod`` to determine the ``container_name`` state. + If that container is terminated and the reason is Completed, returns True. Returns + False otherwise. + """ + container_state = get_container_state(pod=pod, container_name=container_name) + if container_state.terminated is None: return False - return container_status.state.running is not None + return True if container_state.terminated.reason == 'Completed' else False def get_container_termination_message(pod: V1Pod, container_name: str): @@ -256,19 +277,19 @@ def consume_logs(*, since_time: Optional[DateTime] = None, follow: bool = True) time.sleep(1) def await_container_completion(self, pod: V1Pod, container_name: str) -> None: - while not self.container_is_running(pod=pod, container_name=container_name): + while self.container_is_running(pod=pod, container_name=container_name): time.sleep(1) - def await_pod_completion(self, pod: V1Pod, base_container: str) -> V1Pod: + def await_pod_completion(self, pod: V1Pod) -> V1Pod: """ - Monitors the base container in a pod and returns the final state + Monitors a pod and returns the final state :param pod: pod spec that will be monitored :return: Tuple[State, Optional[str]] """ while True: remote_pod = self.read_pod(pod) - if not self.container_is_running(pod=remote_pod, container_name=base_container): + if remote_pod.status.phase in PodPhase.terminal_states: break self.log.info('Pod %s has phase %s', pod.metadata.name, remote_pod.status.phase) time.sleep(2) From 95540f9534edece0b24d4aef220476217ef7d637 Mon Sep 17 00:00:00 2001 From: Daniel Date: Wed, 15 Jun 2022 09:29:31 +0200 Subject: [PATCH 5/7] Fix method name --- airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py | 4 ++-- airflow/providers/cncf/kubernetes/utils/pod_manager.py | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py index b39215bebf2e9..aae7f2b1aacd2 100644 --- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py +++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py @@ -48,7 +48,7 @@ from airflow.providers.cncf.kubernetes.utils.pod_manager import ( PodLaunchFailedException, PodManager, - container_is_terminated, + container_is_completed, get_container_termination_message, ) from airflow.settings import pod_mutation_hook @@ -431,7 +431,7 @@ def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod): if not self.is_delete_operator_pod: with _suppress(Exception): self.patch_already_checked(remote_pod) - if not container_is_terminated(remote_pod, self.BASE_CONTAINER_NAME): + if not container_is_completed(remote_pod, self.BASE_CONTAINER_NAME): if self.log_events_on_failure: with _suppress(Exception): for event in self.pod_manager.read_pod_events(pod).items: diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 859eb9c786614..dfd1752dd0fb1 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -27,6 +27,7 @@ import pendulum import tenacity from kubernetes import client, watch +from kubernetes.client.models import V1ContainerState from kubernetes.client.models.v1_pod import V1Pod from kubernetes.client.rest import ApiException from kubernetes.stream import stream as kubernetes_stream From 6bafc8c90eb6064c369f8ea0817804ac5b00c2f8 Mon Sep 17 00:00:00 2001 From: Daniel Date: Thu, 16 Jun 2022 12:20:17 +0200 Subject: [PATCH 6/7] Add pod comprobation --- airflow/providers/cncf/kubernetes/utils/pod_manager.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index dfd1752dd0fb1..f444c01f3ac28 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -98,6 +98,8 @@ def container_is_completed(pod: V1Pod, container_name: str) -> bool: If that container is terminated and the reason is Completed, returns True. Returns False otherwise. """ + if pod is None: + return False container_state = get_container_state(pod=pod, container_name=container_name) if container_state.terminated is None: return False From 3d35d538606cd786078e166db6be3b456e628357 Mon Sep 17 00:00:00 2001 From: Daniel Date: Wed, 22 Jun 2022 17:26:11 +0200 Subject: [PATCH 7/7] Add xcom check before cleanup --- .../providers/cncf/kubernetes/operators/kubernetes_pod.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py index aae7f2b1aacd2..8725b583b99ff 100644 --- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py +++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py @@ -28,7 +28,7 @@ from airflow.configuration import conf from airflow.exceptions import AirflowException from airflow.kubernetes import pod_generator -from airflow.kubernetes.pod_generator import PodGenerator +from airflow.kubernetes.pod_generator import PodDefaults, PodGenerator from airflow.kubernetes.secret import Secret from airflow.models import BaseOperator from airflow.providers.cncf.kubernetes.backcompat.backwards_compat_converters import ( @@ -415,6 +415,9 @@ def execute(self, context: 'Context'): if self.do_xcom_push: result = self.extract_xcom(pod=self.pod) + self.pod_manager.await_container_completion( + pod=self.pod, container_name=PodDefaults.SIDECAR_CONTAINER_NAME + ) remote_pod = self.pod_manager.read_pod(pod=self.pod) finally: self.cleanup(