Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ class ProjectConfig:
:param dbt_vars: Dictionary of dbt variables for the project. This argument overrides variables defined in your dbt_project.yml
file. The dictionary is dumped to a yaml string and passed to dbt commands as the --vars argument. Variables are only
supported for rendering when using ``RenderConfig.LoadMode.DBT_LS`` and ``RenderConfig.LoadMode.CUSTOM`` load mode.
:param partial_parse: If True, then attempt to use the ``partial_parse.msgpack`` if it exists. This is only used
for the ``LoadMode.DBT_LS`` load mode, and for the ``ExecutionMode.LOCAL`` and ``ExecutionMode.VIRTUALENV``
execution modes.
"""

dbt_project_path: Path | None = None
Expand All @@ -141,6 +144,7 @@ def __init__(
project_name: str | None = None,
env_vars: dict[str, str] | None = None,
dbt_vars: dict[str, str] | None = None,
partial_parse: bool = True,
):
# Since we allow dbt_project_path to be defined in ExecutionConfig and RenderConfig
# dbt_project_path may not always be defined here.
Expand All @@ -166,6 +170,7 @@ def __init__(

self.env_vars = env_vars
self.dbt_vars = dbt_vars
self.partial_parse = partial_parse

def validate_project(self) -> None:
"""
Expand Down Expand Up @@ -292,7 +297,7 @@ class ExecutionConfig:
:param execution_mode: The execution mode for dbt. Defaults to local
:param test_indirect_selection: The mode to configure the test behavior when performing indirect selection.
:param dbt_executable_path: The path to the dbt executable for runtime execution. Defaults to dbt if available on the path.
:param dbt_project_path Configures the DBT project location accessible at runtime for dag execution. This is the project path in a docker container for ExecutionMode.DOCKER or ExecutionMode.KUBERNETES. Mutually Exclusive with ProjectConfig.dbt_project_path
:param dbt_project_path: Configures the DBT project location accessible at runtime for dag execution. This is the project path in a docker container for ExecutionMode.DOCKER or ExecutionMode.KUBERNETES. Mutually Exclusive with ProjectConfig.dbt_project_path
"""

execution_mode: ExecutionMode = ExecutionMode.LOCAL
Expand Down
1 change: 1 addition & 0 deletions cosmos/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
DBT_LOG_DIR_NAME = "logs"
DBT_TARGET_PATH_ENVVAR = "DBT_TARGET_PATH"
DBT_TARGET_DIR_NAME = "target"
DBT_PARTIAL_PARSE_FILE_NAME = "partial_parse.msgpack"
DBT_LOG_FILENAME = "dbt.log"
DBT_BINARY_NAME = "dbt"

Expand Down
1 change: 1 addition & 0 deletions cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ def __init__(
task_args = {
**operator_args,
"project_dir": execution_config.project_path,
"partial_parse": project_config.partial_parse,
"profile_config": profile_config,
"emit_datasets": render_config.emit_datasets,
"env": env_vars,
Expand Down
8 changes: 7 additions & 1 deletion cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
LoadMode,
)
from cosmos.dbt.parser.project import LegacyDbtProject
from cosmos.dbt.project import create_symlinks, environ
from cosmos.dbt.project import create_symlinks, copy_msgpack_for_partial_parse, environ
from cosmos.dbt.selector import select_nodes
from cosmos.log import get_logger

Expand Down Expand Up @@ -204,6 +204,9 @@ def run_dbt_ls(
if self.render_config.selector:
ls_command.extend(["--selector", self.render_config.selector])

if not self.project.partial_parse:
ls_command.append("--no-partial-parse")

ls_command.extend(self.local_flags)

stdout = run_command(ls_command, tmp_dir, env_vars)
Expand Down Expand Up @@ -248,6 +251,9 @@ def load_via_dbt_ls(self) -> None:
tmpdir_path = Path(tmpdir)
create_symlinks(self.render_config.project_path, tmpdir_path, self.render_config.dbt_deps)

if self.project.partial_parse:
copy_msgpack_for_partial_parse(self.render_config.project_path, tmpdir_path)

with self.profile_config.ensure_profile(use_mock_values=True) as profile_values, environ(
self.project.env_vars or self.render_config.env_vars or {}
):
Expand Down
16 changes: 12 additions & 4 deletions cosmos/dbt/project.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
from __future__ import annotations

from pathlib import Path
import shutil
import os
from cosmos.constants import (
DBT_LOG_DIR_NAME,
DBT_TARGET_DIR_NAME,
)
from cosmos.constants import DBT_LOG_DIR_NAME, DBT_TARGET_DIR_NAME, DBT_PARTIAL_PARSE_FILE_NAME
from contextlib import contextmanager
from typing import Generator

Expand All @@ -21,6 +19,16 @@ def create_symlinks(project_path: Path, tmp_dir: Path, ignore_dbt_packages: bool
os.symlink(project_path / child_name, tmp_dir / child_name)


def copy_msgpack_for_partial_parse(project_path: Path, tmp_dir: Path) -> None:
partial_parse_file = Path(project_path) / DBT_TARGET_DIR_NAME / DBT_PARTIAL_PARSE_FILE_NAME

if partial_parse_file.exists():
tmp_target_dir = tmp_dir / DBT_TARGET_DIR_NAME
tmp_target_dir.mkdir(exist_ok=True)

shutil.copy(str(partial_parse_file), str(tmp_target_dir / DBT_PARTIAL_PARSE_FILE_NAME))


@contextmanager
def environ(env_vars: dict[str, str]) -> Generator[None, None, None]:
"""Temporarily set environment variables inside the context manager and restore
Expand Down
6 changes: 6 additions & 0 deletions cosmos/hooks/subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,9 @@ def send_sigterm(self) -> None:
logger.info("Sending SIGTERM signal to process group")
if self.sub_process and hasattr(self.sub_process, "pid"):
os.killpg(os.getpgid(self.sub_process.pid), signal.SIGTERM)

