From e2fcfa06e36cd44494af327317e5759237e95865 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Wed, 30 Jul 2025 12:09:39 +0530 Subject: [PATCH 01/10] Try fixing dbt fusion tests --- .github/workflows/test.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index b1b6b88468..16139bbcc5 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -2,7 +2,7 @@ name: test on: push: # Run on pushes to the default branch - branches: [main] + branches: [main,fix-dbt-fusion] # Also run on pull requests originating from forks. Although this is insecure by default, we need it to run # integration tests on forked PRs. As a guardrail, we’ve added an Authorize step to each job, which requires manually # approving the workflow run for each pushed commit. Approval only happens after a careful code review of the changes. @@ -489,7 +489,7 @@ jobs: SNOWFLAKE_ACCOUNT: ${{ secrets.SNOWFLAKE_ACCOUNT }} SNOWFLAKE_USER: ${{ secrets.SNOWFLAKE_USER }} SNOWFLAKE_PASSWORD: ${{ secrets.SNOWFLAKE_PASSWORD }} - SNOWFLAKE_SCHEMA: ${{ secrets.SNOWFLAKE_SCHEMA }} + SNOWFLAKE_SCHEMA: COSMOS_${{ github.event.pull_request.number || github.run_id }}_Py${{ matrix.python-version }}_AF${{ matrix.airflow-version }}_DBT${{ matrix.dbt-version }} SNOWFLAKE_WAREHOUSE: ${{ secrets.SNOWFLAKE_WAREHOUSE }} SNOWFLAKE_DATABASE: ${{ secrets.SNOWFLAKE_DATABASE }} From 3fc7e1e13bd42e8e343991ae5307d09a1c6f8249 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Wed, 30 Jul 2025 13:45:25 +0530 Subject: [PATCH 02/10] Use macro to set resource prefix --- .github/workflows/test.yml | 10 +++++++++- .../dbt/jaffle_shop/macros/generate_alias_name.sql | 8 ++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) create mode 100644 dev/dags/dbt/jaffle_shop/macros/generate_alias_name.sql diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 16139bbcc5..fa7050238e 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -477,6 +477,13 @@ jobs: uv pip install --system hatch hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }} run pip freeze + - name: Set RESOURCE_PREFIX without periods + run: | + PYTHON_VER=$(echo "${{ matrix.python-version }}" | tr -d '.') + AIRFLOW_VER=$(echo "${{ matrix.airflow-version }}" | tr -d '.') + DBT_VER=$(echo "${{ matrix.dbt-version }}" | tr -d '.') + echo "RESOURCE_PREFIX=COSMOS_${{ github.run_id }}_Py${PYTHON_VER}_AF${AIRFLOW_VER}_DBT${DBT_VER}" >> $GITHUB_ENV + - name: Test Cosmos against Airflow ${{ matrix.airflow-version }}, Python ${{ matrix.python-version }} and dbt ${{ matrix.dbt-version }} run: | hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }}:test-integration-dbtf-setup @@ -489,7 +496,8 @@ jobs: SNOWFLAKE_ACCOUNT: ${{ secrets.SNOWFLAKE_ACCOUNT }} SNOWFLAKE_USER: ${{ secrets.SNOWFLAKE_USER }} SNOWFLAKE_PASSWORD: ${{ secrets.SNOWFLAKE_PASSWORD }} - SNOWFLAKE_SCHEMA: COSMOS_${{ github.event.pull_request.number || github.run_id }}_Py${{ matrix.python-version }}_AF${{ matrix.airflow-version }}_DBT${{ matrix.dbt-version }} + SNOWFLAKE_SCHEMA: ${{ secrets.SNOWFLAKE_SCHEMA }} + RESOURCE_PREFIX: ${{ env.RESOURCE_PREFIX }} SNOWFLAKE_WAREHOUSE: ${{ secrets.SNOWFLAKE_WAREHOUSE }} SNOWFLAKE_DATABASE: ${{ secrets.SNOWFLAKE_DATABASE }} diff --git a/dev/dags/dbt/jaffle_shop/macros/generate_alias_name.sql b/dev/dags/dbt/jaffle_shop/macros/generate_alias_name.sql new file mode 100644 index 0000000000..6e482be95e --- /dev/null +++ b/dev/dags/dbt/jaffle_shop/macros/generate_alias_name.sql @@ -0,0 +1,8 @@ +{% macro generate_alias_name(custom_alias_name=None, node=None) -%} + {%- set base = custom_alias_name if custom_alias_name else node.name -%} + {%- if env_var('RESOURCE_PREFIX', '') -%} + {{ return(env_var('RESOURCE_PREFIX') ~ '_' ~ base) }} + {%- else -%} + {{ base }} + {%- endif -%} +{%- endmacro %} From febe9b735baf351d1cff95a62a0d83a2a8ea4071 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Wed, 30 Jul 2025 14:34:42 +0530 Subject: [PATCH 03/10] Correct hashdir post changes in project --- tests/dbt/test_graph.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 4b125bf471..38f993cfb6 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -1908,9 +1908,9 @@ def test_save_dbt_ls_cache(mock_variable_set, mock_datetime, tmp_dbt_project_dir assert hash_args == "d41d8cd98f00b204e9800998ecf8427e" if sys.platform == "darwin": # We faced inconsistent hashing versions depending on the version of MacOS/Linux - the following line aims to address these. - assert hash_dir in ("c2c47529eaec412281bdb243a479b734", "fa93914ecb491cc40a17ab956397359a") + assert hash_dir in ("c2c47529eaec412281bdb243a479b734", "71bbf303ad4e06a7b1e2be20e0b73c0d") else: - assert hash_dir == "fa93914ecb491cc40a17ab956397359a" + assert hash_dir == "71bbf303ad4e06a7b1e2be20e0b73c0d" @pytest.mark.integration From 485e467e71375a167be55dd3c52938aa5dbae64e Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Wed, 30 Jul 2025 16:39:38 +0530 Subject: [PATCH 04/10] Cleanup snowflake resources created by the CI job --- .github/workflows/test.yml | 18 +++++++- scripts/ci_dbtf_delete_snowflake_resources.py | 45 +++++++++++++++++++ 2 files changed, 62 insertions(+), 1 deletion(-) create mode 100644 scripts/ci_dbtf_delete_snowflake_resources.py diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index fa7050238e..9f521d1c8d 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -497,9 +497,9 @@ jobs: SNOWFLAKE_USER: ${{ secrets.SNOWFLAKE_USER }} SNOWFLAKE_PASSWORD: ${{ secrets.SNOWFLAKE_PASSWORD }} SNOWFLAKE_SCHEMA: ${{ secrets.SNOWFLAKE_SCHEMA }} - RESOURCE_PREFIX: ${{ env.RESOURCE_PREFIX }} SNOWFLAKE_WAREHOUSE: ${{ secrets.SNOWFLAKE_WAREHOUSE }} SNOWFLAKE_DATABASE: ${{ secrets.SNOWFLAKE_DATABASE }} + RESOURCE_PREFIX: ${{ env.RESOURCE_PREFIX }} - name: Upload coverage to Github uses: actions/upload-artifact@v4 @@ -508,6 +508,22 @@ jobs: path: .coverage include-hidden-files: true + - name: Clean up Snowflake resources + if: always() + run: | + pip install snowflake-connector-python + # Trigger a python script to delete the resources + python scripts/ci_dbtf_delete_snowflake_resources.py + env: + SNOWFLAKE_ACCOUNT: ${{ secrets.SNOWFLAKE_ACCOUNT }} + SNOWFLAKE_USER: ${{ secrets.SNOWFLAKE_USER }} + SNOWFLAKE_PASSWORD: ${{ secrets.SNOWFLAKE_PASSWORD }} + SNOWFLAKE_SCHEMA: ${{ secrets.SNOWFLAKE_SCHEMA }} + SNOWFLAKE_WAREHOUSE: ${{ secrets.SNOWFLAKE_WAREHOUSE }} + SNOWFLAKE_DATABASE: ${{ secrets.SNOWFLAKE_DATABASE }} + RESOURCE_PREFIX: ${{ env.RESOURCE_PREFIX }} + + Run-Performance-Tests: needs: Authorize runs-on: ubuntu-latest diff --git a/scripts/ci_dbtf_delete_snowflake_resources.py b/scripts/ci_dbtf_delete_snowflake_resources.py new file mode 100644 index 0000000000..2535925a36 --- /dev/null +++ b/scripts/ci_dbtf_delete_snowflake_resources.py @@ -0,0 +1,45 @@ +import os + +import snowflake.connector + + +def delete_snowflake_resource(): + """ + Delete Snowflake resources with a given prefix(set as an environment variable). + """ + conn = snowflake.connector.connect( + user=os.environ["SNOWFLAKE_USER"], + password=os.environ["SNOWFLAKE_PASSWORD"], + account=os.environ["SNOWFLAKE_ACCOUNT"], + warehouse=os.environ["SNOWFLAKE_WAREHOUSE"], + database=os.environ["SNOWFLAKE_DATABASE"], + schema=os.environ["SNOWFLAKE_SCHEMA"], + ) + prefix = os.getenv("RESOURCE_PREFIX", "") + if prefix: + cursor = conn.cursor() + cursor.execute( + f""" + SELECT table_name, table_type + FROM information_schema.tables + WHERE table_schema = '{os.environ['SNOWFLAKE_SCHEMA']}' + AND table_name LIKE '{prefix}_%' + """ + ) + + resources = cursor.fetchall() + + for resource_name, resource_type in resources: + if resource_type == "BASE TABLE": + cursor.execute(f"DROP TABLE IF EXISTS {resource_name}") + elif resource_type == "VIEW": + cursor.execute(f"DROP VIEW IF EXISTS {resource_name}") + cursor.close() + print(f"Deleted {len(resources)} resources") + else: + print("No resources to delete") + conn.close() + + +if __name__ == "__main__": + delete_snowflake_resource() From f9de8381a7c92c9a4d387e41514f7be3add51123 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Wed, 30 Jul 2025 16:50:07 +0530 Subject: [PATCH 05/10] Print query --- .github/workflows/test.yml | 2 +- scripts/ci_dbtf_delete_snowflake_resources.py | 17 ++++++++--------- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 9f521d1c8d..bcf2587df7 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -482,7 +482,7 @@ jobs: PYTHON_VER=$(echo "${{ matrix.python-version }}" | tr -d '.') AIRFLOW_VER=$(echo "${{ matrix.airflow-version }}" | tr -d '.') DBT_VER=$(echo "${{ matrix.dbt-version }}" | tr -d '.') - echo "RESOURCE_PREFIX=COSMOS_${{ github.run_id }}_Py${PYTHON_VER}_AF${AIRFLOW_VER}_DBT${DBT_VER}" >> $GITHUB_ENV + echo "RESOURCE_PREFIX=COSMOS_${{ github.run_id }}_PY${PYTHON_VER}_AF${AIRFLOW_VER}_DBT${DBT_VER}" >> $GITHUB_ENV - name: Test Cosmos against Airflow ${{ matrix.airflow-version }}, Python ${{ matrix.python-version }} and dbt ${{ matrix.dbt-version }} run: | diff --git a/scripts/ci_dbtf_delete_snowflake_resources.py b/scripts/ci_dbtf_delete_snowflake_resources.py index 2535925a36..3f2d982116 100644 --- a/scripts/ci_dbtf_delete_snowflake_resources.py +++ b/scripts/ci_dbtf_delete_snowflake_resources.py @@ -18,15 +18,14 @@ def delete_snowflake_resource(): prefix = os.getenv("RESOURCE_PREFIX", "") if prefix: cursor = conn.cursor() - cursor.execute( - f""" - SELECT table_name, table_type - FROM information_schema.tables - WHERE table_schema = '{os.environ['SNOWFLAKE_SCHEMA']}' - AND table_name LIKE '{prefix}_%' - """ - ) - + query = f""" + SELECT table_name, table_type + FROM information_schema.tables + WHERE table_schema = '{os.environ['SNOWFLAKE_SCHEMA']}' + AND table_name LIKE '{prefix}_%' + """ + print(query) + cursor.execute(query) resources = cursor.fetchall() for resource_name, resource_type in resources: From 3557548366ca614bbcff54643d80fd6346d99078 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Wed, 30 Jul 2025 17:03:43 +0530 Subject: [PATCH 06/10] Use safer approach than for using resource prefix --- .github/workflows/test.yml | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index bcf2587df7..f32f66471f 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -478,11 +478,13 @@ jobs: hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }} run pip freeze - name: Set RESOURCE_PREFIX without periods + id: set-resource-prefix run: | PYTHON_VER=$(echo "${{ matrix.python-version }}" | tr -d '.') AIRFLOW_VER=$(echo "${{ matrix.airflow-version }}" | tr -d '.') DBT_VER=$(echo "${{ matrix.dbt-version }}" | tr -d '.') - echo "RESOURCE_PREFIX=COSMOS_${{ github.run_id }}_PY${PYTHON_VER}_AF${AIRFLOW_VER}_DBT${DBT_VER}" >> $GITHUB_ENV + PREFIX="COSMOS_${{ github.run_id }}_PY${PYTHON_VER}_AF${AIRFLOW_VER}_DBT${DBT_VER}" + echo "prefix=${PREFIX}" >> $GITHUB_OUTPUT - name: Test Cosmos against Airflow ${{ matrix.airflow-version }}, Python ${{ matrix.python-version }} and dbt ${{ matrix.dbt-version }} run: | @@ -499,7 +501,7 @@ jobs: SNOWFLAKE_SCHEMA: ${{ secrets.SNOWFLAKE_SCHEMA }} SNOWFLAKE_WAREHOUSE: ${{ secrets.SNOWFLAKE_WAREHOUSE }} SNOWFLAKE_DATABASE: ${{ secrets.SNOWFLAKE_DATABASE }} - RESOURCE_PREFIX: ${{ env.RESOURCE_PREFIX }} + RESOURCE_PREFIX: ${{ steps.set-resource-prefix.outputs.prefix }} - name: Upload coverage to Github uses: actions/upload-artifact@v4 @@ -521,7 +523,7 @@ jobs: SNOWFLAKE_SCHEMA: ${{ secrets.SNOWFLAKE_SCHEMA }} SNOWFLAKE_WAREHOUSE: ${{ secrets.SNOWFLAKE_WAREHOUSE }} SNOWFLAKE_DATABASE: ${{ secrets.SNOWFLAKE_DATABASE }} - RESOURCE_PREFIX: ${{ env.RESOURCE_PREFIX }} + RESOURCE_PREFIX: ${{ steps.set-resource-prefix.outputs.prefix }} Run-Performance-Tests: From b6666d2234547fb4f8de23d24aa75942fd746641 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Wed, 30 Jul 2025 18:01:21 +0530 Subject: [PATCH 07/10] Remove SQL vulnerability - apply Copilot suggestion Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- scripts/ci_dbtf_delete_snowflake_resources.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/scripts/ci_dbtf_delete_snowflake_resources.py b/scripts/ci_dbtf_delete_snowflake_resources.py index 3f2d982116..7d852b19dc 100644 --- a/scripts/ci_dbtf_delete_snowflake_resources.py +++ b/scripts/ci_dbtf_delete_snowflake_resources.py @@ -18,21 +18,20 @@ def delete_snowflake_resource(): prefix = os.getenv("RESOURCE_PREFIX", "") if prefix: cursor = conn.cursor() - query = f""" + query = """ SELECT table_name, table_type FROM information_schema.tables - WHERE table_schema = '{os.environ['SNOWFLAKE_SCHEMA']}' - AND table_name LIKE '{prefix}_%' + WHERE table_schema = %s + AND table_name LIKE %s """ - print(query) - cursor.execute(query) + cursor.execute(query, (os.environ['SNOWFLAKE_SCHEMA'], f"{prefix}_%")) resources = cursor.fetchall() for resource_name, resource_type in resources: if resource_type == "BASE TABLE": - cursor.execute(f"DROP TABLE IF EXISTS {resource_name}") + cursor.execute("DROP TABLE IF EXISTS %s", (resource_name,)) elif resource_type == "VIEW": - cursor.execute(f"DROP VIEW IF EXISTS {resource_name}") + cursor.execute("DROP VIEW IF EXISTS %s", (resource_name,)) cursor.close() print(f"Deleted {len(resources)} resources") else: From 51b5d12ad91b1c16a753faee97b5e9d237b73034 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 30 Jul 2025 12:31:36 +0000 Subject: [PATCH 08/10] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20f?= =?UTF-8?q?ormat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- scripts/ci_dbtf_delete_snowflake_resources.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/ci_dbtf_delete_snowflake_resources.py b/scripts/ci_dbtf_delete_snowflake_resources.py index 7d852b19dc..0375c158af 100644 --- a/scripts/ci_dbtf_delete_snowflake_resources.py +++ b/scripts/ci_dbtf_delete_snowflake_resources.py @@ -24,7 +24,7 @@ def delete_snowflake_resource(): WHERE table_schema = %s AND table_name LIKE %s """ - cursor.execute(query, (os.environ['SNOWFLAKE_SCHEMA'], f"{prefix}_%")) + cursor.execute(query, (os.environ["SNOWFLAKE_SCHEMA"], f"{prefix}_%")) resources = cursor.fetchall() for resource_name, resource_type in resources: From 4bda381795a62743cf3d4961f0c15d0e0a302436 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Wed, 30 Jul 2025 18:15:28 +0530 Subject: [PATCH 09/10] Fix failure post application of GitHub copilot's suggestiona round SQL Injection vulnerability --- scripts/ci_dbtf_delete_snowflake_resources.py | 28 +++++++++++++++---- 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/scripts/ci_dbtf_delete_snowflake_resources.py b/scripts/ci_dbtf_delete_snowflake_resources.py index 0375c158af..8c86eb05b0 100644 --- a/scripts/ci_dbtf_delete_snowflake_resources.py +++ b/scripts/ci_dbtf_delete_snowflake_resources.py @@ -1,4 +1,5 @@ import os +import re import snowflake.connector @@ -18,6 +19,7 @@ def delete_snowflake_resource(): prefix = os.getenv("RESOURCE_PREFIX", "") if prefix: cursor = conn.cursor() + # Use parameterized query for the SELECT (safe for values) query = """ SELECT table_name, table_type FROM information_schema.tables @@ -28,14 +30,28 @@ def delete_snowflake_resource(): resources = cursor.fetchall() for resource_name, resource_type in resources: - if resource_type == "BASE TABLE": - cursor.execute("DROP TABLE IF EXISTS %s", (resource_name,)) - elif resource_type == "VIEW": - cursor.execute("DROP VIEW IF EXISTS %s", (resource_name,)) + # Validate table name contains only safe characters (prevent injection) + if not re.match(r"^[A-Z0-9_]+$", resource_name): + print(f"Skipping potentially unsafe table name: {resource_name}") + continue + + try: + if resource_type == "BASE TABLE": + # Table names must be part of SQL string, not parameterized + drop_sql = f"DROP TABLE IF EXISTS {resource_name}" + print(f"Executing: {drop_sql}") + cursor.execute(drop_sql) + elif resource_type == "VIEW": + drop_sql = f"DROP VIEW IF EXISTS {resource_name}" + print(f"Executing: {drop_sql}") + cursor.execute(drop_sql) + except Exception as e: + print(f"Failed to drop {resource_name}: {e}") + cursor.close() - print(f"Deleted {len(resources)} resources") + print(f"Processed {len(resources)} resources") else: - print("No resources to delete") + print("No RESOURCE_PREFIX set, skipping cleanup") conn.close() From 624e5f5382fcfed3faf71b386e069769650dfe86 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Mon, 4 Aug 2025 17:19:06 +0100 Subject: [PATCH 10/10] Update .github/workflows/test.yml Co-authored-by: Pankaj Koti --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index f32f66471f..106396ca7d 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -2,7 +2,7 @@ name: test on: push: # Run on pushes to the default branch - branches: [main,fix-dbt-fusion] + branches: [main] # Also run on pull requests originating from forks. Although this is insecure by default, we need it to run # integration tests on forked PRs. As a guardrail, we’ve added an Authorize step to each job, which requires manually # approving the workflow run for each pushed commit. Approval only happens after a careful code review of the changes.