diff --git a/cosmos/operators/kubernetes.py b/cosmos/operators/kubernetes.py index 34512d16ad..e1c6936672 100644 --- a/cosmos/operators/kubernetes.py +++ b/cosmos/operators/kubernetes.py @@ -98,6 +98,10 @@ def __init__(self, profile_config: ProfileConfig | None = None, **kwargs: Any) - pass AbstractDbtBase.__init__(self, **base_kwargs) + + container_resources = operator_kwargs.get("container_resources") + if isinstance(container_resources, dict): + operator_kwargs["container_resources"] = k8s.V1ResourceRequirements(**container_resources) KubernetesPodOperator.__init__(self, **operator_kwargs) def build_env_args(self, env: dict[str, str | bytes | PathLike[Any]]) -> None: diff --git a/tests/operators/test_kubernetes.py b/tests/operators/test_kubernetes.py index 809f45cbd6..6e1b6f7588 100644 --- a/tests/operators/test_kubernetes.py +++ b/tests/operators/test_kubernetes.py @@ -1,6 +1,7 @@ from pathlib import Path from unittest.mock import MagicMock, Mock, patch +import kubernetes.client as k8s import pytest from airflow import __version__ as airflow_version from airflow.models import DAG, TaskInstance @@ -461,3 +462,37 @@ def test_kubernetes_default_args(): assert dbt_run_operation.image == image assert dbt_run_operation.project_dir == DBT_ROOT_PATH / "jaffle_shop" assert dbt_run_operation.profile_config.target_name == profile_config.target_name + + +def test_kubernetes_pod_container_resources(): + """Test that the container_resources are converted to V1ResourceRequirements in the operator.""" + resources = { + "requests": {"cpu": "100m", "memory": "128Mi"}, + "limits": {"cpu": "500m", "memory": "512Mi"}, + } + a_operator = DbtRunOperationKubernetesOperator( + task_id="run_macro_command", + macro_name="macro", + project_dir=DBT_ROOT_PATH / "jaffle_shop", + container_resources=resources, + ) + assert isinstance(a_operator.container_resources, k8s.V1ResourceRequirements) + assert a_operator.container_resources.to_dict()["requests"] == resources["requests"] + assert a_operator.container_resources.to_dict()["limits"] == resources["limits"] + + b_operator = DbtRunOperationKubernetesOperator( + task_id="run_macro_command", + macro_name="macro", + project_dir=DBT_ROOT_PATH / "jaffle_shop", + container_resources=k8s.V1ResourceRequirements(**resources), + ) + assert isinstance(b_operator.container_resources, k8s.V1ResourceRequirements) + assert b_operator.container_resources.to_dict()["requests"] == resources["requests"] + assert b_operator.container_resources.to_dict()["limits"] == resources["limits"] + + c_operator = DbtRunOperationKubernetesOperator( + task_id="run_macro_command", + macro_name="macro", + project_dir=DBT_ROOT_PATH / "jaffle_shop", + ) + assert c_operator.container_resources is None