diff --git a/airflow/providers/cncf/kubernetes/__init__.py b/airflow/providers/cncf/kubernetes/__init__.py index 217e5db960782..0998e31143fc8 100644 --- a/airflow/providers/cncf/kubernetes/__init__.py +++ b/airflow/providers/cncf/kubernetes/__init__.py @@ -15,3 +15,30 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import sys + +if sys.version_info < (3, 7): + # This is needed because the Python Kubernetes client >= 12.0 contains a logging object, meaning that + # v1.Pod et al. are not pickleable on Python 3.6. + + # Python 3.7 added this via https://bugs.python.org/issue30520 in 2017 -- but Python 3.6 doesn't have this + # method. + + # This is duplicated/backported from airflow.logging_config in 2.2, but by having it here as well it means + # that we can update the version used in this provider and have it work for older versions + import copyreg + import logging + + def _reduce_Logger(logger): + if logging.getLogger(logger.name) is not logger: + import pickle + + raise pickle.PicklingError('logger cannot be pickled') + return logging.getLogger, (logger.name,) + + def _reduce_RootLogger(logger): + return logging.getLogger, () + + if logging.Logger not in copyreg.dispatch_table: + copyreg.pickle(logging.Logger, _reduce_Logger) + copyreg.pickle(logging.RootLogger, _reduce_RootLogger) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 70b5b4c2ebaed..3640eb2e5da9d 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -20,12 +20,11 @@ import time from contextlib import closing from datetime import datetime -from typing import Iterable, Optional, Tuple, Union +from typing import TYPE_CHECKING, Iterable, Optional, Tuple, Union import pendulum import tenacity from kubernetes import client, watch -from kubernetes.client.models.v1_event_list import V1EventList from kubernetes.client.models.v1_pod import V1Pod from kubernetes.client.rest import ApiException from kubernetes.stream import stream as kubernetes_stream @@ -38,6 +37,13 @@ from airflow.kubernetes.pod_generator import PodDefaults from airflow.utils.log.logging_mixin import LoggingMixin +if TYPE_CHECKING: + try: + # Kube >= 19 + from kubernetes.client.models.core_v1_event_list import CoreV1EventList as V1EventList + except ImportError: + from kubernetes.client.models.v1_event_list import V1EventList + class PodLaunchFailedException(AirflowException): """When pod launching fails in KubernetesPodOperator.""" @@ -293,7 +299,7 @@ def read_pod_logs( raise @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) - def read_pod_events(self, pod: V1Pod) -> V1EventList: + def read_pod_events(self, pod: V1Pod) -> "V1EventList": """Reads events from the POD""" try: return self._client.list_namespaced_event( diff --git a/setup.py b/setup.py index 2b4771ee007d4..c9d4d0ba87d8d 100644 --- a/setup.py +++ b/setup.py @@ -381,7 +381,7 @@ def write_version(filename: str = os.path.join(*[my_dir, "airflow", "git_version ] kubernetes = [ 'cryptography>=2.0.0', - 'kubernetes>=3.0.0, <12.0.0', + 'kubernetes>=3.0.0', ] kylin = ['kylinpy>=2.6'] ldap = [ diff --git a/tests/kubernetes/test_client.py b/tests/kubernetes/test_client.py index bf5dcfc3d0a2d..9228e9b704a54 100644 --- a/tests/kubernetes/test_client.py +++ b/tests/kubernetes/test_client.py @@ -63,5 +63,9 @@ def test_disable_verify_ssl(self): _disable_verify_ssl() - configuration = Configuration() + # Support wide range of kube client libraries + if hasattr(Configuration, 'get_default_copy'): + configuration = Configuration.get_default_copy() + else: + configuration = Configuration() self.assertFalse(configuration.verify_ssl)