Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 96 additions & 32 deletions .github/workflows/test.yml

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions dev/dags/simple_dag_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
from pathlib import Path

from cosmos import DbtDag, ExecutionConfig, ExecutionMode, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.constants import TestBehavior
from cosmos.profiles import GoogleCloudServiceAccountDictProfileMapping

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))

DBT_ADAPTER_VERSION = os.getenv("DBT_ADAPTER_VERSION", "1.9")

profile_config = ProfileConfig(
profile_name="default",
target_name="dev",
Expand All @@ -26,12 +29,9 @@
profile_config=profile_config,
execution_config=ExecutionConfig(
execution_mode=ExecutionMode.AIRFLOW_ASYNC,
async_py_requirements=["dbt-bigquery"],
),
render_config=RenderConfig(
select=["path:models"],
# test_behavior=TestBehavior.NONE
async_py_requirements=[f"dbt-bigquery=={DBT_ADAPTER_VERSION}"],
),
render_config=RenderConfig(select=["path:models"], test_behavior=TestBehavior.NONE),
# normal dag parameters
schedule_interval=None,
start_date=datetime(2023, 1, 1),
Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ pre-install-commands = ["sh scripts/test/pre-install-airflow.sh {matrix:airflow}
[[tool.hatch.envs.tests.matrix]]
python = ["3.8", "3.9", "3.10", "3.11", "3.12"]
airflow = ["2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10"]
dbt = ["1.5", "1.6", "1.7", "1.8", "1.9"]

[tool.hatch.envs.tests.overrides]
matrix.airflow.dependencies = [
Expand All @@ -176,6 +177,7 @@ test-integration = 'sh scripts/test/integration.sh'
test-kubernetes = "sh scripts/test/integration-kubernetes.sh"
test-kubernetes-setup = "sh scripts/test/kubernetes-setup.sh"
test-integration-dbt-1-5-4 = 'sh scripts/test/integration-dbt-1-5-4.sh'
test-integration-dbt-async = 'sh scripts/test/integration-dbt-async.sh {matrix:dbt}'
test-integration-expensive = 'sh scripts/test/integration-expensive.sh'
test-integration-setup = 'sh scripts/test/integration-setup.sh'
test-performance = 'sh scripts/test/performance.sh'
Expand Down
29 changes: 29 additions & 0 deletions scripts/test/integration-dbt-async.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#!/bin/bash

set -x
set -e

DBT_VERSION="$1"
echo "DBT_VERSION:"
echo "$DBT_VERSION"


pip uninstall dbt-adapters dbt-common dbt-core dbt-extractor dbt-postgres dbt-semantic-interfaces -y
pip install "dbt-postgres==$DBT_VERSION" "dbt-databricks==$DBT_VERSION" "dbt-bigquery==$DBT_VERSION"
export SOURCE_RENDERING_BEHAVIOR=all
rm -rf airflow.*; \
airflow db init; \

if [ "$DBT_VERSION" = "1.7" ]; then
# Otherwise, we will get the following error:
# stderr: MessageToJson() got an unexpected keyword argument 'including_default_value_fields'
echo "DBT version is 1.7 — Installing protobuf==4.25.6..."
pip install protobuf==4.25.6
fi

rm -rf dbt/jaffle_shop/dbt_packages;
pytest -vv \
--cov=cosmos \
--cov-report=term-missing \
--cov-report=xml \
"tests/test_async_example_dag.py::test_example_dag[simple_dag_async]"
1 change: 1 addition & 0 deletions scripts/test/integration.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ pytest -vv \
--cov-report=xml \
-m 'integration' \
--ignore=tests/perf \
--ignore=tests/test_async_example_dag.py \
--ignore=tests/test_example_k8s_dags.py \
-k 'not ( example_cosmos_python_models or example_virtualenv or jaffle_shop_kubernetes)'
1 change: 1 addition & 0 deletions scripts/test/performance.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ pytest -vv \
-s \
-m 'perf' \
--ignore=tests/test_example_dags.py \
--ignore=tests/test_async_example_dag.py \
--ignore=tests/test_example_dags_no_connections.py \
--ignore=tests/test_example_k8s_dags.py
1 change: 1 addition & 0 deletions scripts/test/unit-cov.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ pytest \
-m "not (integration or perf)" \
--ignore=tests/perf \
--ignore=tests/test_example_dags.py \
--ignore=tests/test_async_example_dag.py \
--ignore=tests/test_example_dags_no_connections.py \
--ignore=tests/test_example_k8s_dags.py
1 change: 1 addition & 0 deletions scripts/test/unit.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ pytest \
-m "not (integration or perf)" \
--ignore=tests/perf \
--ignore=tests/test_example_dags.py \
--ignore=tests/test_async_example_dag.py \
--ignore=tests/test_example_dags_no_connections.py \
--ignore=tests/test_example_k8s_dags.py
73 changes: 73 additions & 0 deletions tests/test_async_example_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# We already have tests/test_example_dags.py, but it doesn’t run against multiple dbt versions in CI.
# Some dbt versions have shown parsing issues with certain example DAGs — something we may need to address over time.
# With PR #1535, the goal is to test the async example DAG across multiple dbt versions. To prevent the CI job from
# failing early due to unrelated DAG parsing errors, PR #1535 introduces this new test_async_example_dag.py file.
# This file replicates tests/test_example_dags.py but excludes all DAGs except simple_async_dag by adding them to
# .airflowignore. This ensures the CI job focuses solely on testing simple_async_dag over multiple dbt versions
# without being disrupted by other DAG parsing issues.

from __future__ import annotations
Comment thread
pankajkoti marked this conversation as resolved.

from pathlib import Path

try:
from functools import cache
except ImportError:
from functools import lru_cache as cache


import airflow
import pytest
from airflow.models.dagbag import DagBag
from airflow.utils.db import create_default_connections
from airflow.utils.session import provide_session
from packaging.version import Version

EXAMPLE_DAGS_DIR = Path(__file__).parent.parent / "dev/dags"
ALL_FILES_TO_IGNORE = [
f.name for f in EXAMPLE_DAGS_DIR.iterdir() if f.is_file() and f.suffix == ".py" and f.name != "simple_dag_async.py"
]

AIRFLOW_IGNORE_FILE = EXAMPLE_DAGS_DIR / ".airflowignore"
AIRFLOW_VERSION = Version(airflow.__version__)


@provide_session
def get_session(session=None):
create_default_connections(session)
return session


@pytest.fixture()
def session():
return get_session()


@cache
def get_dag_bag() -> DagBag:
"""Create a DagBag by adding the files that are not supported to .airflowignore"""

with open(AIRFLOW_IGNORE_FILE, "w+") as file:
for dagfile in ALL_FILES_TO_IGNORE:
print(f"Adding {dagfile} to .airflowignore")
file.writelines([f"{dagfile}\n"])

print(".airflowignore contents: ")
print(AIRFLOW_IGNORE_FILE.read_text())
db = DagBag(EXAMPLE_DAGS_DIR, include_examples=False)
assert db.dags
assert not db.import_errors
return db


def get_dag_ids() -> list[str]:
dag_bag = get_dag_bag()
return dag_bag.dag_ids


@pytest.mark.integration
@pytest.mark.parametrize("dag_id", get_dag_ids())
def test_example_dag(session, dag_id: str):
dag_bag = get_dag_bag()
dag = dag_bag.get_dag(dag_id)
dag.test()