diff --git a/cosmos/operators/kubernetes.py b/cosmos/operators/kubernetes.py index 75ea001c42..8b38d4f162 100644 --- a/cosmos/operators/kubernetes.py +++ b/cosmos/operators/kubernetes.py @@ -7,6 +7,7 @@ from airflow.utils.context import Context from cosmos.log import get_logger +from cosmos.config import ProfileConfig from cosmos.operators.base import DbtBaseOperator @@ -35,7 +36,8 @@ class DbtKubernetesBaseOperator(KubernetesPodOperator, DbtBaseOperator): # type intercept_flag = False - def __init__(self, **kwargs: Any) -> None: + def __init__(self, profile_config: ProfileConfig | None = None, **kwargs: Any) -> None: + self.profile_config = profile_config super().__init__(**kwargs) def build_env_args(self, env: dict[str, str | bytes | PathLike[Any]]) -> None: @@ -57,6 +59,14 @@ def build_kube_args(self, context: Context, cmd_flags: list[str] | None = None) # to add that in the future self.dbt_executable_path = "dbt" dbt_cmd, env_vars = self.build_cmd(context=context, cmd_flags=cmd_flags) + + # Parse ProfileConfig and add additional arguments to the dbt_cmd + if self.profile_config: + if self.profile_config.profile_name: + dbt_cmd.extend(["--profile", self.profile_config.profile_name]) + if self.profile_config.target_name: + dbt_cmd.extend(["--target", self.profile_config.target_name]) + # set env vars self.build_env_args(env_vars) self.arguments = dbt_cmd @@ -69,7 +79,7 @@ class DbtLSKubernetesOperator(DbtKubernetesBaseOperator): ui_color = "#DBCDF6" - def __init__(self, **kwargs: str) -> None: + def __init__(self, **kwargs: Any) -> None: super().__init__(**kwargs) self.base_cmd = ["ls"] @@ -86,7 +96,7 @@ class DbtSeedKubernetesOperator(DbtKubernetesBaseOperator): ui_color = "#F58D7E" - def __init__(self, full_refresh: bool = False, **kwargs: str) -> None: + def __init__(self, full_refresh: bool = False, **kwargs: Any) -> None: self.full_refresh = full_refresh super().__init__(**kwargs) self.base_cmd = ["seed"] @@ -111,7 +121,7 @@ class DbtSnapshotKubernetesOperator(DbtKubernetesBaseOperator): ui_color = "#964B00" - def __init__(self, **kwargs: str) -> None: + def __init__(self, **kwargs: Any) -> None: super().__init__(**kwargs) self.base_cmd = ["snapshot"] @@ -127,7 +137,7 @@ class DbtRunKubernetesOperator(DbtKubernetesBaseOperator): ui_color = "#7352BA" ui_fgcolor = "#F4F2FC" - def __init__(self, **kwargs: str) -> None: + def __init__(self, **kwargs: Any) -> None: super().__init__(**kwargs) self.base_cmd = ["run"] @@ -142,7 +152,7 @@ class DbtTestKubernetesOperator(DbtKubernetesBaseOperator): ui_color = "#8194E0" - def __init__(self, on_warning_callback: Callable[..., Any] | None = None, **kwargs: str) -> None: + def __init__(self, on_warning_callback: Callable[..., Any] | None = None, **kwargs: Any) -> None: super().__init__(**kwargs) self.base_cmd = ["test"] # as of now, on_warning_callback in kubernetes executor does nothing @@ -164,7 +174,7 @@ class DbtRunOperationKubernetesOperator(DbtKubernetesBaseOperator): ui_color = "#8194E0" template_fields: Sequence[str] = ("args",) - def __init__(self, macro_name: str, args: dict[str, Any] | None = None, **kwargs: str) -> None: + def __init__(self, macro_name: str, args: dict[str, Any] | None = None, **kwargs: Any) -> None: self.macro_name = macro_name self.args = args super().__init__(**kwargs)