From 86fcb1f1ae025d0f459d48a976ec31dfb140c70d Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Tue, 27 Aug 2024 21:26:27 +0530 Subject: [PATCH 01/12] Comment the dependencies temporarily causing deep resolution errors --- pyproject.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 2087b3f325..ef2f422d08 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -140,10 +140,10 @@ packages = ["/cosmos"] dependencies = [ "astronomer-cosmos[tests]", "apache-airflow-providers-cncf-kubernetes>=5.1.1", - "apache-airflow-providers-amazon[s3fs]>=3.0.0", +# "apache-airflow-providers-amazon[s3fs]>=3.0.0", "apache-airflow-providers-docker>=3.5.0", "apache-airflow-providers-google", - "apache-airflow-providers-microsoft-azure", +# "apache-airflow-providers-microsoft-azure", "apache-airflow-providers-postgres", "types-PyYAML", "types-attrs", From 1ffcdc1321974f9a0df1baf630a868296ea549ba Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Tue, 27 Aug 2024 22:13:44 +0530 Subject: [PATCH 02/12] Try next combination --- pyproject.toml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index ef2f422d08..887495763e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -141,9 +141,10 @@ dependencies = [ "astronomer-cosmos[tests]", "apache-airflow-providers-cncf-kubernetes>=5.1.1", # "apache-airflow-providers-amazon[s3fs]>=3.0.0", + "apache-airflow-providers-amazon>=3.0.0", "apache-airflow-providers-docker>=3.5.0", "apache-airflow-providers-google", -# "apache-airflow-providers-microsoft-azure", + "apache-airflow-providers-microsoft-azure", "apache-airflow-providers-postgres", "types-PyYAML", "types-attrs", From 1d9edfb9ad114754c925163c921bdbfea9bb9f0b Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Wed, 28 Aug 2024 11:08:21 +0530 Subject: [PATCH 03/12] Comment amazon provider dependency --- dev/dags/cosmos_manifest_example.py | 29 +++++++++++++++-------------- pyproject.toml | 1 - 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/dev/dags/cosmos_manifest_example.py b/dev/dags/cosmos_manifest_example.py index a96eff9718..cdf5c59d4e 100644 --- a/dev/dags/cosmos_manifest_example.py +++ b/dev/dags/cosmos_manifest_example.py @@ -55,19 +55,19 @@ def cosmos_manifest_example() -> None: # [END local_example] # [START aws_s3_example] - aws_s3_example = DbtTaskGroup( - group_id="aws_s3_example", - project_config=ProjectConfig( - manifest_path="s3://cosmos-manifest-test/manifest.json", - manifest_conn_id="aws_s3_conn", - # `manifest_conn_id` is optional. If not provided, the default connection ID `aws_default` is used. - project_name="jaffle_shop", - ), - profile_config=profile_config, - render_config=render_config, - execution_config=execution_config, - operator_args={"install_deps": True}, - ) + # aws_s3_example = DbtTaskGroup( + # group_id="aws_s3_example", + # project_config=ProjectConfig( + # manifest_path="s3://cosmos-manifest-test/manifest.json", + # manifest_conn_id="aws_s3_conn", + # # `manifest_conn_id` is optional. If not provided, the default connection ID `aws_default` is used. + # project_name="jaffle_shop", + # ), + # profile_config=profile_config, + # render_config=render_config, + # execution_config=execution_config, + # operator_args={"install_deps": True}, + # ) # [END aws_s3_example] # [START gcp_gs_example] @@ -104,7 +104,8 @@ def cosmos_manifest_example() -> None: post_dbt = EmptyOperator(task_id="post_dbt") - (pre_dbt >> local_example >> aws_s3_example >> gcp_gs_example >> azure_abfs_example >> post_dbt) + # (pre_dbt >> local_example >> aws_s3_example >> gcp_gs_example >> azure_abfs_example >> post_dbt) + (pre_dbt >> local_example >> gcp_gs_example >> azure_abfs_example >> post_dbt) cosmos_manifest_example() diff --git a/pyproject.toml b/pyproject.toml index 887495763e..b5e9cbdc26 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -141,7 +141,6 @@ dependencies = [ "astronomer-cosmos[tests]", "apache-airflow-providers-cncf-kubernetes>=5.1.1", # "apache-airflow-providers-amazon[s3fs]>=3.0.0", - "apache-airflow-providers-amazon>=3.0.0", "apache-airflow-providers-docker>=3.5.0", "apache-airflow-providers-google", "apache-airflow-providers-microsoft-azure", From e8dd86878b72f98b047d9a603f32cb84ff1c4d26 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Wed, 28 Aug 2024 11:29:23 +0530 Subject: [PATCH 04/12] Comment AWS EKS tests --- tests/operators/test_aws_eks.py | 194 ++++++++++++++++---------------- 1 file changed, 97 insertions(+), 97 deletions(-) diff --git a/tests/operators/test_aws_eks.py b/tests/operators/test_aws_eks.py index 35717a0617..9760e6f20f 100644 --- a/tests/operators/test_aws_eks.py +++ b/tests/operators/test_aws_eks.py @@ -1,97 +1,97 @@ -from unittest.mock import MagicMock, patch - -import pytest -from airflow.exceptions import AirflowException - -from cosmos.operators.aws_eks import ( - DbtBuildAwsEksOperator, - DbtLSAwsEksOperator, - DbtRunAwsEksOperator, - DbtSeedAwsEksOperator, - DbtTestAwsEksOperator, -) - - -@pytest.fixture() -def mock_kubernetes_execute(): - with patch("cosmos.operators.kubernetes.KubernetesPodOperator.execute") as mock_execute: - yield mock_execute - - -base_kwargs = { - "conn_id": "my_airflow_connection", - "cluster_name": "my-cluster", - "task_id": "my-task", - "image": "my_image", - "project_dir": "my/dir", - "vars": { - "start_time": "{{ data_interval_start.strftime('%Y%m%d%H%M%S') }}", - "end_time": "{{ data_interval_end.strftime('%Y%m%d%H%M%S') }}", - }, - "no_version_check": True, -} - - -def test_dbt_kubernetes_build_command(): - """ - Since we know that the KubernetesOperator is tested, we can just test that the - command is built correctly and added to the "arguments" parameter. - """ - - result_map = { - "ls": DbtLSAwsEksOperator(**base_kwargs), - "run": DbtRunAwsEksOperator(**base_kwargs), - "test": DbtTestAwsEksOperator(**base_kwargs), - "build": DbtBuildAwsEksOperator(**base_kwargs), - "seed": DbtSeedAwsEksOperator(**base_kwargs), - } - - for command_name, command_operator in result_map.items(): - command_operator.build_kube_args(context=MagicMock(), cmd_flags=MagicMock()) - assert command_operator.arguments == [ - "dbt", - command_name, - "--vars", - "end_time: '{{ data_interval_end.strftime(''%Y%m%d%H%M%S'') }}'\n" - "start_time: '{{ data_interval_start.strftime(''%Y%m%d%H%M%S'') }}'\n", - "--no-version-check", - "--project-dir", - "my/dir", - ] - - -@patch("cosmos.operators.kubernetes.DbtKubernetesBaseOperator.build_kube_args") -@patch("cosmos.operators.aws_eks.EksHook.generate_config_file") -def test_dbt_kubernetes_operator_execute(mock_generate_config_file, mock_build_kube_args, mock_kubernetes_execute): - """Tests that the execute method call results in both the build_kube_args method and the kubernetes execute method being called.""" - operator = DbtLSAwsEksOperator( - conn_id="my_airflow_connection", - cluster_name="my-cluster", - task_id="my-task", - image="my_image", - project_dir="my/dir", - ) - operator.execute(context={}) - # Assert that the build_kube_args method was called in the execution - mock_build_kube_args.assert_called_once() - - # Assert that the generate_config_file method was called in the execution to create the kubeconfig for eks - mock_generate_config_file.assert_called_once_with(eks_cluster_name="my-cluster", pod_namespace="default") - - # Assert that the kubernetes execute method was called in the execution - mock_kubernetes_execute.assert_called_once() - assert mock_kubernetes_execute.call_args.args[-1] == {} - - -def test_provided_config_file_fails(): - """Tests that the constructor fails if it is called with a config_file.""" - with pytest.raises(AirflowException) as err_context: - DbtLSAwsEksOperator( - conn_id="my_airflow_connection", - cluster_name="my-cluster", - task_id="my-task", - image="my_image", - project_dir="my/dir", - config_file="my/config", - ) - assert "The config_file is not an allowed parameter for the EksPodOperator." in str(err_context.value) +# from unittest.mock import MagicMock, patch +# +# import pytest +# from airflow.exceptions import AirflowException +# +# from cosmos.operators.aws_eks import ( +# DbtBuildAwsEksOperator, +# DbtLSAwsEksOperator, +# DbtRunAwsEksOperator, +# DbtSeedAwsEksOperator, +# DbtTestAwsEksOperator, +# ) +# +# +# @pytest.fixture() +# def mock_kubernetes_execute(): +# with patch("cosmos.operators.kubernetes.KubernetesPodOperator.execute") as mock_execute: +# yield mock_execute +# +# +# base_kwargs = { +# "conn_id": "my_airflow_connection", +# "cluster_name": "my-cluster", +# "task_id": "my-task", +# "image": "my_image", +# "project_dir": "my/dir", +# "vars": { +# "start_time": "{{ data_interval_start.strftime('%Y%m%d%H%M%S') }}", +# "end_time": "{{ data_interval_end.strftime('%Y%m%d%H%M%S') }}", +# }, +# "no_version_check": True, +# } +# +# +# def test_dbt_kubernetes_build_command(): +# """ +# Since we know that the KubernetesOperator is tested, we can just test that the +# command is built correctly and added to the "arguments" parameter. +# """ +# +# result_map = { +# "ls": DbtLSAwsEksOperator(**base_kwargs), +# "run": DbtRunAwsEksOperator(**base_kwargs), +# "test": DbtTestAwsEksOperator(**base_kwargs), +# "build": DbtBuildAwsEksOperator(**base_kwargs), +# "seed": DbtSeedAwsEksOperator(**base_kwargs), +# } +# +# for command_name, command_operator in result_map.items(): +# command_operator.build_kube_args(context=MagicMock(), cmd_flags=MagicMock()) +# assert command_operator.arguments == [ +# "dbt", +# command_name, +# "--vars", +# "end_time: '{{ data_interval_end.strftime(''%Y%m%d%H%M%S'') }}'\n" +# "start_time: '{{ data_interval_start.strftime(''%Y%m%d%H%M%S'') }}'\n", +# "--no-version-check", +# "--project-dir", +# "my/dir", +# ] +# +# +# @patch("cosmos.operators.kubernetes.DbtKubernetesBaseOperator.build_kube_args") +# @patch("cosmos.operators.aws_eks.EksHook.generate_config_file") +# def test_dbt_kubernetes_operator_execute(mock_generate_config_file, mock_build_kube_args, mock_kubernetes_execute): +# """Tests that the execute method call results in both the build_kube_args method and the kubernetes execute method being called.""" +# operator = DbtLSAwsEksOperator( +# conn_id="my_airflow_connection", +# cluster_name="my-cluster", +# task_id="my-task", +# image="my_image", +# project_dir="my/dir", +# ) +# operator.execute(context={}) +# # Assert that the build_kube_args method was called in the execution +# mock_build_kube_args.assert_called_once() +# +# # Assert that the generate_config_file method was called in the execution to create the kubeconfig for eks +# mock_generate_config_file.assert_called_once_with(eks_cluster_name="my-cluster", pod_namespace="default") +# +# # Assert that the kubernetes execute method was called in the execution +# mock_kubernetes_execute.assert_called_once() +# assert mock_kubernetes_execute.call_args.args[-1] == {} +# +# +# def test_provided_config_file_fails(): +# """Tests that the constructor fails if it is called with a config_file.""" +# with pytest.raises(AirflowException) as err_context: +# DbtLSAwsEksOperator( +# conn_id="my_airflow_connection", +# cluster_name="my-cluster", +# task_id="my-task", +# image="my_image", +# project_dir="my/dir", +# config_file="my/config", +# ) +# assert "The config_file is not an allowed parameter for the EksPodOperator." in str(err_context.value) From 1fab2df7a14cf9a0f311926d8fd4ee1e4c74c3eb Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Wed, 28 Aug 2024 11:43:05 +0530 Subject: [PATCH 05/12] Restrict amazon provider package to <8.28 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index b5e9cbdc26..76ea1aed74 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -140,7 +140,7 @@ packages = ["/cosmos"] dependencies = [ "astronomer-cosmos[tests]", "apache-airflow-providers-cncf-kubernetes>=5.1.1", -# "apache-airflow-providers-amazon[s3fs]>=3.0.0", + "apache-airflow-providers-amazon[s3fs]>=3.0.0,<8.28.0", "apache-airflow-providers-docker>=3.5.0", "apache-airflow-providers-google", "apache-airflow-providers-microsoft-azure", From 82f9b69f2983f9798bf9ebb4cca5b0e719f8cb00 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Wed, 28 Aug 2024 12:52:20 +0530 Subject: [PATCH 06/12] Use Airflow 2.8 instead of 2.7 for ancillary jobs --- .github/workflows/test.yml | 8 ++++---- pyproject.toml | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index be33c90f65..c29f5d087b 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -32,7 +32,7 @@ jobs: architecture: "x64" - run: pip3 install hatch - - run: hatch run tests.py3.9-2.7:type-check + - run: hatch run tests.py3.9-2.8:type-check Run-Unit-Tests: runs-on: ubuntu-latest @@ -254,7 +254,7 @@ jobs: strategy: matrix: python-version: ["3.11"] - airflow-version: ["2.7"] + airflow-version: ["2.8"] steps: - uses: actions/checkout@v3 @@ -319,7 +319,7 @@ jobs: strategy: matrix: python-version: [ "3.11" ] - airflow-version: [ "2.7" ] + airflow-version: [ "2.8" ] services: postgres: image: postgres @@ -395,7 +395,7 @@ jobs: strategy: matrix: python-version: ["3.11"] - airflow-version: ["2.7"] + airflow-version: ["2.8"] num-models: [1, 10, 50, 100] services: postgres: diff --git a/pyproject.toml b/pyproject.toml index 76ea1aed74..2087b3f325 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -140,7 +140,7 @@ packages = ["/cosmos"] dependencies = [ "astronomer-cosmos[tests]", "apache-airflow-providers-cncf-kubernetes>=5.1.1", - "apache-airflow-providers-amazon[s3fs]>=3.0.0,<8.28.0", + "apache-airflow-providers-amazon[s3fs]>=3.0.0", "apache-airflow-providers-docker>=3.5.0", "apache-airflow-providers-google", "apache-airflow-providers-microsoft-azure", From 556949790daf1cc6744bca716854572fe5757912 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Wed, 28 Aug 2024 14:34:33 +0530 Subject: [PATCH 07/12] Run workflow on branch --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index c29f5d087b..45e376a9e1 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -2,7 +2,7 @@ name: test on: push: # Run on pushes to the default branch - branches: [main] + branches: [main,fix-ci] pull_request_target: # Also run on pull requests originated from forks branches: [main] From 26d38332dddb97bef6134a7659a587cb1e729444 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Thu, 29 Aug 2024 02:31:47 +0530 Subject: [PATCH 08/12] Use minified constraints style install of airflow providers --- scripts/test/pre-install-airflow.sh | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/scripts/test/pre-install-airflow.sh b/scripts/test/pre-install-airflow.sh index 08dcf042d6..929d9f51aa 100755 --- a/scripts/test/pre-install-airflow.sh +++ b/scripts/test/pre-install-airflow.sh @@ -23,3 +23,12 @@ pip install uv uv pip install "apache-airflow==$AIRFLOW_VERSION" --constraint /tmp/constraint.txt uv pip install pydantic --constraint /tmp/constraint.txt rm /tmp/constraint.txt + +if [ "$AIRFLOW_VERSION" = "2.7" ]; then + uv pip install "apache-airflow-providers-amazon[s3fs]==8.27.0" + uv pip install "apache-airflow-providers-cncf-kubernetes==8.3.4" + uv pip install "apache-airflow-providers-docker==3.12.3" + uv pip install "apache-airflow-providers-google==10.21.0" + uv pip install "apache-airflow-providers-microsoft-azure==10.3.0" + uv pip install "apache-airflow-providers-postgres==5.11.3" +fi From e4f0898d9eeea7e5752e415866c281a6327704e5 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Thu, 29 Aug 2024 02:33:10 +0530 Subject: [PATCH 09/12] Undo commented out code in DAGs & tests --- dev/dags/cosmos_manifest_example.py | 29 ++--- tests/operators/test_aws_eks.py | 194 ++++++++++++++-------------- 2 files changed, 111 insertions(+), 112 deletions(-) diff --git a/dev/dags/cosmos_manifest_example.py b/dev/dags/cosmos_manifest_example.py index cdf5c59d4e..a96eff9718 100644 --- a/dev/dags/cosmos_manifest_example.py +++ b/dev/dags/cosmos_manifest_example.py @@ -55,19 +55,19 @@ def cosmos_manifest_example() -> None: # [END local_example] # [START aws_s3_example] - # aws_s3_example = DbtTaskGroup( - # group_id="aws_s3_example", - # project_config=ProjectConfig( - # manifest_path="s3://cosmos-manifest-test/manifest.json", - # manifest_conn_id="aws_s3_conn", - # # `manifest_conn_id` is optional. If not provided, the default connection ID `aws_default` is used. - # project_name="jaffle_shop", - # ), - # profile_config=profile_config, - # render_config=render_config, - # execution_config=execution_config, - # operator_args={"install_deps": True}, - # ) + aws_s3_example = DbtTaskGroup( + group_id="aws_s3_example", + project_config=ProjectConfig( + manifest_path="s3://cosmos-manifest-test/manifest.json", + manifest_conn_id="aws_s3_conn", + # `manifest_conn_id` is optional. If not provided, the default connection ID `aws_default` is used. + project_name="jaffle_shop", + ), + profile_config=profile_config, + render_config=render_config, + execution_config=execution_config, + operator_args={"install_deps": True}, + ) # [END aws_s3_example] # [START gcp_gs_example] @@ -104,8 +104,7 @@ def cosmos_manifest_example() -> None: post_dbt = EmptyOperator(task_id="post_dbt") - # (pre_dbt >> local_example >> aws_s3_example >> gcp_gs_example >> azure_abfs_example >> post_dbt) - (pre_dbt >> local_example >> gcp_gs_example >> azure_abfs_example >> post_dbt) + (pre_dbt >> local_example >> aws_s3_example >> gcp_gs_example >> azure_abfs_example >> post_dbt) cosmos_manifest_example() diff --git a/tests/operators/test_aws_eks.py b/tests/operators/test_aws_eks.py index 9760e6f20f..35717a0617 100644 --- a/tests/operators/test_aws_eks.py +++ b/tests/operators/test_aws_eks.py @@ -1,97 +1,97 @@ -# from unittest.mock import MagicMock, patch -# -# import pytest -# from airflow.exceptions import AirflowException -# -# from cosmos.operators.aws_eks import ( -# DbtBuildAwsEksOperator, -# DbtLSAwsEksOperator, -# DbtRunAwsEksOperator, -# DbtSeedAwsEksOperator, -# DbtTestAwsEksOperator, -# ) -# -# -# @pytest.fixture() -# def mock_kubernetes_execute(): -# with patch("cosmos.operators.kubernetes.KubernetesPodOperator.execute") as mock_execute: -# yield mock_execute -# -# -# base_kwargs = { -# "conn_id": "my_airflow_connection", -# "cluster_name": "my-cluster", -# "task_id": "my-task", -# "image": "my_image", -# "project_dir": "my/dir", -# "vars": { -# "start_time": "{{ data_interval_start.strftime('%Y%m%d%H%M%S') }}", -# "end_time": "{{ data_interval_end.strftime('%Y%m%d%H%M%S') }}", -# }, -# "no_version_check": True, -# } -# -# -# def test_dbt_kubernetes_build_command(): -# """ -# Since we know that the KubernetesOperator is tested, we can just test that the -# command is built correctly and added to the "arguments" parameter. -# """ -# -# result_map = { -# "ls": DbtLSAwsEksOperator(**base_kwargs), -# "run": DbtRunAwsEksOperator(**base_kwargs), -# "test": DbtTestAwsEksOperator(**base_kwargs), -# "build": DbtBuildAwsEksOperator(**base_kwargs), -# "seed": DbtSeedAwsEksOperator(**base_kwargs), -# } -# -# for command_name, command_operator in result_map.items(): -# command_operator.build_kube_args(context=MagicMock(), cmd_flags=MagicMock()) -# assert command_operator.arguments == [ -# "dbt", -# command_name, -# "--vars", -# "end_time: '{{ data_interval_end.strftime(''%Y%m%d%H%M%S'') }}'\n" -# "start_time: '{{ data_interval_start.strftime(''%Y%m%d%H%M%S'') }}'\n", -# "--no-version-check", -# "--project-dir", -# "my/dir", -# ] -# -# -# @patch("cosmos.operators.kubernetes.DbtKubernetesBaseOperator.build_kube_args") -# @patch("cosmos.operators.aws_eks.EksHook.generate_config_file") -# def test_dbt_kubernetes_operator_execute(mock_generate_config_file, mock_build_kube_args, mock_kubernetes_execute): -# """Tests that the execute method call results in both the build_kube_args method and the kubernetes execute method being called.""" -# operator = DbtLSAwsEksOperator( -# conn_id="my_airflow_connection", -# cluster_name="my-cluster", -# task_id="my-task", -# image="my_image", -# project_dir="my/dir", -# ) -# operator.execute(context={}) -# # Assert that the build_kube_args method was called in the execution -# mock_build_kube_args.assert_called_once() -# -# # Assert that the generate_config_file method was called in the execution to create the kubeconfig for eks -# mock_generate_config_file.assert_called_once_with(eks_cluster_name="my-cluster", pod_namespace="default") -# -# # Assert that the kubernetes execute method was called in the execution -# mock_kubernetes_execute.assert_called_once() -# assert mock_kubernetes_execute.call_args.args[-1] == {} -# -# -# def test_provided_config_file_fails(): -# """Tests that the constructor fails if it is called with a config_file.""" -# with pytest.raises(AirflowException) as err_context: -# DbtLSAwsEksOperator( -# conn_id="my_airflow_connection", -# cluster_name="my-cluster", -# task_id="my-task", -# image="my_image", -# project_dir="my/dir", -# config_file="my/config", -# ) -# assert "The config_file is not an allowed parameter for the EksPodOperator." in str(err_context.value) +from unittest.mock import MagicMock, patch + +import pytest +from airflow.exceptions import AirflowException + +from cosmos.operators.aws_eks import ( + DbtBuildAwsEksOperator, + DbtLSAwsEksOperator, + DbtRunAwsEksOperator, + DbtSeedAwsEksOperator, + DbtTestAwsEksOperator, +) + + +@pytest.fixture() +def mock_kubernetes_execute(): + with patch("cosmos.operators.kubernetes.KubernetesPodOperator.execute") as mock_execute: + yield mock_execute + + +base_kwargs = { + "conn_id": "my_airflow_connection", + "cluster_name": "my-cluster", + "task_id": "my-task", + "image": "my_image", + "project_dir": "my/dir", + "vars": { + "start_time": "{{ data_interval_start.strftime('%Y%m%d%H%M%S') }}", + "end_time": "{{ data_interval_end.strftime('%Y%m%d%H%M%S') }}", + }, + "no_version_check": True, +} + + +def test_dbt_kubernetes_build_command(): + """ + Since we know that the KubernetesOperator is tested, we can just test that the + command is built correctly and added to the "arguments" parameter. + """ + + result_map = { + "ls": DbtLSAwsEksOperator(**base_kwargs), + "run": DbtRunAwsEksOperator(**base_kwargs), + "test": DbtTestAwsEksOperator(**base_kwargs), + "build": DbtBuildAwsEksOperator(**base_kwargs), + "seed": DbtSeedAwsEksOperator(**base_kwargs), + } + + for command_name, command_operator in result_map.items(): + command_operator.build_kube_args(context=MagicMock(), cmd_flags=MagicMock()) + assert command_operator.arguments == [ + "dbt", + command_name, + "--vars", + "end_time: '{{ data_interval_end.strftime(''%Y%m%d%H%M%S'') }}'\n" + "start_time: '{{ data_interval_start.strftime(''%Y%m%d%H%M%S'') }}'\n", + "--no-version-check", + "--project-dir", + "my/dir", + ] + + +@patch("cosmos.operators.kubernetes.DbtKubernetesBaseOperator.build_kube_args") +@patch("cosmos.operators.aws_eks.EksHook.generate_config_file") +def test_dbt_kubernetes_operator_execute(mock_generate_config_file, mock_build_kube_args, mock_kubernetes_execute): + """Tests that the execute method call results in both the build_kube_args method and the kubernetes execute method being called.""" + operator = DbtLSAwsEksOperator( + conn_id="my_airflow_connection", + cluster_name="my-cluster", + task_id="my-task", + image="my_image", + project_dir="my/dir", + ) + operator.execute(context={}) + # Assert that the build_kube_args method was called in the execution + mock_build_kube_args.assert_called_once() + + # Assert that the generate_config_file method was called in the execution to create the kubeconfig for eks + mock_generate_config_file.assert_called_once_with(eks_cluster_name="my-cluster", pod_namespace="default") + + # Assert that the kubernetes execute method was called in the execution + mock_kubernetes_execute.assert_called_once() + assert mock_kubernetes_execute.call_args.args[-1] == {} + + +def test_provided_config_file_fails(): + """Tests that the constructor fails if it is called with a config_file.""" + with pytest.raises(AirflowException) as err_context: + DbtLSAwsEksOperator( + conn_id="my_airflow_connection", + cluster_name="my-cluster", + task_id="my-task", + image="my_image", + project_dir="my/dir", + config_file="my/config", + ) + assert "The config_file is not an allowed parameter for the EksPodOperator." in str(err_context.value) From 5401e6cc0bf254db34b2e15142a255e8eb3db0ed Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Thu, 29 Aug 2024 03:56:11 +0530 Subject: [PATCH 10/12] Install with constraints --- scripts/test/pre-install-airflow.sh | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/scripts/test/pre-install-airflow.sh b/scripts/test/pre-install-airflow.sh index 929d9f51aa..29ec01038a 100755 --- a/scripts/test/pre-install-airflow.sh +++ b/scripts/test/pre-install-airflow.sh @@ -22,13 +22,15 @@ mv /tmp/constraint.txt.tmp /tmp/constraint.txt pip install uv uv pip install "apache-airflow==$AIRFLOW_VERSION" --constraint /tmp/constraint.txt uv pip install pydantic --constraint /tmp/constraint.txt -rm /tmp/constraint.txt + if [ "$AIRFLOW_VERSION" = "2.7" ]; then - uv pip install "apache-airflow-providers-amazon[s3fs]==8.27.0" - uv pip install "apache-airflow-providers-cncf-kubernetes==8.3.4" - uv pip install "apache-airflow-providers-docker==3.12.3" - uv pip install "apache-airflow-providers-google==10.21.0" - uv pip install "apache-airflow-providers-microsoft-azure==10.3.0" - uv pip install "apache-airflow-providers-postgres==5.11.3" + uv pip install "apache-airflow-providers-amazon[s3fs]" --constraint /tmp/constraint.txt + uv pip install "apache-airflow-providers-cncf-kubernetes" --constraint /tmp/constraint.txt + uv pip install "apache-airflow-providers-docker" --constraint /tmp/constraint.txt + uv pip install "apache-airflow-providers-google" --constraint /tmp/constraint.txt + uv pip install "apache-airflow-providers-microsoft-azure" --constraint /tmp/constraint.txt + uv pip install "apache-airflow-providers-postgres" --constraint /tmp/constraint.txt fi + +rm /tmp/constraint.txt From f059ab238f2baaef6403773d45161f4ea44750ce Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Thu, 29 Aug 2024 17:51:43 +0530 Subject: [PATCH 11/12] Exclude Python 3.8 & 3.9 with Apache Airflow 2.7 in unit & integration test jobs --- .github/workflows/test.yml | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 45e376a9e1..932855ea41 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -59,6 +59,12 @@ jobs: airflow-version: "2.7" - python-version: "3.12" airflow-version: "2.8" + # It's observed that the dependencies resolution for Apache Airflow versions 2.7 are error-ring out with deep + # resolutions. This is a temporary exclusion until the issue is resolved. + - python-version: "3.8" + airflow-version: "2.7" + - python-version: "3.9" + airflow-version: "2.7" steps: - uses: actions/checkout@v3 with: @@ -104,6 +110,12 @@ jobs: airflow-version: "2.4" - python-version: "3.11" airflow-version: "2.5" + # It's observed that the dependencies resolution for Apache Airflow versions 2.7 are error-ring out with deep + # resolutions. This is a temporary exclusion until the issue is resolved. + - python-version: "3.8" + airflow-version: "2.7" + - python-version: "3.9" + airflow-version: "2.7" services: postgres: image: postgres From 5af98d7b4d4bda3434f00f7c683ec000a4646e39 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Thu, 29 Aug 2024 20:41:18 +0530 Subject: [PATCH 12/12] Update .github/workflows/test.yml --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 932855ea41..846872bff2 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -2,7 +2,7 @@ name: test on: push: # Run on pushes to the default branch - branches: [main,fix-ci] + branches: [main] pull_request_target: # Also run on pull requests originated from forks branches: [main]