Closed
38 commits
42b0de1
Release 1.9.0
pankajkoti Feb 14, 2025
c5a49cd
Update Changelog to include PR 1521
pankajkoti Feb 14, 2025
3e6dd5d
Update CHANGELOG.rst
pankajkoti Feb 19, 2025
696dcbf
Update Changelog to include PR 1544, 1545
pankajkoti Feb 19, 2025
e8fc621
Release 1.9.0 (#1539)
pankajkoti Feb 20, 2025
f5d874f
Fix import error in dbt bigquery adapter mock for `dbt-bigquery<1.8` …
pankajkoti Feb 21, 2025
6205971
⬆ [pre-commit.ci] pre-commit autoupdate (#1560)
pre-commit-ci[bot] Feb 25, 2025
70142eb
Improve MWAA getting-started docs by removing unused imports (#1562)
jx2lee Feb 26, 2025
922deeb
Disable `example_cosmos_dbt_build.py` DAG in CI (#1567)
pankajastro Feb 27, 2025
9c1c3ac
Upgrade GitHub Actions Ubuntu version (#1561)
tatiana Feb 27, 2025
167f23b
Bugfix `operator_args` override configuration (#1558)
ghjklw Feb 27, 2025
9ed6502
Bugfix ProjectConfig install_dbt_deps (#1556)
ghjklw Feb 28, 2025
9b18724
Fix dbt project parsing `dbt_vars` behavior (#1543)
AlexandrKhabarov Mar 3, 2025
490821b
⬆ [pre-commit.ci] pre-commit autoupdate (#1583)
pre-commit-ci[bot] Mar 3, 2025
ad2d2ba
Update the GH bug issue template (#1586)
pankajastro Mar 4, 2025
970604f
Avoid reading the connection during DAG parsing of the async BigQuery…
joppevos Mar 4, 2025
708217a
Enable DAG example_cosmos_dbt_build.py in CI (#1573)
pankajastro Mar 4, 2025
ac92243
Fix: Workaround to incorrectly raised `gcsfs.retry.HttpError` (Invali…
tatiana Mar 11, 2025
4c4221a
⬆ [pre-commit.ci] pre-commit autoupdate (#1596)
pre-commit-ci[bot] Mar 11, 2025
a9b8dfa
Fix the async execution mode read sql files for dbt packages (#1588)
pankajastro Mar 12, 2025
42bb17d
Improve BQ async error handling (#1597)
tatiana Mar 13, 2025
dfb92c8
Fix path selector when `manifest.json` was created in MS Windows (#1601)
tatiana Mar 13, 2025
c2491ad
Fix log that prints 'Total filtered nodes' (#1603)
tatiana Mar 13, 2025
5719b94
Fix select behaviour using `LoadMode.MANIFEST` and a path with star (…
tatiana Mar 13, 2025
4bc447b
Run async in CI DAG without setup/teardown task (#1599)
pankajastro Mar 13, 2025
6f7dabf
Add test case that fully covers recent select issue (#1604)
tatiana Mar 13, 2025
68cf910
Add CI job to test multiple dbt versions for the async DAG (#1535)
pankajkoti Mar 13, 2025
0eb96eb
Update Changelog
pankajkoti Mar 13, 2025
cabdf2d
Support `on_warning_callback` with `TestBehavior.BUILD` and `Executio…
corsettigyg Feb 28, 2025
ff824f8
Include PR #1571
pankajkoti Mar 13, 2025
da29296
Improve unit tests speed from 89s to 14s (#1600)
tatiana Mar 13, 2025
c78b37d
Fix `DbtRunLocalOperator.partial()` support (#1609)
tatiana Mar 13, 2025
41875a7
Include PR #1600, PR #1609
pankajkoti Mar 13, 2025
b164c39
fix: container_name is null for ecs integration (#1592)
nicor88 Mar 13, 2025
a7d71cc
Include PR #1592
pankajkoti Mar 13, 2025
343643b
Release 1.9.1a4
pankajkoti Mar 13, 2025
75aca26
Release 1.9.1
tatiana Mar 13, 2025
06bdcec
Update CHANGELOG.rst
tatiana Mar 13, 2025
3 changes: 3 additions & 0 deletions .github/ISSUE_TEMPLATE/01-bug.yml
Expand Up @@ -52,9 +52,12 @@ body:
label: ExecutionMode
description: Which ExecutionMode are you using?
options:
- "AIRFLOW_ASYNC"
- "AWS_ECS"
- "AWS_EKS"
- "AZURE_CONTAINER_INSTANCE"
- "DOCKER"
- "GCP_CLOUD_RUN_JOB"
- "KUBERNETES"
- "LOCAL"
- "VIRTUALENV"
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/docs.yml
Expand Up @@ -7,7 +7,7 @@ on:

jobs:
pages:
runs-on: ubuntu-20.04
runs-on: ubuntu-latest
environment:
name: github-pages
url: ${{ steps.deployment.outputs.page_url }}
Expand Down
128 changes: 96 additions & 32 deletions .github/workflows/test.yml

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Expand Up @@ -57,7 +57,7 @@ repos:
- --py37-plus
- --keep-runtime-typing
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.9.6
rev: v0.9.10
hooks:
- id: ruff
args:
Expand Down
61 changes: 59 additions & 2 deletions CHANGELOG.rst
@@ -1,7 +1,44 @@
Changelog
=========

1.9.0a5 (2025-02-03)
1.9.1 (2025-03-13)
--------------------

Bug Fixes

* Fix import error in dbt bigquery adapter mock for ``dbt-bigquery<1.8`` for ``ExecutionMode.AIRFLOW_ASYNC`` by @pankajkoti in #1548
* Fix ``operator_args`` override configuration by @ghjklw in #1558
* Fix missing ``install_dbt_deps`` in ``ProjectConfig`` ``__init__`` method by @ghjklw in #1556
* Fix dbt project parsing ``dbt_vars`` behavior passed via ``operator_args`` by @AlexandrKhabarov in #1543
* Avoid reading the connection during DAG parsing of the async BigQuery operator by @joppevos in #1582
* Fix: Workaround to incorrectly raised ``gcsfs.retry.HttpError`` (Invalid Credentials, 401) by @tatiana in #1598
* Fix the async execution mode read sql files for dbt packages by @pankajastro in #1588
* Improve BQ async error handling by @tatiana in #1597
* Fix path selector when ``manifest.json`` is created using MS Windows by @tatiana in #1601
* Fix log that prints 'Total filtered nodes' by @tatiana in #1603
* Fix select behaviour using ``LoadMode.MANIFEST`` and a path with star by @tatiana in #1602
* Support ``on_warning_callback`` with ``TestBehavior.BUILD`` and ``ExecutionMode.LOCAL`` by @corsettigyg in #1571
* Fix ``DbtRunLocalOperator.partial()`` support by @tatiana @ashb in #1609
* fix: ``container_name`` is null for ecs integration by @nicor88 in #1592

Docs

* Improve MWAA getting-started docs by removing unused imports by @jx2lee in #1562

Others

* Disable ``example_cosmos_dbt_build.py`` DAG in CI by @pankajastro in #1567
* Upgrade GitHub Actions Ubuntu version by @tatiana in #1561
* Update GitHub bug issue template by @pankajastro in #1586
* Enable DAG ``example_cosmos_dbt_build.py`` in CI by @pankajastro in #1573
* Run async DAG in CI without setup/teardown task by @pankajastro in #1599
* Add test case that fully covers recent select issue by @tatiana in #1604
* Add CI job to test multiple dbt versions for the async DAG by @pankajkoti in #1535
* Improve unit tests speed from 89s to 14s by @tatiana in #1600
* Pre-commit updates: #1560, #1583, #1596


1.9.0 (2025-02-19)
--------------------

Breaking changes
Expand All @@ -19,23 +56,43 @@ Features
* Add structure to support multiple db for async operator execution by @pankajastro in #1483
* Support overriding the ``profile_config`` per dbt node or folder using config by @tatiana in #1492. More information `here <https://astronomer.github.io/astronomer-cosmos/profiles/#profile-customise-per-node>`_.
* Create and run accurate SQL statements when using ``ExecutionMode.AIRFLOW_ASYNC`` by @pankajkoti, @tatiana and @pankajastro in #1474
* Add AWS ECS task run execution mode by @CarlosGitto and @aoelvp94 in #1507
* Add support for running ``DbtSourceOperator`` individually by @victormacaubas in #1510
* Add setup task for async executions by @pankajastro in #1518
* Add teardown task for async executions by @pankajastro in #1529
* Add ``ProjectConfig.install_dbt_deps`` & change operator ``install_deps=True`` as default by @tatiana in #1521
* Extend Virtualenv operator and mock dbt adapters for setup & teardown tasks in ``ExecutionMode.AIRFLOW_ASYNC`` by @pankajkoti, @tatiana and @pankajastro in #1544

Bug Fixes

* Fix select complex intersection of three tag-based graph selectors by @tatiana in #1466
* Fix custom selector behaviour when the model name contains periods by @yakovlevvs and @60098727 in #1499
* Filter dbt and non-dbt kwargs correctly for async operator by @pankajastro in #1526

Enhancement

* Fix OpenLineage deprecation warning by @CorsettiS in #1449
* Move ``DbtRunner`` related functions into ``dbt/runner.py`` module by @tatiana in #1480
* Add ``on_warning_callback`` to ``DbtSourceKubernetesOperator`` and refactor previous operators by @LuigiCerone in #1501
* Gracefully error when users set incompatible ``RenderConfig.dbt_deps`` and ``operator_args`` ``install_deps`` by @tatiana in #1505
* Store compiled SQL as template field for ``ExecutionMode.AIRFLOW_ASYNC`` by @pankajkoti in #1534

Docs

* Improve ``RenderConfig`` arguments documentation by @tatiana in #1514
* Improve callback documentation by @tatiana in #1516
* Improve partial parsing docs by @tatiana in #1520
* Fix typo in selecting & excluding docs by @pankajastro in #1523
* Document ``async_py_requirements`` added in ``ExecutionConfig`` for ``ExecutionMode.AIRFLOW_ASYNC`` by @pankajkoti in #1545

Others

* Ignore dbt package tests when running Cosmos tests by @tatiana in #1502
* Refactor to consolidate async dbt adapter code by @pankajkoti in #1509
* Log elapsed time for sql file(s) upload/download by @pankajastro in #1536
* Remove the fallback operator for async task by @pankajastro in #1538
* GitHub Actions Dependabot: #1487
* Pre-commit updates: #1473, #1493
* Pre-commit updates: #1473, #1493, #1503, #1531


1.8.2 (2025-01-15)
Expand Down
2 changes: 1 addition & 1 deletion cosmos/__init__.py
Expand Up @@ -6,7 +6,7 @@
Contains dags, task groups, and operators.
"""

__version__ = "1.9.0a6"
__version__ = "1.9.1"


from cosmos.airflow.dag import DbtDag
Expand Down
2 changes: 2 additions & 0 deletions cosmos/airflow/graph.py
Expand Up @@ -267,9 +267,11 @@ def create_task_metadata(
extra_context: dict[str, Any] = {
"dbt_node_config": node.context_dict,
"dbt_dag_task_group_identifier": dbt_dag_task_group_identifier,
"package_name": node.package_name,
}

if test_behavior == TestBehavior.BUILD and node.resource_type in SUPPORTED_BUILD_RESOURCES:
args["on_warning_callback"] = on_warning_callback
exclude_detached_tests_if_needed(node, args, detached_from_parent)
task_id, args = _get_task_id_and_args(
node, args, use_task_group, normalize_task_id, "build", include_resource_type=True
Expand Down
3 changes: 3 additions & 0 deletions cosmos/config.py
Expand Up @@ -152,6 +152,7 @@ class ProjectConfig:
:param snapshots_relative_path: The relative path to the dbt snapshots directory within the project. Defaults to
snapshots
:param manifest_path: The absolute path to the dbt manifest file. Defaults to None
:param manifest_conn_id: Name of the Airflow connection used to access the manifest file if it is not stored locally. Defaults to None
:param project_name: Allows the user to define the project name.
Required if dbt_project_path is not defined. Defaults to the folder name of dbt_project_path.
:param env_vars: Dictionary of environment variables that are used for both rendering and execution. Rendering with
Expand All @@ -175,6 +176,7 @@ class ProjectConfig:
def __init__(
self,
dbt_project_path: str | Path | None = None,
install_dbt_deps: bool = True,
models_relative_path: str | Path = "models",
seeds_relative_path: str | Path = "seeds",
snapshots_relative_path: str | Path = "snapshots",
Expand Down Expand Up @@ -228,6 +230,7 @@ def __init__(
self.env_vars = env_vars
self.dbt_vars = dbt_vars
self.partial_parse = partial_parse
self.install_dbt_deps = install_dbt_deps

def validate_project(self) -> None:
"""
Expand Down
2 changes: 1 addition & 1 deletion cosmos/converter.py
Expand Up @@ -223,7 +223,7 @@ def override_configuration(
if execution_config.invocation_mode:
operator_args["invocation_mode"] = execution_config.invocation_mode

if execution_config in (ExecutionMode.LOCAL, ExecutionMode.VIRTUALENV):
if execution_config.execution_mode in (ExecutionMode.LOCAL, ExecutionMode.VIRTUALENV):
if "install_deps" not in operator_args:
operator_args["install_deps"] = project_config.install_dbt_deps

Expand Down
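The one-line change to `override_configuration` can be illustrated with a minimal sketch. The classes below are simplified stand-ins written for this example, not the actual Cosmos `ExecutionMode`, `ExecutionConfig`, or `ProjectConfig`. It shows why comparing the config object itself against enum members never matches, so the `install_deps` default was not propagated before the fix.

```python
from dataclasses import dataclass
from enum import Enum


class ExecutionMode(Enum):
    # Stand-in enum with only the members needed for this sketch.
    LOCAL = "local"
    VIRTUALENV = "virtualenv"
    KUBERNETES = "kubernetes"


@dataclass
class ExecutionConfig:
    # Stand-in config object that holds the selected mode.
    execution_mode: ExecutionMode = ExecutionMode.LOCAL


@dataclass
class ProjectConfig:
    # Stand-in carrying only the flag relevant here.
    install_dbt_deps: bool = True


def override_before(execution_config, project_config, operator_args):
    # Old check: a config object is never equal to an enum member,
    # so this branch is never taken.
    if execution_config in (ExecutionMode.LOCAL, ExecutionMode.VIRTUALENV):
        if "install_deps" not in operator_args:
            operator_args["install_deps"] = project_config.install_dbt_deps
    return operator_args


def override_after(execution_config, project_config, operator_args):
    # New check: compare the mode stored on the config object.
    if execution_config.execution_mode in (ExecutionMode.LOCAL, ExecutionMode.VIRTUALENV):
        if "install_deps" not in operator_args:
            operator_args["install_deps"] = project_config.install_dbt_deps
    return operator_args


project = ProjectConfig(install_dbt_deps=False)
print(override_before(ExecutionConfig(), project, {}))
print(override_after(ExecutionConfig(), project, {}))
```

In this sketch an explicit `install_deps` entry in `operator_args` still takes precedence, which mirrors the `not in operator_args` guard in the diff.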
24 changes: 19 additions & 5 deletions cosmos/dbt/graph.py
Expand Up @@ -46,6 +46,13 @@
logger = get_logger(__name__)


def _normalize_path(path: str) -> str:
"""
Converts a potentially Windows path string into a Posix-friendly path.
"""
return Path(path.replace("\\", "/")).as_posix()


class CosmosLoadDbtException(Exception):
"""
Exception raised while trying to load a `dbt` project as a `DbtGraph` instance.
Expand All @@ -64,6 +71,7 @@ class DbtNode:
resource_type: DbtResourceType
depends_on: list[str]
file_path: Path
package_name: str | None = None
tags: list[str] = field(default_factory=lambda: [])
config: dict[str, Any] = field(default_factory=lambda: {})
has_freshness: bool = False
Expand Down Expand Up @@ -279,12 +287,17 @@ def parse_dbt_ls_output(project_path: Path | None, ls_stdout: str) -> dict[str,
except json.decoder.JSONDecodeError:
logger.debug("Skipped dbt ls line: %s", line)
else:
base_path = (
project_path.parent / node_dict["package_name"] if node_dict.get("package_name") else project_path # type: ignore
)

try:
node = DbtNode(
unique_id=node_dict["unique_id"],
package_name=node_dict.get("package_name"),
resource_type=DbtResourceType(node_dict["resource_type"]),
depends_on=node_dict.get("depends_on", {}).get("nodes", []),
file_path=project_path / node_dict["original_file_path"],
file_path=base_path / node_dict["original_file_path"],
tags=node_dict.get("tags", []),
config=node_dict.get("config", {}),
has_freshness=(
Expand Down Expand Up @@ -376,8 +389,8 @@ def _add_vars_arg(self, cmd_args: list[str]) -> None:
"""
Change args list in-place so they include dbt vars, if they are set.
"""
if self.project.dbt_vars:
cmd_args.extend(["--vars", json.dumps(self.project.dbt_vars, sort_keys=True)])
if self.dbt_vars:
cmd_args.extend(["--vars", json.dumps(self.dbt_vars, sort_keys=True)])

@cached_property
def dbt_ls_args(self) -> list[str]:
Expand Down Expand Up @@ -530,7 +543,7 @@ def load(
self.update_node_dependency()

logger.info("Total nodes: %i", len(self.nodes))
logger.info("Total filtered nodes: %i", len(self.nodes))
logger.info("Total filtered nodes: %i", len(self.filtered_nodes))

def run_dbt_ls(
self, dbt_cmd: str, project_path: Path, tmp_dir: Path, env_vars: dict[str, str]
Expand Down Expand Up @@ -821,9 +834,10 @@ def load_from_dbt_manifest(self) -> None:
for unique_id, node_dict in resources.items():
node = DbtNode(
unique_id=unique_id,
package_name=node_dict.get("package_name"),
resource_type=DbtResourceType(node_dict["resource_type"]),
depends_on=node_dict.get("depends_on", {}).get("nodes", []),
file_path=self.execution_config.project_path / Path(node_dict["original_file_path"]),
file_path=self.execution_config.project_path / _normalize_path(node_dict["original_file_path"]),
tags=node_dict["tags"],
config=node_dict["config"],
has_freshness=(
Expand Down
4 changes: 2 additions & 2 deletions cosmos/dbt/selector.py
Expand Up @@ -173,7 +173,7 @@ def filter_nodes(self, nodes: dict[str, DbtNode]) -> set[str]:
# Index nodes by name, we can improve performance by doing this once
# for multiple GraphSelectors
if PATH_SELECTOR in self.node_name:
path_selection = self.node_name[len(PATH_SELECTOR) :]
path_selection = self.node_name[len(PATH_SELECTOR) :].rstrip("*")
root_nodes.update({node_id for node_id, node in nodes.items() if path_selection in str(node.file_path)})

elif TAG_SELECTOR in self.node_name:
Expand Down Expand Up @@ -366,7 +366,7 @@ def _parse_tag_selector(self, item: str) -> None:
def _parse_path_selector(self, item: str) -> None:
index = len(PATH_SELECTOR)
if self.project_dir:
self.paths.append(self.project_dir / Path(item[index:]))
self.paths.append(self.project_dir / Path(item[index:].rstrip("*")))
else:
self.paths.append(Path(item[index:]))

Expand Down
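The effect of stripping a trailing star from a path selector can be sketched as follows. This is a simplified stand-in for the substring match in `filter_nodes`, not the Cosmos implementation: `select_by_path` is a hypothetical helper, the value of `PATH_SELECTOR` is assumed to be `"path:"`, and the node ids and file paths are made up.

```python
from __future__ import annotations

from pathlib import PurePosixPath

PATH_SELECTOR = "path:"  # assumed value of the constant used in the diff


def select_by_path(selector: str, file_paths: dict[str, PurePosixPath]) -> set[str]:
    # Drop the "path:" prefix and any trailing "*" so that
    # "path:models/staging/*" matches files under models/staging/.
    path_selection = selector[len(PATH_SELECTOR):].rstrip("*")
    return {
        node_id
        for node_id, file_path in file_paths.items()
        if path_selection in str(file_path)
    }


# Hypothetical nodes keyed by unique_id.
nodes = {
    "model.jaffle_shop.stg_orders": PurePosixPath("/dbt/jaffle_shop/models/staging/stg_orders.sql"),
    "model.jaffle_shop.orders": PurePosixPath("/dbt/jaffle_shop/models/marts/orders.sql"),
}

selected = select_by_path("path:models/staging/*", nodes)
print(selected)
```

Without the `rstrip("*")`, the literal string `models/staging/*` would be searched for inside each file path and nothing would match.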
7 changes: 3 additions & 4 deletions cosmos/io.py
Expand Up @@ -8,7 +8,6 @@
from cosmos import settings
from cosmos.constants import DEFAULT_TARGET_PATH, FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP
from cosmos.exceptions import CosmosValueError
from cosmos.settings import remote_target_path, remote_target_path_conn_id


def upload_to_aws_s3(
Expand Down Expand Up @@ -136,14 +135,14 @@ def _configure_remote_target_path() -> tuple[Path, str] | tuple[None, None]:
"""Configure the remote target path if it is provided."""
from airflow.version import version as airflow_version

if not remote_target_path:
if not settings.remote_target_path:
return None, None

_configured_target_path = None

target_path_str = str(remote_target_path)
target_path_str = str(settings.remote_target_path)

remote_conn_id = remote_target_path_conn_id
remote_conn_id = settings.remote_target_path_conn_id
if not remote_conn_id:
target_path_schema = urlparse(target_path_str).scheme
remote_conn_id = FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get(target_path_schema, None) # type: ignore[assignment]
Expand Down
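The diff does not state why `remote_target_path` is now read through the `settings` module instead of being imported by name. One plausible effect, sketched below with a hypothetical in-memory module (`fake_settings` is not part of Cosmos), is that a name imported with `from ... import ...` keeps the value bound at import time, while an attribute lookup on the module sees later changes, for example when a test patches the setting.

```python
import types

# Hypothetical stand-in for a settings module.
settings = types.ModuleType("fake_settings")
settings.remote_target_path = None

# Equivalent of `from fake_settings import remote_target_path`:
# the local name keeps whatever value the module had at import time.
remote_target_path = settings.remote_target_path


def configured_via_copied_name() -> bool:
    # Reads the name that was bound once at import time.
    return bool(remote_target_path)


def configured_via_module_attribute() -> bool:
    # Looks the attribute up on the module every time it is called.
    return bool(settings.remote_target_path)


# Simulate the setting changing later, e.g. a test patching the module attribute.
settings.remote_target_path = "s3://some-bucket/target"

print(configured_via_copied_name())
print(configured_via_module_attribute())
```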
42 changes: 31 additions & 11 deletions cosmos/operators/_asynchronous/bigquery.py
Expand Up @@ -5,7 +5,15 @@
from typing import TYPE_CHECKING, Any, Sequence

import airflow
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

try:
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
except ImportError:
raise ImportError(
"Could not import BigQueryInsertJobOperator. Ensure you've installed the Google Cloud provider separately or "
"with `pip install apache-airflow-providers-google`."
)

from airflow.utils.context import Context
from airflow.utils.session import NEW_SESSION, provide_session
from packaging.version import Version
Expand All @@ -15,7 +23,7 @@
from cosmos.dataset import get_dataset_alias_name
from cosmos.exceptions import CosmosValueError
from cosmos.operators.local import AbstractDbtLocalBase
from cosmos.settings import enable_setup_async_task, remote_target_path, remote_target_path_conn_id
from cosmos.settings import remote_target_path, remote_target_path_conn_id

if TYPE_CHECKING: # pragma: no cover
from sqlalchemy.orm import Session
Expand All @@ -28,7 +36,11 @@ def _mock_bigquery_adapter() -> None:

import agate
from dbt.adapters.bigquery.connections import BigQueryAdapterResponse, BigQueryConnectionManager
from dbt_common.clients.agate_helper import empty_table

try:
from dbt_common.clients.agate_helper import empty_table
except (ModuleNotFoundError, ImportError): # pragma: no cover
from dbt.clients.agate_helper import empty_table

def execute( # type: ignore[no-untyped-def]
self, sql, auto_begin=False, fetch=None, limit: Optional[int] = None
Expand Down Expand Up @@ -69,9 +81,6 @@ def __init__(
self.project_dir = project_dir
self.profile_config = profile_config
self.gcp_conn_id = self.profile_config.profile_mapping.conn_id # type: ignore
profile = self.profile_config.profile_mapping.profile # type: ignore
self.gcp_project = profile["project"]
self.dataset = profile["dataset"]
self.extra_context = extra_context or {}
self.configuration: dict[str, Any] = {}
self.dbt_kwargs = dbt_kwargs or {}
Expand Down Expand Up @@ -99,6 +108,8 @@ def __init__(
self.async_context["profile_type"] = self.profile_config.get_profile_type()
self.async_context["async_operator"] = BigQueryInsertJobOperator
self.compiled_sql = ""
self.gcp_project = ""
self.dataset = ""

@property
def base_cmd(self) -> list[str]:
Expand Down Expand Up @@ -131,7 +142,7 @@ def get_remote_sql(self) -> str:
return sql # type: ignore

def execute(self, context: Context, **kwargs: Any) -> None:
if enable_setup_async_task:
if settings.enable_setup_async_task:
self.configuration = {
"query": {
"query": self.get_remote_sql(),
Expand All @@ -141,20 +152,29 @@ def execute(self, context: Context, **kwargs: Any) -> None:
super().execute(context=context)
else:
self.build_and_run_cmd(context=context, run_as_async=True, async_context=self.async_context)
self._store_compiled_sql(context=context)
self._store_template_fields(context=context)

@provide_session
def _store_compiled_sql(self, context: Context, session: Session = NEW_SESSION) -> None:
def _store_template_fields(self, context: Context, session: Session = NEW_SESSION) -> None:
from airflow.models.renderedtifields import RenderedTaskInstanceFields
from airflow.models.taskinstance import TaskInstance

if not enable_setup_async_task:
if not settings.enable_setup_async_task:
self.log.info("SQL cannot be made available, skipping registration of compiled_sql template field")
return
sql = self.get_remote_sql().strip()
self.log.debug("Executed SQL is: %s", sql)
self.compiled_sql = sql

if self.profile_config.profile_mapping is not None:
profile = self.profile_config.profile_mapping.profile
else:
raise CosmosValueError(
"The `profile_config.profile_mapping` attribute must be defined to use `ExecutionMode.AIRFLOW_ASYNC`"
)
self.gcp_project = profile["project"]
self.dataset = profile["dataset"]

# need to refresh the rendered task field record in the db because Airflow only does this
# before executing the task, not after
ti = context["ti"]
Expand Down Expand Up @@ -184,5 +204,5 @@ def execute_complete(self, context: Context, event: dict[str, Any]) -> Any:
"""
job_id = super().execute_complete(context=context, event=event)
self.log.info("Configuration is %s", str(self.configuration))
self._store_compiled_sql(context=context)
self._store_template_fields(context=context)
return job_id
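The move of the profile lookup out of `__init__` can be sketched as below. The classes are hypothetical stand-ins, not the Cosmos operator or profile mapping; `FakeProfileMapping` only counts how often the profile is read, standing in for a lookup that would need the Airflow connection. The sketch assumes that operator construction happens during DAG parsing and `execute` happens at task run time.

```python
class FakeProfileMapping:
    """Hypothetical stand-in that counts how often the profile is read."""

    def __init__(self) -> None:
        self.reads = 0

    @property
    def profile(self) -> dict:
        # In a real setup this kind of lookup may require reading a connection.
        self.reads += 1
        return {"project": "my-project", "dataset": "my_dataset"}


class EagerOperator:
    """Reads the profile in __init__, i.e. every time the DAG file is parsed."""

    def __init__(self, mapping: FakeProfileMapping) -> None:
        profile = mapping.profile
        self.gcp_project = profile["project"]
        self.dataset = profile["dataset"]


class LazyOperator:
    """Defers the profile read until the task actually runs."""

    def __init__(self, mapping: FakeProfileMapping) -> None:
        self.mapping = mapping
        self.gcp_project = ""
        self.dataset = ""

    def execute(self) -> None:
        profile = self.mapping.profile
        self.gcp_project = profile["project"]
        self.dataset = profile["dataset"]


eager_mapping = FakeProfileMapping()
EagerOperator(eager_mapping)
print(eager_mapping.reads)  # profile already read at construction

lazy_mapping = FakeProfileMapping()
lazy_op = LazyOperator(lazy_mapping)
print(lazy_mapping.reads)  # nothing read yet
lazy_op.execute()
print(lazy_op.gcp_project, lazy_op.dataset)
```

The empty-string placeholders match the diff, which initialises `gcp_project` and `dataset` to `""` and fills them in when the template fields are stored.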