Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,6 @@ repos:
ci:
autofix_commit_msg: 🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
autoupdate_commit_msg: ⬆ [pre-commit.ci] pre-commit autoupdate
skip:
- mypy # build of https://github.com/pre-commit/mirrors-mypy:types-PyYAML,types-attrs,attrs,types-requests,
#types-python-dateutil,apache-airflow@v1.5.0 for python@python3 exceeds tier max size 250MiB: 262.6MiB
2 changes: 0 additions & 2 deletions cosmos/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from cosmos.airflow.dag import DbtDag
from cosmos.airflow.task_group import DbtTaskGroup
from cosmos.constants import LoadMode, TestBehavior, ExecutionMode
from cosmos.dataset import get_dbt_dataset
from cosmos.operators.lazy_load import MissingPackage
from cosmos.config import (
ProjectConfig,
Expand Down Expand Up @@ -96,7 +95,6 @@
"DbtTestLocalOperator",
"DbtDepsLocalOperator",
"DbtSnapshotLocalOperator",
"get_dbt_dataset",
"DbtDag",
"DbtTaskGroup",
"DbtLSDockerOperator",
Expand Down
12 changes: 0 additions & 12 deletions cosmos/airflow/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from cosmos.constants import DbtResourceType, TestBehavior, ExecutionMode
from cosmos.core.airflow import get_airflow_task as create_airflow_task
from cosmos.core.graph.entities import Task as TaskMetadata
from cosmos.dataset import get_dbt_dataset
from cosmos.dbt.graph import DbtNode
from cosmos.log import get_logger
from airflow.models import BaseOperator
Expand Down Expand Up @@ -124,10 +123,8 @@ def build_airflow_graph(
task_args: dict[str, Any], # Cosmos/DBT - used to instantiate tasks
test_behavior: TestBehavior, # Cosmos-specific: how to inject tests to Airflow DAG
dbt_project_name: str, # DBT / Cosmos - used to name test task if mode is after_all,
conn_id: str, # Cosmos, dataset URI
task_group: TaskGroup | None = None,
on_warning_callback: Callable[..., Any] | None = None, # argument specific to the DBT test command
emit_datasets: bool = True, # Cosmos
) -> None:
"""
Instantiate dbt `nodes` as Airflow tasks within the given `task_group` (optional) or `dag` (mandatory).
Expand All @@ -142,22 +139,16 @@ def build_airflow_graph(
a single test task will be added after the Cosmos leaf tasks, and it is named using `dbt_project_name`.
Finally, if the `test_behavior` is `after_each`, a test will be added after each model.

If `emit_datasets` is True, tasks will create outlets using:
* `dbt_project_name`
* `conn_id`

:param nodes: Dictionary mapping dbt nodes (node.unique_id to node)
:param dag: Airflow DAG instance
:param execution_mode: Where Cosmos should run each dbt task (e.g. ExecutionMode.LOCAL, ExecutionMode.KUBERNETES).
Default is ExecutionMode.LOCAL.
:param task_args: Arguments to be used to instantiate an Airflow Task
:param test_behavior: When to run `dbt` tests. Default is TestBehavior.AFTER_EACH, which runs tests after each model.
:param dbt_project_name: Name of the dbt pipeline of interest
:param conn_id: Airflow connection ID
:param task_group: Airflow Task Group instance
:param on_warning_callback: A callback function called on warnings with additional Context variables “test_names”
and “test_results” of type List.
:param emit_datasets: Decides if Cosmos should add outlets to model classes or not.
"""
tasks_map = {}
task_or_group: TaskGroup | BaseOperator
Expand All @@ -167,8 +158,6 @@ def build_airflow_graph(
# If test_behavior == "after_each", each model task will be bundled with a test task, using a TaskGroup
for node_id, node in nodes.items():
task_meta = create_task_metadata(node=node, execution_mode=execution_mode, args=task_args)
if emit_datasets:
task_args["outlets"] = [get_dbt_dataset(conn_id, dbt_project_name, node.name)]
if task_meta and node.resource_type != DbtResourceType.TEST:
if node.resource_type == DbtResourceType.MODEL and test_behavior == TestBehavior.AFTER_EACH:
with TaskGroup(dag=dag, group_id=node.name, parent_group=task_group) as model_task_group:
Expand All @@ -190,7 +179,6 @@ def build_airflow_graph(
# If test_behavior == "after_all", there will be one test task, run at the "end" of the DAG
# The end of a DAG is defined by the DAG leaf tasks (tasks which do not have downstream tasks)
if test_behavior == TestBehavior.AFTER_ALL:
task_args.pop("outlets", None)
test_meta = create_test_task_metadata(
f"{dbt_project_name}_test", execution_mode, task_args=task_args, on_warning_callback=on_warning_callback
)
Expand Down
6 changes: 3 additions & 3 deletions cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@

from __future__ import annotations

import shutil
import contextlib
import tempfile
from dataclasses import dataclass, field
from pathlib import Path
from typing import Iterator

from cosmos.constants import TestBehavior, ExecutionMode, LoadMode
from cosmos.dbt.executable import get_system_dbt
from cosmos.exceptions import CosmosValueError
from cosmos.log import get_logger
from cosmos.profiles import BaseProfileMapping
Expand Down Expand Up @@ -180,9 +180,9 @@ class ExecutionConfig:
Contains configuration about how to execute dbt.

:param execution_mode: The execution mode for dbt. Defaults to local
:param dbt_executable_path: The path to the dbt executable. Defaults to dbt-ol or dbt if
:param dbt_executable_path: The path to the dbt executable. Defaults to dbt if
available on the path.
"""

execution_mode: ExecutionMode = ExecutionMode.LOCAL
dbt_executable_path: str | Path = shutil.which("dbt-ol") or shutil.which("dbt") or "dbt"
dbt_executable_path: str | Path = get_system_dbt()
2 changes: 2 additions & 0 deletions cosmos/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
DBT_TARGET_PATH_ENVVAR = "DBT_TARGET_PATH"
DBT_TARGET_DIR_NAME = "target"
DBT_LOG_FILENAME = "dbt.log"
DBT_BINARY_NAME = "dbt"
OPENLINEAGE_PRODUCER = "https://github.com/astronomer/astronomer-cosmos/"


class LoadMode(Enum):
Expand Down
7 changes: 1 addition & 6 deletions cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,6 @@ def __init__(
manifest_path = project_config.parsed_manifest_path
dbt_executable_path = execution_config.dbt_executable_path

conn_id = "unknown"
if profile_config and profile_config.profile_mapping:
conn_id = profile_config.profile_mapping.conn_id

profile_args = {}
if profile_config.profile_mapping:
profile_args = profile_config.profile_mapping.profile_args
Expand Down Expand Up @@ -155,6 +151,7 @@ def __init__(
# the following args may be only needed for local / venv:
"project_dir": dbt_project.dir,
"profile_config": profile_config,
"emit_datasets": emit_datasets,
}

if dbt_executable_path:
Expand All @@ -170,7 +167,5 @@ def __init__(
task_args=task_args,
test_behavior=test_behavior,
dbt_project_name=dbt_project.name,
conn_id=conn_id,
on_warning_callback=on_warning_callback,
emit_datasets=emit_datasets,
)
29 changes: 0 additions & 29 deletions cosmos/dataset.py

This file was deleted.

2 changes: 1 addition & 1 deletion cosmos/dbt/executable.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ def get_system_dbt() -> str:
"""
Try to locate the dbt executable on the system path; return "dbt" if it cannot be found.
"""
return shutil.which("dbt-ol") or shutil.which("dbt") or "dbt"
return shutil.which("dbt") or "dbt"
16 changes: 4 additions & 12 deletions cosmos/operators/base.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
from __future__ import annotations

import os
import shutil
from typing import Any, Sequence, Tuple

import yaml
from airflow.models.baseoperator import BaseOperator
from airflow.utils.context import Context
from airflow.utils.operator_helpers import context_to_airflow_vars

from cosmos.dbt.executable import get_system_dbt
from cosmos.log import get_logger


Expand Down Expand Up @@ -99,7 +99,7 @@ def __init__(
output_encoding: str = "utf-8",
skip_exit_code: int = 99,
cancel_query_on_kill: bool = True,
dbt_executable_path: str = "dbt",
dbt_executable_path: str = get_system_dbt(),
dbt_cmd_flags: list[str] | None = None,
dbt_cmd_global_flags: list[str] | None = None,
**kwargs: Any,
Expand All @@ -124,15 +124,7 @@ def __init__(
self.output_encoding = output_encoding
self.skip_exit_code = skip_exit_code
self.cancel_query_on_kill = cancel_query_on_kill
# dbt-ol is the OpenLineage wrapper for dbt, which automatically
# generates and emits lineage data to a specified backend.
dbt_ol_path = shutil.which("dbt-ol")
if dbt_executable_path == "dbt" and shutil.which("dbt-ol"):
self.dbt_executable_path = dbt_ol_path
elif dbt_executable_path == "dbt":
self.dbt_executable_path = shutil.which("dbt")
else:
self.dbt_executable_path = dbt_executable_path
self.dbt_executable_path = dbt_executable_path
self.dbt_cmd_flags = dbt_cmd_flags
self.dbt_cmd_global_flags = dbt_cmd_global_flags or []
super().__init__(**kwargs)
Expand Down Expand Up @@ -210,7 +202,7 @@ def build_cmd(
self,
context: Context,
cmd_flags: list[str] | None = None,
) -> Tuple[list[str | None], dict[str, str | bytes | os.PathLike[Any]]]:
) -> Tuple[list[str], dict[str, str | bytes | os.PathLike[Any]]]:
dbt_cmd = [self.dbt_executable_path]

dbt_cmd.extend(self.dbt_cmd_global_flags)
Expand Down
Loading