def send_sigint(self) -> None:
"""Sends SIGINT signal to ``self.sub_process`` if one exists."""
logger.info("Sending SIGINT signal to process group")
if self.sub_process and hasattr(self.sub_process, "pid"):
os.killpg(os.getpgid(self.sub_process.pid), signal.SIGINT)
16 changes: 9 additions & 7 deletions cosmos/operators/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ class AbstractDbtBaseOperator(BaseOperator, metaclass=ABCMeta):
:param skip_exit_code: If task exits with this exit code, leave the task
in ``skipped`` state (default: 99). If set to ``None``, any non-zero
exit code will be treated as a failure.
:param partial_parse: If True (default), then the operator will use the
``partial_parse.msgpack`` during execution if it exists. If False, then
a flag will be explicitly set to turn off partial parsing.
:param cancel_query_on_kill: If true, then cancel any running queries when the task's on_kill() is executed.
Otherwise, the query will keep running when the task is killed.
:param dbt_executable_path: Path to dbt executable can be used with venv
Expand All @@ -68,13 +71,7 @@ class AbstractDbtBaseOperator(BaseOperator, metaclass=ABCMeta):
"vars",
"models",
)
global_boolean_flags = (
"no_version_check",
"cache_selected_only",
"fail_fast",
"quiet",
"warn_error",
)
global_boolean_flags = ("no_version_check", "cache_selected_only", "fail_fast", "quiet", "warn_error")

intercept_flag = True

