diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index d97d52b378..0dc20ef3ac 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -503,6 +503,7 @@ jobs: SNOWFLAKE_SCHEMA: ${{ secrets.SNOWFLAKE_SCHEMA }} SNOWFLAKE_WAREHOUSE: ${{ secrets.SNOWFLAKE_WAREHOUSE }} SNOWFLAKE_DATABASE: ${{ secrets.SNOWFLAKE_DATABASE }} + AIRFLOW_CONN_GCP_GS_CONN: ${{ secrets.AIRFLOW_CONN_GCP_GS_CONN }} RESOURCE_PREFIX: ${{ steps.set-resource-prefix.outputs.prefix }} - name: Upload coverage to Github diff --git a/cosmos/profiles/base.py b/cosmos/profiles/base.py index 74fd3c3791..3de33d9119 100755 --- a/cosmos/profiles/base.py +++ b/cosmos/profiles/base.py @@ -232,6 +232,10 @@ def env_vars(self) -> dict[str, str]: env_vars[env_var_name] = str(value) + if self.disable_event_tracking: + env_vars["DBT_SEND_ANONYMOUS_USAGE_STATS"] = "False" + env_vars["DO_NOT_TRACK"] = "1" + return env_vars def get_profile_file_contents( @@ -258,10 +262,7 @@ def get_profile_file_contents( } if self.dbt_config_vars: - profile_contents["config"] = self.dbt_config_vars.as_dict() - - if self.disable_event_tracking: - profile_contents["config"] = {"send_anonymous_usage_stats": False} + profile_contents[profile_name]["config"] = self.dbt_config_vars.as_dict() return str(yaml.dump(profile_contents, indent=4)) diff --git a/dev/dags/dbt/jaffle_shop/profiles.yml b/dev/dags/dbt/jaffle_shop/profiles.yml index 4915fbd4e8..7e97cda01a 100644 --- a/dev/dags/dbt/jaffle_shop/profiles.yml +++ b/dev/dags/dbt/jaffle_shop/profiles.yml @@ -22,6 +22,8 @@ snowflake_profile: schema: "{{ env_var('SNOWFLAKE_SCHEMA') }}" warehouse: "{{ env_var('SNOWFLAKE_WAREHOUSE') }}" database: "{{ env_var('SNOWFLAKE_DATABASE') }}" + config: + send_anonymous_usage_stats: false postgres_profile: target: dev diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 2c987beaf7..95e95557ab 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -2171,9 +2171,9 @@ def test_save_dbt_ls_cache(mock_variable_set, mock_datetime, tmp_dbt_project_dir assert hash_args == "d41d8cd98f00b204e9800998ecf8427e" if sys.platform == "darwin": # We faced inconsistent hashing versions depending on the version of MacOS/Linux - the following line aims to address these. - assert hash_dir in ("71afaf84962c855b0b67caf59c808521",) + assert hash_dir in ("74c478329e90725557d095879030b5e8",) else: - assert hash_dir == "85cba4ef17dd7c161938da6980a6ff85" + assert hash_dir == "633a523f295ef0cd496525428815537b" @patch("cosmos.dbt.graph.datetime") @@ -2211,9 +2211,9 @@ def test_save_yaml_selectors_cache(mock_variable_set, mock_datetime, tmp_dbt_pro if sys.platform == "darwin": # We faced inconsistent hashing versions depending on the version of MacOS/Linux - the following line aims to address these. - assert hash_dir in ("71afaf84962c855b0b67caf59c808521",) + assert hash_dir in ("74c478329e90725557d095879030b5e8",) else: - assert hash_dir == "85cba4ef17dd7c161938da6980a6ff85" + assert hash_dir == "633a523f295ef0cd496525428815537b" @pytest.mark.integration diff --git a/tests/profiles/test_base_profile.py b/tests/profiles/test_base_profile.py index 330a219b4f..70fbfa9cf5 100644 --- a/tests/profiles/test_base_profile.py +++ b/tests/profiles/test_base_profile.py @@ -51,9 +51,10 @@ def test_disable_event_tracking(disable_event_tracking: bool): ) profile_contents = yaml.safe_load(test_profile.get_profile_file_contents(profile_name="fake-profile-name")) - assert ("config" in profile_contents) == disable_event_tracking + assert "config" not in profile_contents if disable_event_tracking: - assert profile_contents["config"]["send_anonymous_usage_stats"] is False + assert test_profile.env_vars["DO_NOT_TRACK"] == "1" + assert test_profile.env_vars["DBT_SEND_ANONYMOUS_USAGE_STATS"] == "False" def test_disable_event_tracking_and_send_anonymous_usage_stats(): @@ -107,7 +108,7 @@ def test_dbt_config_vars_config(config: bool): ) profile_contents = yaml.safe_load(test_profile.get_profile_file_contents(profile_name="fake-profile-name")) - assert ("config" in profile_contents) == config + assert ("config" in profile_contents["fake-profile-name"]) == config @pytest.mark.parametrize("dbt_config_var,dbt_config_value", [("debug", True), ("debug", False)]) @@ -122,8 +123,8 @@ def test_validate_dbt_config_vars(dbt_config_var: str, dbt_config_value: Any): ) profile_contents = yaml.safe_load(test_profile.get_profile_file_contents(profile_name="fake-profile-name")) - assert "config" in profile_contents - assert profile_contents["config"][dbt_config_var] == dbt_config_value + assert "config" in profile_contents["fake-profile-name"] + assert profile_contents["fake-profile-name"]["config"][dbt_config_var] == dbt_config_value @pytest.mark.parametrize( diff --git a/tests/test_dbtf.py b/tests/test_dbtf.py index 7b0aa867ee..1c3d24dc1f 100644 --- a/tests/test_dbtf.py +++ b/tests/test_dbtf.py @@ -4,7 +4,9 @@ import pytest from airflow.utils.state import DagRunState -from cosmos import DbtDag, ExecutionConfig, ProfileConfig, ProjectConfig +from cosmos import DbtDag, ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig +from cosmos.constants import InvocationMode +from cosmos.profiles import GoogleCloudServiceAccountDictProfileMapping DBT_FUSION_BINARY = Path.home() / ".local/bin/dbt" DBT_PROJECT_PATH = Path(__file__).parent.parent / "dev/dags/dbt/jaffle_shop" @@ -14,18 +16,30 @@ dbt_project_path=DBT_PROJECT_PATH, ) -profile_config = ProfileConfig( +snowflake_profile_config = ProfileConfig( profile_name="snowflake_profile", target_name="dev", profiles_yml_filepath=DBT_PROFILES_YAML_FILEPATH, ) -execution_config = ExecutionConfig(dbt_executable_path=DBT_FUSION_BINARY) +bigquery_profile_config = ProfileConfig( + profile_name="default", + target_name="dev", + profile_mapping=GoogleCloudServiceAccountDictProfileMapping( + conn_id="gcp_gs_conn", + profile_args={"dataset": "release_17", "project": "astronomer-dag-authoring"}, + disable_event_tracking=True, + ), +) + +render_config = RenderConfig(dbt_executable_path=DBT_FUSION_BINARY, invocation_mode=InvocationMode.SUBPROCESS) + +execution_config = ExecutionConfig(dbt_executable_path=DBT_FUSION_BINARY, invocation_mode=InvocationMode.SUBPROCESS) @pytest.mark.integration @pytest.mark.dbtfusion -def test_dbt_dag_with_dbt_fusion(): +def test_dbt_snowflake_dag_with_dbt_fusion(): """ Run a DbtDag using dbt Fusion. Confirm it succeeds and has the expected amount of both: @@ -36,7 +50,8 @@ def test_dbt_dag_with_dbt_fusion(): snowflake_dag = DbtDag( execution_config=execution_config, project_config=project_config, - profile_config=profile_config, + profile_config=snowflake_profile_config, + render_config=render_config, start_date=datetime(2023, 1, 1), dag_id="snowflake_dbt_fusion_dag", tags=["profiles"], @@ -64,3 +79,47 @@ def test_dbt_dag_with_dbt_fusion(): "orders.test", ] assert tasks_names == expected_task_names + + +@pytest.mark.integration +@pytest.mark.dbtfusion +def test_dbt_bigquery_dag_with_dbt_fusion(): + """ + Run a DbtDag using dbt Fusion. + Confirm it succeeds and has the expected amount of both: + - dbt resources + - Airflow tasks + And that the tasks are in the expected topological order. + """ + bigquery_dag = DbtDag( + execution_config=execution_config, + project_config=project_config, + profile_config=bigquery_profile_config, + render_config=render_config, + start_date=datetime(2023, 1, 1), + dag_id="bigquery_dbt_fusion_dag", + tags=["profiles"], + ) + outcome = bigquery_dag.test() + assert outcome.state == DagRunState.SUCCESS + + assert len(bigquery_dag.dbt_graph.filtered_nodes) == 23 + + assert len(bigquery_dag.task_dict) == 13 + tasks_names = [task.task_id for task in bigquery_dag.topological_sort()] + expected_task_names = [ + "raw_customers_seed", + "raw_orders_seed", + "raw_payments_seed", + "stg_customers.run", + "stg_customers.test", + "stg_orders.run", + "stg_orders.test", + "stg_payments.run", + "stg_payments.test", + "customers.run", + "customers.test", + "orders.run", + "orders.test", + ] + assert tasks_names == expected_task_names