From 96b8677ad4a763305cea2996d7e5beb8344ef1bb Mon Sep 17 00:00:00 2001 From: John Horan Date: Fri, 20 Jun 2025 14:19:36 +0100 Subject: [PATCH 1/6] add convert --- cosmos/operators/kubernetes.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/cosmos/operators/kubernetes.py b/cosmos/operators/kubernetes.py index 34512d16ad..d5a915dceb 100644 --- a/cosmos/operators/kubernetes.py +++ b/cosmos/operators/kubernetes.py @@ -98,6 +98,11 @@ def __init__(self, profile_config: ProfileConfig | None = None, **kwargs: Any) - pass AbstractDbtBase.__init__(self, **base_kwargs) + + if isinstance(operator_kwargs.get("container_resources"), dict): + operator_kwargs["container_resources"] = k8s.V1ResourceRequirements( + **operator_kwargs["container_resources"] + ) KubernetesPodOperator.__init__(self, **operator_kwargs) def build_env_args(self, env: dict[str, str | bytes | PathLike[Any]]) -> None: From 022e8020f889b929b1ba3c2e37b1cc94f9433438 Mon Sep 17 00:00:00 2001 From: John Horan Date: Fri, 20 Jun 2025 14:30:19 +0100 Subject: [PATCH 2/6] refactor --- cosmos/operators/kubernetes.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/cosmos/operators/kubernetes.py b/cosmos/operators/kubernetes.py index d5a915dceb..e1c6936672 100644 --- a/cosmos/operators/kubernetes.py +++ b/cosmos/operators/kubernetes.py @@ -99,10 +99,9 @@ def __init__(self, profile_config: ProfileConfig | None = None, **kwargs: Any) - AbstractDbtBase.__init__(self, **base_kwargs) - if isinstance(operator_kwargs.get("container_resources"), dict): - operator_kwargs["container_resources"] = k8s.V1ResourceRequirements( - **operator_kwargs["container_resources"] - ) + container_resources = operator_kwargs.get("container_resources") + if isinstance(container_resources, dict): + operator_kwargs["container_resources"] = k8s.V1ResourceRequirements(**container_resources) KubernetesPodOperator.__init__(self, **operator_kwargs) def build_env_args(self, env: dict[str, str | bytes | PathLike[Any]]) -> None: From 054e541137592c633a2275c77046756c66eb8985 Mon Sep 17 00:00:00 2001 From: John Horan Date: Mon, 23 Jun 2025 12:12:21 +0100 Subject: [PATCH 3/6] add test --- tests/operators/test_kubernetes.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/tests/operators/test_kubernetes.py b/tests/operators/test_kubernetes.py index 809f45cbd6..6173271c6a 100644 --- a/tests/operators/test_kubernetes.py +++ b/tests/operators/test_kubernetes.py @@ -1,6 +1,7 @@ from pathlib import Path from unittest.mock import MagicMock, Mock, patch +import kubernetes.client as k8s import pytest from airflow import __version__ as airflow_version from airflow.models import DAG, TaskInstance @@ -461,3 +462,20 @@ def test_kubernetes_default_args(): assert dbt_run_operation.image == image assert dbt_run_operation.project_dir == DBT_ROOT_PATH / "jaffle_shop" assert dbt_run_operation.profile_config.target_name == profile_config.target_name + + +def test_kubernetes_pod_container_resources(): + """Test that the container_resources are converted to V1ResourceRequirements in the operator.""" + resources = { + "requests": {"cpu": "100m", "memory": "128Mi"}, + "limits": {"cpu": "500m", "memory": "512Mi"}, + } + run_operator = DbtRunOperationKubernetesOperator( + task_id="run_macro_command", + macro_name="macro", + project_dir = DBT_ROOT_PATH / "jaffle_shop", + container_resources=resources, + ) + assert isinstance(run_operator.container_resources, k8s.V1ResourceRequirements) + assert run_operator.container_resources.to_dict()["requests"] == resources["requests"] + assert run_operator.container_resources.to_dict()["limits"] == resources["limits"] From fa8a39c9f312a2b584d4f4b685d2616bb782067b Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 23 Jun 2025 11:12:42 +0000 Subject: [PATCH 4/6] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20for?= =?UTF-8?q?mat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/operators/test_kubernetes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/operators/test_kubernetes.py b/tests/operators/test_kubernetes.py index 6173271c6a..354382ce7d 100644 --- a/tests/operators/test_kubernetes.py +++ b/tests/operators/test_kubernetes.py @@ -473,7 +473,7 @@ def test_kubernetes_pod_container_resources(): run_operator = DbtRunOperationKubernetesOperator( task_id="run_macro_command", macro_name="macro", - project_dir = DBT_ROOT_PATH / "jaffle_shop", + project_dir=DBT_ROOT_PATH / "jaffle_shop", container_resources=resources, ) assert isinstance(run_operator.container_resources, k8s.V1ResourceRequirements) From 686d4c12511b2fa301e2bec878f92ac99d95699d Mon Sep 17 00:00:00 2001 From: John Horan Date: Mon, 23 Jun 2025 12:14:45 +0100 Subject: [PATCH 5/6] coverage --- tests/operators/test_kubernetes.py | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/tests/operators/test_kubernetes.py b/tests/operators/test_kubernetes.py index 354382ce7d..1481aa23ca 100644 --- a/tests/operators/test_kubernetes.py +++ b/tests/operators/test_kubernetes.py @@ -470,12 +470,29 @@ def test_kubernetes_pod_container_resources(): "requests": {"cpu": "100m", "memory": "128Mi"}, "limits": {"cpu": "500m", "memory": "512Mi"}, } - run_operator = DbtRunOperationKubernetesOperator( + a_operator = DbtRunOperationKubernetesOperator( task_id="run_macro_command", macro_name="macro", project_dir=DBT_ROOT_PATH / "jaffle_shop", container_resources=resources, ) - assert isinstance(run_operator.container_resources, k8s.V1ResourceRequirements) - assert run_operator.container_resources.to_dict()["requests"] == resources["requests"] - assert run_operator.container_resources.to_dict()["limits"] == resources["limits"] + assert isinstance(a_operator.container_resources, k8s.V1ResourceRequirements) + assert a_operator.container_resources.to_dict()["requests"] == resources["requests"] + assert a_operator.container_resources.to_dict()["limits"] == resources["limits"] + + b_operator = DbtRunOperationKubernetesOperator( + task_id="run_macro_command", + macro_name="macro", + project_dir = DBT_ROOT_PATH / "jaffle_shop", + container_resources=k8s.V1ResourceRequirements(**resources), + ) + assert isinstance(b_operator.container_resources, k8s.V1ResourceRequirements) + assert b_operator.container_resources.to_dict()["requests"] == resources["requests"] + assert b_operator.container_resources.to_dict()["limits"] == resources["limits"] + + c_operator = DbtRunOperationKubernetesOperator( + task_id="run_macro_command", + macro_name="macro", + project_dir = DBT_ROOT_PATH / "jaffle_shop", + ) + assert c_operator.container_resources is None From 9c3a2e8feae1fddfb3cb3750cf789a03d2999f16 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 23 Jun 2025 11:15:05 +0000 Subject: [PATCH 6/6] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20for?= =?UTF-8?q?mat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/operators/test_kubernetes.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/operators/test_kubernetes.py b/tests/operators/test_kubernetes.py index 1481aa23ca..6e1b6f7588 100644 --- a/tests/operators/test_kubernetes.py +++ b/tests/operators/test_kubernetes.py @@ -483,7 +483,7 @@ def test_kubernetes_pod_container_resources(): b_operator = DbtRunOperationKubernetesOperator( task_id="run_macro_command", macro_name="macro", - project_dir = DBT_ROOT_PATH / "jaffle_shop", + project_dir=DBT_ROOT_PATH / "jaffle_shop", container_resources=k8s.V1ResourceRequirements(**resources), ) assert isinstance(b_operator.container_resources, k8s.V1ResourceRequirements) @@ -493,6 +493,6 @@ def test_kubernetes_pod_container_resources(): c_operator = DbtRunOperationKubernetesOperator( task_id="run_macro_command", macro_name="macro", - project_dir = DBT_ROOT_PATH / "jaffle_shop", + project_dir=DBT_ROOT_PATH / "jaffle_shop", ) assert c_operator.container_resources is None