Expand Down Expand Up @@ -105,6 +102,7 @@ def __init__(
append_env: bool = False,
output_encoding: str = "utf-8",
skip_exit_code: int = 99,
partial_parse: bool = True,
cancel_query_on_kill: bool = True,
dbt_executable_path: str = get_system_dbt(),
dbt_cmd_flags: list[str] | None = None,
Expand All @@ -131,6 +129,7 @@ def __init__(
self.append_env = append_env
self.output_encoding = output_encoding
self.skip_exit_code = skip_exit_code
self.partial_parse = partial_parse
self.cancel_query_on_kill = cancel_query_on_kill
self.dbt_executable_path = dbt_executable_path
self.dbt_cmd_flags = dbt_cmd_flags
Expand Down Expand Up @@ -219,6 +218,9 @@ def build_cmd(

dbt_cmd.extend(self.dbt_cmd_global_flags)

if not self.partial_parse:
dbt_cmd.append("--no-partial-parse")

dbt_cmd.extend(self.base_cmd)

if self.indirect_selection:
Expand Down
16 changes: 11 additions & 5 deletions cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,12 @@

from sqlalchemy.orm import Session

from cosmos.constants import DEFAULT_OPENLINEAGE_NAMESPACE, OPENLINEAGE_PRODUCER
from cosmos.constants import (
DEFAULT_OPENLINEAGE_NAMESPACE,
OPENLINEAGE_PRODUCER,
DBT_TARGET_DIR_NAME,
DBT_PARTIAL_PARSE_FILE_NAME,
)
from cosmos.config import ProfileConfig
from cosmos.log import get_logger
from cosmos.operators.base import (
Expand All @@ -52,7 +57,7 @@
FullOutputSubprocessResult,
)
from cosmos.dbt.parser.output import extract_log_issues, parse_output
from cosmos.dbt.project import create_symlinks
from cosmos.dbt.project import create_symlinks, copy_msgpack_for_partial_parse

DBT_NO_TESTS_MSG = "Nothing to do"
DBT_WARN_MSG = "WARN"
Expand Down Expand Up @@ -208,6 +213,9 @@ def run_command(

create_symlinks(Path(self.project_dir), Path(tmp_project_dir), self.install_deps)

if self.partial_parse:
copy_msgpack_for_partial_parse(Path(self.project_dir), Path(tmp_project_dir))

with self.profile_config.ensure_profile() as profile_values:
(profile_path, env_vars) = profile_values
env.update(env_vars)
Expand Down Expand Up @@ -374,9 +382,7 @@ def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None

def on_kill(self) -> None:
if self.cancel_query_on_kill:
self.subprocess_hook.log.info("Sending SIGINT signal to process group")
if self.subprocess_hook.sub_process and hasattr(self.subprocess_hook.sub_process, "pid"):
os.killpg(os.getpgid(self.subprocess_hook.sub_process.pid), signal.SIGINT)
self.subprocess_hook.send_sigint()
else:
self.subprocess_hook.send_sigterm()

Expand Down
3 changes: 3 additions & 0 deletions docs/configuration/project-config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ variables that should be used for rendering and execution. It takes the followin
will only be rendered at execution time, not at render time.
- ``env_vars``: (new in v1.3) A dictionary of environment variables used for rendering and execution. Rendering with
env vars is only supported when using ``RenderConfig.LoadMode.DBT_LS`` load mode.
- ``partial_parse``: (new in v1.4) If True, then attempt to use the ``partial_parse.msgpack`` if it exists. This is only used
for the ``LoadMode.DBT_LS`` load mode, and for the ``ExecutionMode.LOCAL`` and ``ExecutionMode.VIRTUALENV``
execution modes.

Project Config Example
----------------------
Expand Down
4 changes: 4 additions & 0 deletions docs/getting_started/execution-modes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ The ``local`` execution mode assumes a ``dbt`` binary is reachable within the Ai
If ``dbt`` was not installed as part of the Cosmos packages,
users can define a custom path to ``dbt`` by declaring the argument ``dbt_executable_path``.

By default, if Cosmos sees a ``partial_parse.msgpack`` in the target directory of the dbt project directory when using ``local`` execution, it will use this for partial parsing to speed up task execution.
This can be turned off by setting ``partial_parse=False`` in the ``ProjectConfig``.

When using the ``local`` execution mode, Cosmos converts Airflow Connections into a native ``dbt`` profiles file (``profiles.yml``).

Example of how to use, for instance, when ``dbt`` was installed together with Cosmos:
Expand All @@ -76,6 +79,7 @@ The ``virtualenv`` mode isolates the Airflow worker dependencies from ``dbt`` by
In this case, users are responsible for declaring which version of ``dbt`` they want to use by giving the argument ``py_requirements``. This argument can be set directly in operator instances or when instantiating ``DbtDag`` and ``DbtTaskGroup`` as part of ``operator_args``.

Similar to the ``local`` execution mode, Cosmos converts Airflow Connections into a way ``dbt`` understands them by creating a ``dbt`` profile file (``profiles.yml``).
Also similar to the ``local`` execution mode, Cosmos will by default attempt to use a ``partial_parse.msgpack`` if one exists to speed up parsing.

Some drawbacks of this approach:

Expand Down
28 changes: 28 additions & 0 deletions tests/dbt/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -901,6 +901,34 @@ def test_load_via_dbt_ls_render_config_selector_arg_is_used(
assert ls_command[ls_command.index("--selector") + 1] == selector


@patch("cosmos.dbt.graph.Popen")
@patch("cosmos.dbt.graph.DbtGraph.update_node_dependency")
@patch("cosmos.config.RenderConfig.validate_dbt_command")
def test_load_via_dbt_ls_render_config_no_partial_parse(
mock_validate, mock_update_nodes, mock_popen, tmp_dbt_project_dir
):
"""Tests that --no-partial-parse appears when partial_parse=False."""
mock_popen().communicate.return_value = ("", "")
mock_popen().returncode = 0
project_config = ProjectConfig(partial_parse=False)
render_config = RenderConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, load_method=LoadMode.DBT_LS)
profile_config = ProfileConfig(
profile_name="test",
target_name="test",
profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml",
)
execution_config = MagicMock()
dbt_graph = DbtGraph(
project=project_config,
render_config=render_config,
execution_config=execution_config,
profile_config=profile_config,
)
dbt_graph.load_via_dbt_ls()
ls_command = mock_popen.call_args.args[0]
assert "--no-partial-parse" in ls_command


@pytest.mark.parametrize("load_method", [LoadMode.DBT_MANIFEST, LoadMode.CUSTOM])
def test_load_method_with_unsupported_render_config_selector_arg(load_method):
"""Tests that error is raised when RenderConfig.selector is used with LoadMode.DBT_MANIFEST or LoadMode.CUSTOM."""
Expand Down
31 changes: 29 additions & 2 deletions tests/dbt/test_project.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from pathlib import Path
from cosmos.dbt.project import create_symlinks, environ
import os
from pathlib import Path
from unittest.mock import patch

import pytest

from cosmos.dbt.project import create_symlinks, copy_msgpack_for_partial_parse, environ

DBT_PROJECTS_ROOT_DIR = Path(__file__).parent.parent.parent / "dev/dags/dbt"


Expand All @@ -17,6 +20,30 @@ def test_create_symlinks(tmp_path):
assert child.name not in ("logs", "target", "profiles.yml", "dbt_packages")


@pytest.mark.parametrize("exists", [True, False])
def test_copy_manifest_for_partial_parse(tmp_path, exists):
project_path = tmp_path / "project"
target_path = project_path / "target"
partial_parse_file = target_path / "partial_parse.msgpack"

target_path.mkdir(parents=True)

if exists:
partial_parse_file.write_bytes(b"")

tmp_dir = tmp_path / "tmp_dir"
tmp_dir.mkdir()

copy_msgpack_for_partial_parse(project_path, tmp_dir)

tmp_partial_parse_file = tmp_dir / "target" / "partial_parse.msgpack"

if exists:
assert tmp_partial_parse_file.exists()
else:
assert not tmp_partial_parse_file.exists()


@patch.dict(os.environ, {"VAR1": "value1", "VAR2": "value2"})
def test_environ_context_manager():
# Define the expected environment variables
Expand Down
Empty file added tests/hooks/__init__.py
Empty file.
70 changes: 70 additions & 0 deletions tests/hooks/test_subprocess.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from pathlib import Path
from tempfile import TemporaryDirectory
from unittest.mock import MagicMock, patch
import signal

import pytest

from cosmos.hooks.subprocess import FullOutputSubprocessHook

OS_ENV_KEY = "SUBPROCESS_ENV_TEST"
OS_ENV_VAL = "this-is-from-os-environ"


@pytest.mark.parametrize(
"env,expected",
[
({"ABC": "123", "AAA": "456"}, {"ABC": "123", "AAA": "456", OS_ENV_KEY: ""}),
({}, {OS_ENV_KEY: ""}),
(None, {OS_ENV_KEY: OS_ENV_VAL}),
],
ids=["with env", "empty env", "no env"],
)
def test_env(env, expected):
"""
Test that env variables are exported correctly to the command environment.
When ``env`` is ``None``, ``os.environ`` should be passed to ``Popen``.
Otherwise, the variables in ``env`` should be available, and ``os.environ`` should not.
"""
hook = FullOutputSubprocessHook()

def build_cmd(keys, filename):
"""
Produce bash command to echo env vars into filename.
Will always echo the special test var named ``OS_ENV_KEY`` into the file to test whether
``os.environ`` is passed or not.
"""
return "\n".join(f"echo {k}=${k}>> {filename}" for k in [*keys, OS_ENV_KEY])

with TemporaryDirectory() as tmp_dir, patch.dict("os.environ", {OS_ENV_KEY: OS_ENV_VAL}):
tmp_file = Path(tmp_dir, "test.txt")
command = build_cmd(env and env.keys() or [], tmp_file.as_posix())
hook.run_command(command=["bash", "-c", command], env=env)
actual = dict([x.split("=") for x in tmp_file.read_text().splitlines()])
assert actual == expected


def test_subprocess_hook():
hook = FullOutputSubprocessHook()
result = hook.run_command(command=["bash", "-c", f'echo "foo"'])
assert result.exit_code == 0
assert result.output == "foo"
assert result.full_output == ["foo"]


@patch("os.getpgid", return_value=123)
@patch("os.killpg")
def test_send_sigint(mock_killpg, mock_getpgid):
hook = FullOutputSubprocessHook()
hook.sub_process = MagicMock()
hook.send_sigint()
mock_killpg.assert_called_with(123, signal.SIGINT)


@patch("os.getpgid", return_value=123)
@patch("os.killpg")
def test_send_sigterm(mock_killpg, mock_getpgid):
hook = FullOutputSubprocessHook()
hook.sub_process = MagicMock()
hook.send_sigterm()
mock_killpg.assert_called_with(123, signal.SIGTERM)
Loading