From af4f6515e921227c7fc6067bb12d96cc4c2b895a Mon Sep 17 00:00:00 2001 From: pankajastro Date: Mon, 26 May 2025 16:54:17 +0530 Subject: [PATCH 1/3] Upgrade KPO Upgrade KPO --- dagfactory/__init__.py | 2 +- dagfactory/dagbuilder.py | 264 ++++++++++-------- pyproject.toml | 2 +- .../dag_factory_kubernetes_pod_operator.yml | 8 +- ...factory_kubernetes_pod_operator_lt_2_7.yml | 73 +++++ tests/test_dagfactory.py | 13 +- 6 files changed, 241 insertions(+), 121 deletions(-) create mode 100644 tests/fixtures/dag_factory_kubernetes_pod_operator_lt_2_7.yml diff --git a/dagfactory/__init__.py b/dagfactory/__init__.py index 2f17847a..18274204 100644 --- a/dagfactory/__init__.py +++ b/dagfactory/__init__.py @@ -2,7 +2,7 @@ from .dagfactory import DagFactory, load_yaml_dags -__version__ = "0.23.0a3" +__version__ = "0.23.0a4" __all__ = [ "DagFactory", "load_yaml_dags", diff --git a/dagfactory/dagbuilder.py b/dagfactory/dagbuilder.py index 48b2cbd6..4f959a16 100644 --- a/dagfactory/dagbuilder.py +++ b/dagfactory/dagbuilder.py @@ -23,16 +23,28 @@ try: from airflow.version import version as AIRFLOW_VERSION -except ImportError: +except ImportError: # pragma: no cover from airflow import __version__ as AIRFLOW_VERSION +try: + from airflow.providers.cncf.kubernetes import get_provider_info + + try: + K8S_PROVIDER_VERSION = get_provider_info.get_provider_info()["versions"][0] + except KeyError: # pragma: no cover + from airflow.providers.cncf.kubernetes import __version__ + + K8S_PROVIDER_VERSION = __version__ +except ImportError: # pragma: no cover + K8S_PROVIDER_VERSION = "0" + INSTALLED_AIRFLOW_VERSION = version.parse(AIRFLOW_VERSION) # python operators were moved in 2.4 try: from airflow.operators.python import BranchPythonOperator, PythonOperator -except ImportError: +except ImportError: # pragma: no cover from airflow.operators.python_operator import BranchPythonOperator, PythonOperator from airflow.providers.http.sensors.http import HttpSensor @@ -42,12 +54,12 @@ from airflow.providers.http.operators.http import HttpOperator HTTP_OPERATOR_CLASS = HttpOperator -except ImportError: +except ImportError: # pragma: no cover try: from airflow.providers.http.operators.http import SimpleHttpOperator HTTP_OPERATOR_CLASS = SimpleHttpOperator - except ImportError: + except ImportError: # pragma: no cover # Fall back to dynamically importing the operator HTTP_OPERATOR_CLASS = None @@ -55,54 +67,41 @@ # sql sensor was moved in 2.4 try: from airflow.sensors.sql_sensor import SqlSensor -except ImportError: +except ImportError: # pragma: no cover from airflow.providers.common.sql.sensors.sql import SqlSensor -from airflow.sensors.python import PythonSensor - -if INSTALLED_AIRFLOW_VERSION.major < AIRFLOW3_MAJOR_VERSION: - # k8s libraries are moved in v5.0.0 - try: - from airflow.providers.cncf.kubernetes import get_provider_info - - K8S_PROVIDER_VERSION = get_provider_info.get_provider_info()["versions"][0] - except ImportError: - K8S_PROVIDER_VERSION = "0" +from airflow.models import MappedOperator - # kubernetes operator - try: - if version.parse(K8S_PROVIDER_VERSION) < version.parse("5.0.0"): - from airflow.kubernetes.pod import Port - from airflow.kubernetes.pod_runtime_info_env import PodRuntimeInfoEnv - from airflow.kubernetes.volume import Volume - from airflow.kubernetes.volume_mount import VolumeMount - else: - from kubernetes.client.models import ( - V1ContainerPort as Port, - V1EnvVar, - V1EnvVarSource, - V1ObjectFieldSelector, - V1Volume, - V1VolumeMount as VolumeMount, - ) - from airflow.kubernetes.secret import Secret +try: + from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator +except ImportError: + from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator - if version.parse(K8S_PROVIDER_VERSION) < version.parse("10"): - from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator - else: - from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator - except ImportError: # pragma: no cover - from airflow.contrib.kubernetes.pod import Port - from airflow.contrib.kubernetes.pod_runtime_info_env import PodRuntimeInfoEnv - from airflow.contrib.kubernetes.secret import Secret - from airflow.contrib.kubernetes.volume import Volume - from airflow.contrib.kubernetes.volume_mount import VolumeMount - from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator +try: + from airflow.providers.cncf.kubernetes.secret import Secret +except ImportError: + from airflow.kubernetes.secret import Secret -from airflow.models import MappedOperator +from airflow.sensors.python import PythonSensor from airflow.timetables.base import Timetable from airflow.utils.task_group import TaskGroup -from kubernetes.client.models import V1Container, V1Pod +from kubernetes.client.models import ( + V1Affinity, + V1Container, + V1ContainerPort as Port, + V1EnvFromSource, + V1EnvVar, + V1HostAlias, + V1LocalObjectReference, + V1Pod, + V1PodDNSConfig, + V1PodSecurityContext, + V1ResourceRequirements, + V1SecurityContext, + V1Toleration, + V1Volume, + V1VolumeMount as VolumeMount, +) from dagfactory import parsers, utils from dagfactory.exceptions import DagFactoryConfigException, DagFactoryException @@ -267,10 +266,21 @@ def make_timetable(timetable: str, timetable_params: Dict[str, Any]) -> Timetabl raise DagFactoryException(f"Failed to import timetable {timetable} due to: {err}") from err try: schedule: Timetable = timetable_obj(**timetable_params) - except Exception as err: + except Exception as err: # pragma: no cover raise DagFactoryException(f"Failed to create {timetable_obj} due to: {err}") from err return schedule + @staticmethod + def _create_volume(vol): + volume = V1Volume(name=vol.get("name")) + for k, v in vol["configs"].items(): + snake_key = utils.convert_to_snake_case(k) + if hasattr(volume, snake_key): + setattr(volume, snake_key, v) + else: + raise DagFactoryException(f"Volume for KubernetesPodOperator does not have attribute {k}") + return volume + # pylint: disable=too-many-branches # pylint: disable=too-many-statements # pylint: disable=too-many-locals @@ -369,77 +379,103 @@ def make_task(operator: str, task_params: Dict[str, Any]) -> BaseOperator: # Airflow 2.0 doesn't allow these to be passed to operator del task_params["response_check_lambda"] - if INSTALLED_AIRFLOW_VERSION.major < AIRFLOW3_MAJOR_VERSION: - # KubernetesPodOperator - if issubclass(operator_obj, KubernetesPodOperator): - task_params["secrets"] = ( - [Secret(**v) for v in task_params.get("secrets")] - if task_params.get("secrets") is not None - else None - ) + if issubclass(operator_obj, KubernetesPodOperator): + task_params["ports"] = ( + [Port(**v) for v in task_params.get("ports")] if task_params.get("ports") is not None else None + ) - task_params["ports"] = ( - [Port(**v) for v in task_params.get("ports")] if task_params.get("ports") is not None else None - ) - task_params["volume_mounts"] = ( - [VolumeMount(**v) for v in task_params.get("volume_mounts")] - if task_params.get("volume_mounts") is not None - else None - ) - if version.parse(K8S_PROVIDER_VERSION) < version.parse("5.0.0"): - task_params["volumes"] = ( - [Volume(**v) for v in task_params.get("volumes")] - if task_params.get("volumes") is not None - else None - ) - task_params["pod_runtime_info_envs"] = ( - [PodRuntimeInfoEnv(**v) for v in task_params.get("pod_runtime_info_envs")] - if task_params.get("pod_runtime_info_envs") is not None - else None - ) - else: - if task_params.get("volumes") is not None: - task_params_volumes = [] - for vol in task_params.get("volumes"): - resp = V1Volume(name=vol.get("name")) - for k, v in vol["configs"].items(): - snake_key = utils.convert_to_snake_case(k) - if hasattr(resp, snake_key): - setattr(resp, snake_key, v) - else: - raise DagFactoryException( - f"Volume for KubernetesPodOperator \ - does not have attribute {k}" - ) - task_params_volumes.append(resp) - task_params["volumes"] = task_params_volumes - else: - task_params["volumes"] = None - - task_params["pod_runtime_info_envs"] = ( - [ - V1EnvVar( - name=v.get("name"), - value_from=V1EnvVarSource( - field_ref=V1ObjectFieldSelector(field_path=v.get("field_path")) - ), - ) - for v in task_params.get("pod_runtime_info_envs") - ] - if task_params.get("pod_runtime_info_envs") is not None - else None - ) - task_params["full_pod_spec"] = ( - V1Pod(**task_params.get("full_pod_spec")) - if task_params.get("full_pod_spec") is not None - else None - ) - task_params["init_containers"] = ( - [V1Container(**v) for v in task_params.get("init_containers")] - if task_params.get("init_containers") is not None + task_params["volume_mounts"] = ( + [VolumeMount(**v) for v in task_params.get("volume_mounts")] + if task_params.get("volume_mounts") is not None + else None + ) + + task_params["volumes"] = ( + [DagBuilder._create_volume(vol) for vol in task_params["volumes"]] + if task_params.get("volumes") is not None + else None + ) + + task_params["env_vars"] = ( + [V1EnvVar(**env_var) for env_var in task_params.get("env_vars")] + if task_params.get("env_vars") is not None + else None + ) + + task_params["env_from"] = ( + [V1EnvFromSource(**v) for v in task_params["env_from"]] + if task_params.get("env_from") is not None + else None + ) + + task_params["secrets"] = ( + [Secret(**v) for v in task_params.get("secrets")] + if task_params.get("secrets") is not None + else None + ) + + task_params["container_resources"] = ( + V1ResourceRequirements(**task_params["container_resources"]) + if task_params.get("container_resources") is not None + else None + ) + + task_params["affinity"] = ( + V1Affinity(task_params["affinity"]) if task_params.get("affinity") is not None else None + ) + + task_params["image_pull_secrets"] = ( + [V1LocalObjectReference(**v) for v in task_params["image_pull_secrets"]] + if task_params.get("image_pull_secrets") is not None + else None + ) + + if version.parse(K8S_PROVIDER_VERSION) >= version.parse("7.8.0"): + # See PR: https://github.com/apache/airflow/pull/35063 + task_params["host_aliases"] = ( + [V1HostAlias(**v) for v in task_params.get("host_aliases")] + if task_params.get("host_aliases") is not None else None ) + task_params["tolerations"] = ( + [V1Toleration(**v) for v in task_params.get("tolerations")] + if task_params.get("tolerations") is not None + else None + ) + + task_params["security_context"] = ( + V1PodSecurityContext(task_params["security_context"]) + if task_params.get("security_context") is not None + else None + ) + + task_params["container_security_context"] = ( + V1SecurityContext(task_params["container_security_context"]) + if task_params.get("container_security_context") is not None + else None + ) + + task_params["dns_config"] = ( + V1PodDNSConfig(task_params["dns_config"]) if task_params.get("dns_config") is not None else None + ) + + task_params["init_containers"] = ( + [V1Container(**v) for v in task_params.get("init_containers")] + if task_params.get("init_containers") is not None + else None + ) + + task_params["pod_runtime_info_envs"] = ( + [V1EnvVar(**v) for v in task_params.get("pod_runtime_info_envs")] + if task_params.get("pod_runtime_info_envs") is not None + else None + ) + + task_params["full_pod_spec"] = ( + V1Pod(**task_params.get("full_pod_spec")) if task_params.get("full_pod_spec") is not None else None + ) + # HttpOperator if HTTP_OPERATOR_CLASS and issubclass(operator_obj, HTTP_OPERATOR_CLASS): headers = task_params.get("headers", {}) @@ -469,7 +505,7 @@ def make_task(operator: str, task_params: Dict[str, Any]) -> BaseOperator: else operator_obj.partial(**task_params).expand(**expand_kwargs) ) except Exception as err: - raise DagFactoryException(f"Failed to create {operator_obj} task") from err + raise DagFactoryException(f"Failed to create {operator_obj} task: {err}") from err return task @staticmethod diff --git a/pyproject.toml b/pyproject.toml index 71e6d5f5..fe6b1ed8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,7 +31,7 @@ classifiers = [ dependencies = [ "apache-airflow>=2.3", "apache-airflow-providers-http>=2.0.0", - "apache-airflow-providers-cncf-kubernetes<10.4.2", # https://github.com/astronomer/dag-factory/issues/397 + "apache-airflow-providers-cncf-kubernetes", "pyyaml", "packaging", ] diff --git a/tests/fixtures/dag_factory_kubernetes_pod_operator.yml b/tests/fixtures/dag_factory_kubernetes_pod_operator.yml index e0cfbecc..d965b782 100644 --- a/tests/fixtures/dag_factory_kubernetes_pod_operator.yml +++ b/tests/fixtures/dag_factory_kubernetes_pod_operator.yml @@ -14,7 +14,7 @@ default: example_dag: tasks: task_1: - operator: airflow.contrib.operators.kubernetes_pod_operator.KubernetesPodOperator + operator: airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator namespace: 'default' config_file : 'path_to_config_file' image : 'image' @@ -35,8 +35,8 @@ example_dag: {"name":"name","configs":{'persistentVolumeClaim': {'claimName': 'test-volume'}}}, ] pod_runtime_info_envs : [ - {"name":"name","field_path":"field_path"}, - {"name":"name","field_path":"field_path"}, + {"name":"name","value":"field_path"}, + {"name":"name","value":"field_path"}, ] full_pod_spec : { "api_version": "api_version", @@ -55,7 +55,7 @@ example_dag: in_cluster: False dependencies: [] task_2: - operator: airflow.contrib.operators.kubernetes_pod_operator.KubernetesPodOperator + operator: airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator namespace: 'default' config_file : 'path_to_config_file' image : 'image' diff --git a/tests/fixtures/dag_factory_kubernetes_pod_operator_lt_2_7.yml b/tests/fixtures/dag_factory_kubernetes_pod_operator_lt_2_7.yml new file mode 100644 index 00000000..0e6e9282 --- /dev/null +++ b/tests/fixtures/dag_factory_kubernetes_pod_operator_lt_2_7.yml @@ -0,0 +1,73 @@ +default: + default_args: + owner: 'default_owner' + start_date: 2018-03-01 + end_date: 2018-03-05 + retries: 1 + retry_delay_sec: 300 + concurrency: 1 + max_active_runs: 1 + dagrun_timeout_sec: 600 + default_view: 'tree' + orientation: 'LR' + schedule_interval: '0 1 * * *' +example_dag: + tasks: + task_1: + operator: airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator + namespace: 'default' + config_file : 'path_to_config_file' + image : 'image' + image_pull_policy : 'Always' + arguments : [ + 'arg1', + 'arg2', + 'arg3', + ] + secrets : [{"secret":"secret","deploy_type":"env","deploy_target":"ENV_VAR"}] + ports : [{"name" : "name","container_port":"container_port"},{"name" : "name","container_port":"container_port"}] + volume_mounts : [ + {"name":"name","mount_path":"mount_path","sub_path":"sub_path","read_only":"read_only"}, + {"name":"name","mount_path":"mount_path","sub_path":"sub_path","read_only":"read_only"}, + ] + volumes : [ + {"name":"name","configs":{'persistentVolumeClaim': {'claimName': 'test-volume'}}}, + {"name":"name","configs":{'persistentVolumeClaim': {'claimName': 'test-volume'}}}, + ] + pod_runtime_info_envs : [ + { "name": "name","value": "field_path" }, + { "name": "name","value": "field_path" }, + ] + full_pod_spec : { + "api_version": "api_version", + "kind": "kind", + "metadata": "metadata", + "spec": "spec", + "status": "status", + } + init_containers : [ + {"name": "name","args":"args","command":"command"}, + ] + labels: {'foo': 'bar'} + name: 'passing-test' + task_id: 'passing-task' + get_logs: True + in_cluster: False + dependencies: [] + task_2: + operator: airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator + namespace: 'default' + config_file : 'path_to_config_file' + image : 'image' + image_pull_policy : 'Always' + arguments : [ + 'arg1', + 'arg2', + 'arg3', + ] + labels: {'foo': 'bar'} + name: 'passing-test' + task_id: 'passing-task' + get_logs: True + in_cluster: False + dependencies: ['task_1'] diff --git a/tests/test_dagfactory.py b/tests/test_dagfactory.py index 9a7d2139..9180e9e2 100644 --- a/tests/test_dagfactory.py +++ b/tests/test_dagfactory.py @@ -16,6 +16,9 @@ INVALID_DAG_FACTORY = os.path.join(here, "fixtures/invalid_dag_factory.yml") DEFAULT_ARGS_CONFIG_ROOT = os.path.join(here, "fixtures/") DAG_FACTORY_KUBERNETES_POD_OPERATOR = os.path.join(here, "fixtures/dag_factory_kubernetes_pod_operator.yml") +DAG_FACTORY_KUBERNETES_POD_OPERATOR_LT_2_7 = os.path.join( + here, "fixtures/dag_factory_kubernetes_pod_operator_lt_2_7.yml" +) DAG_FACTORY_VARIABLES_AS_ARGUMENTS = os.path.join(here, "fixtures/dag_factory_variables_as_arguments.yml") DOC_MD_FIXTURE_FILE = os.path.join(here, "fixtures/mydocfile.md") @@ -313,12 +316,20 @@ def test_generate_dags_invalid(): td.generate_dags(globals()) -def test_kubernetes_pod_operator_dag(): +@pytest.mark.skipif(version.parse(AIRFLOW_VERSION) < version.parse("2.7.0"), reason="Requires Airflow >= 2.7.0") +def test_kubernetes_pod_operator_dag_gt_2_7(): td = dagfactory.DagFactory(DAG_FACTORY_KUBERNETES_POD_OPERATOR) td.generate_dags(globals()) assert "example_dag" in globals() +@pytest.mark.skipif(version.parse(AIRFLOW_VERSION) <= version.parse("2.7.0"), reason="Requires Airflow <= 2.7.0") +def test_kubernetes_pod_operator_dag_lte_2_7(): + td = dagfactory.DagFactory(DAG_FACTORY_KUBERNETES_POD_OPERATOR_LT_2_7) + td.generate_dags(globals()) + assert "example_dag" in globals() + + def test_variables_as_arguments_dag(): override_command = "value_from_variable" if version.parse(AIRFLOW_VERSION) >= version.parse("1.10.10"): From 07387cabf267cc786da49da0ba40792071801591 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Wed, 28 May 2025 20:10:00 +0530 Subject: [PATCH 2/3] Address review feedback --- dagfactory/dagbuilder.py | 133 +++++++++++---------------------------- 1 file changed, 38 insertions(+), 95 deletions(-) diff --git a/dagfactory/dagbuilder.py b/dagfactory/dagbuilder.py index 4f959a16..55dc15a4 100644 --- a/dagfactory/dagbuilder.py +++ b/dagfactory/dagbuilder.py @@ -281,6 +281,43 @@ def _create_volume(vol): raise DagFactoryException(f"Volume for KubernetesPodOperator does not have attribute {k}") return volume + @staticmethod + def _clean_kpo_task_params(task_params: dict) -> dict: + conversions = [ + ("ports", Port, "list"), + ("volume_mounts", VolumeMount, "list"), + ("env_vars", V1EnvVar, "list"), + ("env_from", V1EnvFromSource, "list"), + ("secrets", Secret, "list"), + ("container_resources", V1ResourceRequirements, "single"), + ("affinity", V1Affinity, "single"), + ("image_pull_secrets", V1LocalObjectReference, "list"), + ("tolerations", V1Toleration, "list"), + ("security_context", V1PodSecurityContext, "single"), + ("container_security_context", V1SecurityContext, "single"), + ("dns_config", V1PodDNSConfig, "single"), + ("init_containers", V1Container, "list"), + ("pod_runtime_info_envs", V1EnvVar, "list"), + ("full_pod_spec", V1Pod, "single"), + ] + + # Conditional field based on version + if version.parse(K8S_PROVIDER_VERSION) >= version.parse("7.8.0"): + conversions.append(("host_aliases", V1HostAlias, "list")) + + for key, cls, conv_type in conversions: + if key in task_params and task_params[key] is not None: + if conv_type == "list": + task_params[key] = [cls(**v) for v in task_params[key]] + elif conv_type == "single": + task_params[key] = cls(task_params[key]) + + # Special case for volumes that uses a different constructor + if task_params.get("volumes") is not None: + task_params["volumes"] = [DagBuilder._create_volume(vol) for vol in task_params["volumes"]] + + return task_params + # pylint: disable=too-many-branches # pylint: disable=too-many-statements # pylint: disable=too-many-locals @@ -380,101 +417,7 @@ def make_task(operator: str, task_params: Dict[str, Any]) -> BaseOperator: del task_params["response_check_lambda"] if issubclass(operator_obj, KubernetesPodOperator): - task_params["ports"] = ( - [Port(**v) for v in task_params.get("ports")] if task_params.get("ports") is not None else None - ) - - task_params["volume_mounts"] = ( - [VolumeMount(**v) for v in task_params.get("volume_mounts")] - if task_params.get("volume_mounts") is not None - else None - ) - - task_params["volumes"] = ( - [DagBuilder._create_volume(vol) for vol in task_params["volumes"]] - if task_params.get("volumes") is not None - else None - ) - - task_params["env_vars"] = ( - [V1EnvVar(**env_var) for env_var in task_params.get("env_vars")] - if task_params.get("env_vars") is not None - else None - ) - - task_params["env_from"] = ( - [V1EnvFromSource(**v) for v in task_params["env_from"]] - if task_params.get("env_from") is not None - else None - ) - - task_params["secrets"] = ( - [Secret(**v) for v in task_params.get("secrets")] - if task_params.get("secrets") is not None - else None - ) - - task_params["container_resources"] = ( - V1ResourceRequirements(**task_params["container_resources"]) - if task_params.get("container_resources") is not None - else None - ) - - task_params["affinity"] = ( - V1Affinity(task_params["affinity"]) if task_params.get("affinity") is not None else None - ) - - task_params["image_pull_secrets"] = ( - [V1LocalObjectReference(**v) for v in task_params["image_pull_secrets"]] - if task_params.get("image_pull_secrets") is not None - else None - ) - - if version.parse(K8S_PROVIDER_VERSION) >= version.parse("7.8.0"): - # See PR: https://github.com/apache/airflow/pull/35063 - task_params["host_aliases"] = ( - [V1HostAlias(**v) for v in task_params.get("host_aliases")] - if task_params.get("host_aliases") is not None - else None - ) - - task_params["tolerations"] = ( - [V1Toleration(**v) for v in task_params.get("tolerations")] - if task_params.get("tolerations") is not None - else None - ) - - task_params["security_context"] = ( - V1PodSecurityContext(task_params["security_context"]) - if task_params.get("security_context") is not None - else None - ) - - task_params["container_security_context"] = ( - V1SecurityContext(task_params["container_security_context"]) - if task_params.get("container_security_context") is not None - else None - ) - - task_params["dns_config"] = ( - V1PodDNSConfig(task_params["dns_config"]) if task_params.get("dns_config") is not None else None - ) - - task_params["init_containers"] = ( - [V1Container(**v) for v in task_params.get("init_containers")] - if task_params.get("init_containers") is not None - else None - ) - - task_params["pod_runtime_info_envs"] = ( - [V1EnvVar(**v) for v in task_params.get("pod_runtime_info_envs")] - if task_params.get("pod_runtime_info_envs") is not None - else None - ) - - task_params["full_pod_spec"] = ( - V1Pod(**task_params.get("full_pod_spec")) if task_params.get("full_pod_spec") is not None else None - ) + task_params = DagBuilder._clean_kpo_task_params(task_params) # HttpOperator if HTTP_OPERATOR_CLASS and issubclass(operator_obj, HTTP_OPERATOR_CLASS): From 16cda2e4a476d344abe95e88261b11f358a38c43 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Wed, 28 May 2025 20:40:10 +0530 Subject: [PATCH 3/3] Test for multiple version provider --- dagfactory/dagbuilder.py | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/dagfactory/dagbuilder.py b/dagfactory/dagbuilder.py index 55dc15a4..05506c04 100644 --- a/dagfactory/dagbuilder.py +++ b/dagfactory/dagbuilder.py @@ -91,13 +91,9 @@ V1ContainerPort as Port, V1EnvFromSource, V1EnvVar, - V1HostAlias, V1LocalObjectReference, V1Pod, - V1PodDNSConfig, V1PodSecurityContext, - V1ResourceRequirements, - V1SecurityContext, V1Toleration, V1Volume, V1VolumeMount as VolumeMount, @@ -289,13 +285,10 @@ def _clean_kpo_task_params(task_params: dict) -> dict: ("env_vars", V1EnvVar, "list"), ("env_from", V1EnvFromSource, "list"), ("secrets", Secret, "list"), - ("container_resources", V1ResourceRequirements, "single"), ("affinity", V1Affinity, "single"), ("image_pull_secrets", V1LocalObjectReference, "list"), ("tolerations", V1Toleration, "list"), ("security_context", V1PodSecurityContext, "single"), - ("container_security_context", V1SecurityContext, "single"), - ("dns_config", V1PodDNSConfig, "single"), ("init_containers", V1Container, "list"), ("pod_runtime_info_envs", V1EnvVar, "list"), ("full_pod_spec", V1Pod, "single"), @@ -303,8 +296,25 @@ def _clean_kpo_task_params(task_params: dict) -> dict: # Conditional field based on version if version.parse(K8S_PROVIDER_VERSION) >= version.parse("7.8.0"): + from kubernetes.client.models import V1HostAlias + conversions.append(("host_aliases", V1HostAlias, "list")) + if version.parse(K8S_PROVIDER_VERSION) >= version.parse("7.0.0"): + from kubernetes.client.models import V1PodDNSConfig + + conversions.append(("dns_config", V1PodDNSConfig, "single")) + + if version.parse(K8S_PROVIDER_VERSION) >= version.parse("5.0.0"): + from kubernetes.client.models import V1ResourceRequirements + + conversions.append(("container_resources", V1ResourceRequirements, "single")) + + if version.parse(K8S_PROVIDER_VERSION) >= version.parse("4.4.0"): + from kubernetes.client.models import V1SecurityContext + + conversions.append(("container_security_context", V1SecurityContext, "single")) + for key, cls, conv_type in conversions: if key in task_params and task_params[key] is not None: if conv_type == "list":