From 67d806fd8464afe58c070277a003ffa841c67eae Mon Sep 17 00:00:00 2001 From: nicor88 <6278547+nicor88@users.noreply.github.com> Date: Fri, 7 Mar 2025 17:25:58 +0100 Subject: [PATCH 01/11] tweaks --- cosmos/operators/aws_ecs.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/cosmos/operators/aws_ecs.py b/cosmos/operators/aws_ecs.py index 2decbcc6c4..8654f3ca4b 100644 --- a/cosmos/operators/aws_ecs.py +++ b/cosmos/operators/aws_ecs.py @@ -21,6 +21,7 @@ logger = get_logger(__name__) +DEFAULT_CONN_ID = "aws_default" DEFAULT_ENVIRONMENT_VARIABLES: dict[str, str] = {} try: @@ -47,11 +48,11 @@ class DbtAwsEcsBaseOperator(AbstractDbtBase, EcsRunTaskOperator): # type: ignor def __init__( self, # arguments required by EcsRunTaskOperator - aws_conn_id: str, cluster: str, task_definition: str, container_name: str, # + aws_conn_id: str = DEFAULT_CONN_ID, profile_config: ProfileConfig | None = None, command: list[str] | None = None, environment_variables: dict[str, Any] | None = None, @@ -69,7 +70,9 @@ def __init__( "overrides": None, } ) - super().__init__(**kwargs) + super().__init__( + container_name=self.container_name, + **kwargs) # In PR #1474, we refactored cosmos.operators.base.AbstractDbtBase to remove its inheritance from BaseOperator # and eliminated the super().__init__() call. This change was made to resolve conflicts in parent class # initializations while adding support for ExecutionMode.AIRFLOW_ASYNC. Operators under this mode inherit @@ -103,6 +106,7 @@ def build_command(self, context: Context, cmd_flags: list[str] | None = None) -> # For the first round, we're going to assume that the command is dbt # This means that we don't have openlineage support, but we will create a ticket # to add that in the future + logger.info('Container name: {}'.format(self.container_name)) self.dbt_executable_path = "dbt" dbt_cmd, env_vars = self.build_cmd(context=context, cmd_flags=cmd_flags) self.environment_variables = {**env_vars, **self.environment_variables} @@ -118,7 +122,6 @@ def build_command(self, context: Context, cmd_flags: list[str] | None = None) -> ] } - class DbtBuildAwsEcsOperator(DbtBuildMixin, DbtAwsEcsBaseOperator): """ Executes a dbt core build command. From c2b97ee63662806dc0f16c766cd94c087e2f8243 Mon Sep 17 00:00:00 2001 From: nicor88 <6278547+nicor88@users.noreply.github.com> Date: Fri, 7 Mar 2025 17:32:02 +0100 Subject: [PATCH 02/11] add logger --- cosmos/operators/aws_ecs.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cosmos/operators/aws_ecs.py b/cosmos/operators/aws_ecs.py index 8654f3ca4b..76064bbb00 100644 --- a/cosmos/operators/aws_ecs.py +++ b/cosmos/operators/aws_ecs.py @@ -70,6 +70,7 @@ def __init__( "overrides": None, } ) + logger.info('Container name: {}'.format(self.container_name)) super().__init__( container_name=self.container_name, **kwargs) From 9f9207d6b87daaaa63fd2f1e8d1106311618b66e Mon Sep 17 00:00:00 2001 From: nicor88 <6278547+nicor88@users.noreply.github.com> Date: Fri, 7 Mar 2025 17:45:55 +0100 Subject: [PATCH 03/11] add default --- cosmos/operators/aws_ecs.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cosmos/operators/aws_ecs.py b/cosmos/operators/aws_ecs.py index 76064bbb00..8647f246b6 100644 --- a/cosmos/operators/aws_ecs.py +++ b/cosmos/operators/aws_ecs.py @@ -72,7 +72,6 @@ def __init__( ) logger.info('Container name: {}'.format(self.container_name)) super().__init__( - container_name=self.container_name, **kwargs) # In PR #1474, we refactored cosmos.operators.base.AbstractDbtBase to remove its inheritance from BaseOperator # and eliminated the super().__init__() call. This change was made to resolve conflicts in parent class @@ -116,7 +115,7 @@ def build_command(self, context: Context, cmd_flags: list[str] | None = None) -> self.overrides = { "containerOverrides": [ { - "name": self.container_name, + "name": self.container_name or 'main', "command": self.command, "environment": [{"name": key, "value": value} for key, value in self.environment_variables.items()], } From 3e82df0a976d75aeeb7d8bf5e57bf7ab15accfd6 Mon Sep 17 00:00:00 2001 From: nicor88 <6278547+nicor88@users.noreply.github.com> Date: Fri, 7 Mar 2025 17:56:05 +0100 Subject: [PATCH 04/11] use dbt_container_name --- cosmos/operators/aws_ecs.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/cosmos/operators/aws_ecs.py b/cosmos/operators/aws_ecs.py index 8647f246b6..984fd31688 100644 --- a/cosmos/operators/aws_ecs.py +++ b/cosmos/operators/aws_ecs.py @@ -50,7 +50,7 @@ def __init__( # arguments required by EcsRunTaskOperator cluster: str, task_definition: str, - container_name: str, + dbt_container_name: str, # aws_conn_id: str = DEFAULT_CONN_ID, profile_config: ProfileConfig | None = None, @@ -61,7 +61,7 @@ def __init__( self.profile_config = profile_config self.command = command self.environment_variables = environment_variables or DEFAULT_ENVIRONMENT_VARIABLES - self.container_name = container_name + self.dbt_container_name = dbt_container_name kwargs.update( { "aws_conn_id": aws_conn_id, @@ -70,9 +70,7 @@ def __init__( "overrides": None, } ) - logger.info('Container name: {}'.format(self.container_name)) - super().__init__( - **kwargs) + super().__init__(**kwargs) # In PR #1474, we refactored cosmos.operators.base.AbstractDbtBase to remove its inheritance from BaseOperator # and eliminated the super().__init__() call. This change was made to resolve conflicts in parent class # initializations while adding support for ExecutionMode.AIRFLOW_ASYNC. Operators under this mode inherit @@ -106,7 +104,7 @@ def build_command(self, context: Context, cmd_flags: list[str] | None = None) -> # For the first round, we're going to assume that the command is dbt # This means that we don't have openlineage support, but we will create a ticket # to add that in the future - logger.info('Container name: {}'.format(self.container_name)) + logger.info('Container name: {}'.format(self.dbt_container_name)) self.dbt_executable_path = "dbt" dbt_cmd, env_vars = self.build_cmd(context=context, cmd_flags=cmd_flags) self.environment_variables = {**env_vars, **self.environment_variables} @@ -115,7 +113,7 @@ def build_command(self, context: Context, cmd_flags: list[str] | None = None) -> self.overrides = { "containerOverrides": [ { - "name": self.container_name or 'main', + "name": self.dbt_container_name, "command": self.command, "environment": [{"name": key, "value": value} for key, value in self.environment_variables.items()], } From 61e0f7a3a41aa5df40832e079e77000473e041e9 Mon Sep 17 00:00:00 2001 From: nicor88 <6278547+nicor88@users.noreply.github.com> Date: Fri, 7 Mar 2025 18:05:49 +0100 Subject: [PATCH 05/11] add tests --- cosmos/operators/aws_ecs.py | 5 +++-- tests/operators/test_aws_ecs.py | 3 ++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/cosmos/operators/aws_ecs.py b/cosmos/operators/aws_ecs.py index 984fd31688..7f1d70cd95 100644 --- a/cosmos/operators/aws_ecs.py +++ b/cosmos/operators/aws_ecs.py @@ -40,7 +40,7 @@ class DbtAwsEcsBaseOperator(AbstractDbtBase, EcsRunTaskOperator): # type: ignor """ template_fields: Sequence[str] = tuple( - list(AbstractDbtBase.template_fields) + list(EcsRunTaskOperator.template_fields) + list(AbstractDbtBase.template_fields) + list(EcsRunTaskOperator.template_fields) + 'dbt_container_name' ) intercept_flag = False @@ -50,6 +50,7 @@ def __init__( # arguments required by EcsRunTaskOperator cluster: str, task_definition: str, + dbt_container_name: str, # aws_conn_id: str = DEFAULT_CONN_ID, @@ -104,7 +105,7 @@ def build_command(self, context: Context, cmd_flags: list[str] | None = None) -> # For the first round, we're going to assume that the command is dbt # This means that we don't have openlineage support, but we will create a ticket # to add that in the future - logger.info('Container name: {}'.format(self.dbt_container_name)) + logger.debug('Container name: {}'.format(self.dbt_container_name)) self.dbt_executable_path = "dbt" dbt_cmd, env_vars = self.build_cmd(context=context, cmd_flags=cmd_flags) self.environment_variables = {**env_vars, **self.environment_variables} diff --git a/tests/operators/test_aws_ecs.py b/tests/operators/test_aws_ecs.py index 230c1616a2..2ab96b6636 100644 --- a/tests/operators/test_aws_ecs.py +++ b/tests/operators/test_aws_ecs.py @@ -30,7 +30,7 @@ def test_dbt_aws_ecs_operator_add_global_flags() -> None: aws_conn_id="my-aws-conn-id", cluster="my-ecs-cluster", task_definition="my-dbt-task-definition", - container_name="my-dbt-container-name", + dbt_container_name="my-dbt-container-name", project_dir="my/dir", vars={ "start_time": "{{ data_interval_start.strftime('%Y%m%d%H%M%S') }}", @@ -185,6 +185,7 @@ def test_dbt_aes_ecs_overrides_parameter(): assert "containerOverrides" in actual_overrides actual_container_overrides = actual_overrides["containerOverrides"][0] + assert actual_container_overrides["name"] == "my-dbt-container-name" assert isinstance(actual_container_overrides["command"], list), "`command` should be of type list" assert "environment" in actual_container_overrides From c07084636c9696a7a40aa389c45160dd7f71e63c Mon Sep 17 00:00:00 2001 From: nicor88 <6278547+nicor88@users.noreply.github.com> Date: Fri, 7 Mar 2025 18:14:16 +0100 Subject: [PATCH 06/11] improvements --- cosmos/operators/aws_ecs.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/cosmos/operators/aws_ecs.py b/cosmos/operators/aws_ecs.py index 7f1d70cd95..780eb5de51 100644 --- a/cosmos/operators/aws_ecs.py +++ b/cosmos/operators/aws_ecs.py @@ -40,7 +40,7 @@ class DbtAwsEcsBaseOperator(AbstractDbtBase, EcsRunTaskOperator): # type: ignor """ template_fields: Sequence[str] = tuple( - list(AbstractDbtBase.template_fields) + list(EcsRunTaskOperator.template_fields) + 'dbt_container_name' + list(AbstractDbtBase.template_fields) + list(EcsRunTaskOperator.template_fields) + list("dbt_container_name") ) intercept_flag = False @@ -50,7 +50,6 @@ def __init__( # arguments required by EcsRunTaskOperator cluster: str, task_definition: str, - dbt_container_name: str, # aws_conn_id: str = DEFAULT_CONN_ID, @@ -105,7 +104,6 @@ def build_command(self, context: Context, cmd_flags: list[str] | None = None) -> # For the first round, we're going to assume that the command is dbt # This means that we don't have openlineage support, but we will create a ticket # to add that in the future - logger.debug('Container name: {}'.format(self.dbt_container_name)) self.dbt_executable_path = "dbt" dbt_cmd, env_vars = self.build_cmd(context=context, cmd_flags=cmd_flags) self.environment_variables = {**env_vars, **self.environment_variables} @@ -121,6 +119,7 @@ def build_command(self, context: Context, cmd_flags: list[str] | None = None) -> ] } + class DbtBuildAwsEcsOperator(DbtBuildMixin, DbtAwsEcsBaseOperator): """ Executes a dbt core build command. From 59e95c1110f9f70b1c79149cf00da96a9213e2fc Mon Sep 17 00:00:00 2001 From: nicor88 <6278547+nicor88@users.noreply.github.com> Date: Mon, 10 Mar 2025 08:50:25 +0100 Subject: [PATCH 07/11] pr improvements --- cosmos/operators/aws_ecs.py | 6 +++--- tests/operators/test_aws_ecs.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cosmos/operators/aws_ecs.py b/cosmos/operators/aws_ecs.py index 780eb5de51..91a079961a 100644 --- a/cosmos/operators/aws_ecs.py +++ b/cosmos/operators/aws_ecs.py @@ -50,7 +50,7 @@ def __init__( # arguments required by EcsRunTaskOperator cluster: str, task_definition: str, - dbt_container_name: str, + container_name: str, # aws_conn_id: str = DEFAULT_CONN_ID, profile_config: ProfileConfig | None = None, @@ -61,13 +61,13 @@ def __init__( self.profile_config = profile_config self.command = command self.environment_variables = environment_variables or DEFAULT_ENVIRONMENT_VARIABLES - self.dbt_container_name = dbt_container_name kwargs.update( { "aws_conn_id": aws_conn_id, "task_definition": task_definition, "cluster": cluster, "overrides": None, + "container_name": container_name, } ) super().__init__(**kwargs) @@ -112,7 +112,7 @@ def build_command(self, context: Context, cmd_flags: list[str] | None = None) -> self.overrides = { "containerOverrides": [ { - "name": self.dbt_container_name, + "name": self.container_name, "command": self.command, "environment": [{"name": key, "value": value} for key, value in self.environment_variables.items()], } diff --git a/tests/operators/test_aws_ecs.py b/tests/operators/test_aws_ecs.py index 2ab96b6636..af9ed226d4 100644 --- a/tests/operators/test_aws_ecs.py +++ b/tests/operators/test_aws_ecs.py @@ -30,7 +30,7 @@ def test_dbt_aws_ecs_operator_add_global_flags() -> None: aws_conn_id="my-aws-conn-id", cluster="my-ecs-cluster", task_definition="my-dbt-task-definition", - dbt_container_name="my-dbt-container-name", + container_name="my-dbt-container-name", project_dir="my/dir", vars={ "start_time": "{{ data_interval_start.strftime('%Y%m%d%H%M%S') }}", From 8f4935cda3f14298fc2e2dd9cf3e7ea815db9ac0 Mon Sep 17 00:00:00 2001 From: nicor88 <6278547+nicor88@users.noreply.github.com> Date: Mon, 10 Mar 2025 09:02:22 +0100 Subject: [PATCH 08/11] remove templated reference --- cosmos/operators/aws_ecs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cosmos/operators/aws_ecs.py b/cosmos/operators/aws_ecs.py index 91a079961a..ece7d0c472 100644 --- a/cosmos/operators/aws_ecs.py +++ b/cosmos/operators/aws_ecs.py @@ -40,7 +40,7 @@ class DbtAwsEcsBaseOperator(AbstractDbtBase, EcsRunTaskOperator): # type: ignor """ template_fields: Sequence[str] = tuple( - list(AbstractDbtBase.template_fields) + list(EcsRunTaskOperator.template_fields) + list("dbt_container_name") + list(AbstractDbtBase.template_fields) + list(EcsRunTaskOperator.template_fields) ) intercept_flag = False @@ -66,8 +66,8 @@ def __init__( "aws_conn_id": aws_conn_id, "task_definition": task_definition, "cluster": cluster, - "overrides": None, "container_name": container_name, + "overrides": None, } ) super().__init__(**kwargs) From 44621716a0d1dbce12bb195f2286b0363ec0912e Mon Sep 17 00:00:00 2001 From: nicor88 <6278547+nicor88@users.noreply.github.com> Date: Thu, 13 Mar 2025 09:19:49 +0100 Subject: [PATCH 09/11] add default container name --- cosmos/operators/aws_ecs.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cosmos/operators/aws_ecs.py b/cosmos/operators/aws_ecs.py index ece7d0c472..3e7bc46a7e 100644 --- a/cosmos/operators/aws_ecs.py +++ b/cosmos/operators/aws_ecs.py @@ -22,6 +22,7 @@ logger = get_logger(__name__) DEFAULT_CONN_ID = "aws_default" +DEFAULT_CONTAINER_NAME = "dbt" DEFAULT_ENVIRONMENT_VARIABLES: dict[str, str] = {} try: @@ -50,7 +51,7 @@ def __init__( # arguments required by EcsRunTaskOperator cluster: str, task_definition: str, - container_name: str, + container_name: str = DEFAULT_CONTAINER_NAME, # aws_conn_id: str = DEFAULT_CONN_ID, profile_config: ProfileConfig | None = None, From 418fbfd138beeb29b3179031111739b4798a1bec Mon Sep 17 00:00:00 2001 From: nicor88 <6278547+nicor88@users.noreply.github.com> Date: Thu, 13 Mar 2025 16:38:29 +0100 Subject: [PATCH 10/11] Update cosmos/operators/aws_ecs.py Co-authored-by: marianore-muttdata <115091420+marianore-muttdata@users.noreply.github.com> --- cosmos/operators/aws_ecs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/operators/aws_ecs.py b/cosmos/operators/aws_ecs.py index 3e7bc46a7e..64b0e36d88 100644 --- a/cosmos/operators/aws_ecs.py +++ b/cosmos/operators/aws_ecs.py @@ -52,8 +52,8 @@ def __init__( cluster: str, task_definition: str, container_name: str = DEFAULT_CONTAINER_NAME, - # aws_conn_id: str = DEFAULT_CONN_ID, + # profile_config: ProfileConfig | None = None, command: list[str] | None = None, environment_variables: dict[str, Any] | None = None, From 78bf46e6b49d7c57df226f06f22a033e83e3cdbb Mon Sep 17 00:00:00 2001 From: Pankaj Singh <98807258+pankajastro@users.noreply.github.com> Date: Thu, 13 Mar 2025 22:00:57 +0530 Subject: [PATCH 11/11] Update cosmos/operators/aws_ecs.py --- cosmos/operators/aws_ecs.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cosmos/operators/aws_ecs.py b/cosmos/operators/aws_ecs.py index 64b0e36d88..98061269ff 100644 --- a/cosmos/operators/aws_ecs.py +++ b/cosmos/operators/aws_ecs.py @@ -62,6 +62,7 @@ def __init__( self.profile_config = profile_config self.command = command self.environment_variables = environment_variables or DEFAULT_ENVIRONMENT_VARIABLES + self.container_name = container_name kwargs.update( { "aws_conn_id": aws_conn_id,