diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 6889fb8514..af71efc1c3 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -121,8 +121,9 @@ jobs: env: AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/ AIRFLOW_CONN_AIRFLOW_DB: postgres://postgres:postgres@0.0.0.0:5432/postgres - PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH AIRFLOW_CONN_DATABRICKS_DEFAULT: ${{ secrets.AIRFLOW_CONN_DATABRICKS_DEFAULT }} + AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 90.0 + PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH DATABRICKS_HOST: ${{ secrets.DATABRICKS_HOST }} DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }} DATABRICKS_WAREHOUSE_ID: ${{ secrets.DATABRICKS_WAREHOUSE_ID }} @@ -192,6 +193,7 @@ jobs: AIRFLOW_CONN_AIRFLOW_DB: postgres://postgres:postgres@0.0.0.0:5432/postgres PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH AIRFLOW_CONN_DATABRICKS_DEFAULT: ${{ secrets.AIRFLOW_CONN_DATABRICKS_DEFAULT }} + AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 90.0 DATABRICKS_HOST: ${{ secrets.DATABRICKS_HOST }} DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }} DATABRICKS_WAREHOUSE_ID: ${{ secrets.DATABRICKS_WAREHOUSE_ID }} @@ -256,6 +258,7 @@ jobs: env: AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/ AIRFLOW_CONN_AIRFLOW_DB: postgres://postgres:postgres@0.0.0.0:5432/postgres + AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 90.0 PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH AIRFLOW_CONN_DATABRICKS_DEFAULT: ${{ secrets.AIRFLOW_CONN_DATABRICKS_DEFAULT }} DATABRICKS_HOST: ${{ secrets.DATABRICKS_HOST }} diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index f363097ac2..53f80df2f7 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -53,13 +53,13 @@ repos: - --py37-plus - --keep-runtime-typing - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.1.1 + rev: v0.1.3 hooks: - id: ruff args: - --fix - repo: https://github.com/psf/black - rev: 23.10.0 + rev: 23.10.1 hooks: - id: black args: [ "--config", "./pyproject.toml" ] @@ -76,6 +76,12 @@ repos: name: mypy-python additional_dependencies: [types-PyYAML, types-attrs, attrs, types-requests, types-python-dateutil, apache-airflow] files: ^cosmos + - repo: https://github.com/pycqa/flake8 + rev: 6.1.0 + hooks: + - id: flake8 + entry: pflake8 + additional_dependencies: [pyproject-flake8] ci: autofix_commit_msg: 🎨 [pre-commit.ci] Auto format from pre-commit.com hooks diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 683f9c55db..b30064613d 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,6 +1,22 @@ Changelog ========= +1.2.2 (2023-11-06) +------------------ + +Bug fixes + +* Support ``ProjectConfig.dbt_project_path = None`` & different paths for Rendering and Execution by @MrBones757 in #634 +* Fix adding test nodes to DAGs built using ``LoadMethod.DBT_MANIFEST`` and ``LoadMethod.CUSTOM`` by @edgga in #615 + +Others + +* Add pre-commit hook for McCabe max complexity check and fix errors by @jbandoro in #629 +* Update contributing docs for running integration tests by @jbandoro in #638 +* Fix CI issue running integration tests by @tatiana in #640 and #644 +* pre-commit updates in #637 + + 1.2.1 (2023-10-25) ------------------ diff --git a/cosmos/__init__.py b/cosmos/__init__.py index 905600cde8..4fd57f701c 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -5,7 +5,7 @@ Contains dags, task groups, and operators. """ -__version__ = "1.2.1" +__version__ = "1.2.2" from cosmos.airflow.dag import DbtDag from cosmos.airflow.task_group import DbtTaskGroup diff --git a/cosmos/config.py b/cosmos/config.py index 610e6e489d..87baba8645 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -4,7 +4,7 @@ import contextlib import tempfile -from dataclasses import dataclass, field +from dataclasses import InitVar, dataclass, field from pathlib import Path from typing import Any, Iterator, Callable @@ -31,6 +31,9 @@ class RenderConfig: :param select: A list of dbt select arguments (e.g. 'config.materialized:incremental') :param exclude: A list of dbt exclude arguments (e.g. 'tag:nightly') :param dbt_deps: Configure to run dbt deps when using dbt ls for dag parsing + :param node_converters: a dictionary mapping a ``DbtResourceType`` into a callable. Users can control how to render dbt nodes in Airflow. Only supported when using ``load_method=LoadMode.DBT_MANIFEST`` or ``LoadMode.DBT_LS``. + :param dbt_executable_path: The path to the dbt executable for dag generation. Defaults to dbt if available on the path. Mutually Exclusive with ProjectConfig.dbt_project_path + :param dbt_project_path Configures the DBT project location accessible on the airflow controller for DAG rendering - Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM`` """ emit_datasets: bool = True @@ -40,6 +43,13 @@ class RenderConfig: exclude: list[str] = field(default_factory=list) dbt_deps: bool = True node_converters: dict[DbtResourceType, Callable[..., Any]] | None = None + dbt_executable_path: str | Path = get_system_dbt() + dbt_project_path: InitVar[str | Path | None] = None + + project_path: Path | None = field(init=False) + + def __post_init__(self, dbt_project_path: str | Path | None) -> None: + self.project_path = Path(dbt_project_path) if dbt_project_path else None class ProjectConfig: @@ -72,11 +82,13 @@ def __init__( manifest_path: str | Path | None = None, project_name: str | None = None, ): + # Since we allow dbt_project_path to be defined in ExecutionConfig and RenderConfig + # dbt_project_path may not always be defined here. + # We do, however, still require that both manifest_path and project_name be defined, or neither be defined. if not dbt_project_path: - if not manifest_path or not project_name: + if manifest_path and not project_name or project_name and not manifest_path: raise CosmosValueError( - "ProjectConfig requires dbt_project_path and/or manifest_path to be defined." - " If only manifest_path is defined, project_name must also be defined." + "If ProjectConfig.dbt_project_path is not defined, ProjectConfig.manifest_path and ProjectConfig.project_name must be defined together, or both left undefined." ) if project_name: self.project_name = project_name @@ -210,10 +222,17 @@ class ExecutionConfig: :param execution_mode: The execution mode for dbt. Defaults to local :param test_indirect_selection: The mode to configure the test behavior when performing indirect selection. - :param dbt_executable_path: The path to the dbt executable. Defaults to dbt if - available on the path. + :param dbt_executable_path: The path to the dbt executable for runtime execution. Defaults to dbt if available on the path. + :param dbt_project_path Configures the DBT project location accessible at runtime for dag execution. This is the project path in a docker container for ExecutionMode.DOCKER or ExecutionMode.KUBERNETES. Mutually Exclusive with ProjectConfig.dbt_project_path """ execution_mode: ExecutionMode = ExecutionMode.LOCAL test_indirect_selection: TestIndirectSelection = TestIndirectSelection.EAGER dbt_executable_path: str | Path = get_system_dbt() + + dbt_project_path: InitVar[str | Path | None] = None + + project_path: Path | None = field(init=False) + + def __post_init__(self, dbt_project_path: str | Path | None) -> None: + self.project_path = Path(dbt_project_path) if dbt_project_path else None diff --git a/cosmos/converter.py b/cosmos/converter.py index b1653ba525..dbc290271e 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -16,7 +16,6 @@ from cosmos.exceptions import CosmosValueError from cosmos.log import get_logger - logger = get_logger(__name__) @@ -92,8 +91,8 @@ def __init__( self, project_config: ProjectConfig, profile_config: ProfileConfig, - execution_config: ExecutionConfig = ExecutionConfig(), - render_config: RenderConfig = RenderConfig(), + execution_config: ExecutionConfig | None = None, + render_config: RenderConfig | None = None, dag: DAG | None = None, task_group: TaskGroup | None = None, operator_args: dict[str, Any] | None = None, @@ -103,19 +102,37 @@ def __init__( ) -> None: project_config.validate_project() - emit_datasets = render_config.emit_datasets - test_behavior = render_config.test_behavior - select = render_config.select - exclude = render_config.exclude - dbt_deps = render_config.dbt_deps - execution_mode = execution_config.execution_mode - test_indirect_selection = execution_config.test_indirect_selection - load_mode = render_config.load_method - dbt_executable_path = execution_config.dbt_executable_path - node_converters = render_config.node_converters - - if not project_config.dbt_project_path: - raise CosmosValueError("A Project Path in ProjectConfig is required for generating a Task Operators.") + if not execution_config: + execution_config = ExecutionConfig() + if not render_config: + render_config = RenderConfig() + + # Since we now support both project_config.dbt_project_path, render_config.project_path and execution_config.project_path + # We need to ensure that only one interface is being used. + if project_config.dbt_project_path and (render_config.project_path or execution_config.project_path): + raise CosmosValueError( + "ProjectConfig.dbt_project_path is mutually exclusive with RenderConfig.dbt_project_path and ExecutionConfig.dbt_project_path." + + "If using RenderConfig.dbt_project_path or ExecutionConfig.dbt_project_path, ProjectConfig.dbt_project_path should be None" + ) + + # If we are using the old interface, we should migrate it to the new interface + # This is safe to do now since we have validated which config interface we're using + if project_config.dbt_project_path: + render_config.project_path = project_config.dbt_project_path + execution_config.project_path = project_config.dbt_project_path + + # At this point, execution_config.project_path should always be non-null + if not execution_config.project_path: + raise CosmosValueError( + "ExecutionConfig.dbt_project_path is required for the execution of dbt tasks in all execution modes." + ) + + # We now have a guaranteed execution_config.project_path, but still need to process render_config.project_path + # We require render_config.project_path when we dont have a manifest + if not project_config.manifest_path and not render_config.project_path: + raise CosmosValueError( + "RenderConfig.dbt_project_path is required for rendering an airflow DAG from a DBT Graph if no manifest is provided." + ) profile_args = {} if profile_config.profile_mapping: @@ -136,36 +153,34 @@ def __init__( # We may want to consider defaulting this value in our actual ProjceConfig class? dbt_graph = DbtGraph( project=project_config, - exclude=exclude, - select=select, - dbt_cmd=dbt_executable_path, + render_config=render_config, + execution_config=execution_config, + dbt_cmd=render_config.dbt_executable_path, profile_config=profile_config, operator_args=operator_args, - dbt_deps=dbt_deps, ) - dbt_graph.load(method=load_mode, execution_mode=execution_mode) + dbt_graph.load(method=render_config.load_method, execution_mode=execution_config.execution_mode) task_args = { **operator_args, - # the following args may be only needed for local / venv: - "project_dir": project_config.dbt_project_path, + "project_dir": execution_config.project_path, "profile_config": profile_config, - "emit_datasets": emit_datasets, + "emit_datasets": render_config.emit_datasets, } - if dbt_executable_path: - task_args["dbt_executable_path"] = dbt_executable_path + if execution_config.dbt_executable_path: + task_args["dbt_executable_path"] = execution_config.dbt_executable_path - validate_arguments(select, exclude, profile_args, task_args) + validate_arguments(render_config.select, render_config.exclude, profile_args, task_args) build_airflow_graph( nodes=dbt_graph.filtered_nodes, dag=dag or (task_group and task_group.dag), task_group=task_group, - execution_mode=execution_mode, + execution_mode=execution_config.execution_mode, task_args=task_args, - test_behavior=test_behavior, - test_indirect_selection=test_indirect_selection, + test_behavior=render_config.test_behavior, + test_indirect_selection=execution_config.test_indirect_selection, dbt_project_name=project_config.project_name, on_warning_callback=on_warning_callback, - node_converters=node_converters, + node_converters=render_config.node_converters, ) diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index c41ee07130..0322c8ac4a 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -10,7 +10,7 @@ from subprocess import PIPE, Popen from typing import Any -from cosmos.config import ProfileConfig, ProjectConfig +from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig from cosmos.constants import ( DBT_LOG_DIR_NAME, DBT_LOG_FILENAME, @@ -53,6 +53,64 @@ class DbtNode: has_test: bool = False +def create_symlinks(project_path: Path, tmp_dir: Path) -> None: + """Helper function to create symlinks to the dbt project files.""" + ignore_paths = (DBT_LOG_DIR_NAME, DBT_TARGET_DIR_NAME, "dbt_packages", "profiles.yml") + for child_name in os.listdir(project_path): + if child_name not in ignore_paths: + os.symlink(project_path / child_name, tmp_dir / child_name) + + +def run_command(command: list[str], tmp_dir: Path, env_vars: dict[str, str]) -> str: + """Run a command in a subprocess, returning the stdout.""" + logger.info("Running command: `%s`", " ".join(command)) + logger.info("Environment variable keys: %s", env_vars.keys()) + process = Popen( + command, + stdout=PIPE, + stderr=PIPE, + cwd=tmp_dir, + universal_newlines=True, + env=env_vars, + ) + stdout, stderr = process.communicate() + returncode = process.returncode + + if 'Run "dbt deps" to install package dependencies' in stdout and command[1] == "ls": + raise CosmosLoadDbtException( + "Unable to run dbt ls command due to missing dbt_packages. Set RenderConfig.dbt_deps=True." + ) + + if returncode or "Error" in stdout: + details = stderr or stdout + raise CosmosLoadDbtException(f"Unable to run {command} due to the error:\n{details}") + + return stdout + + +def parse_dbt_ls_output(project_path: Path, ls_stdout: str) -> dict[str, DbtNode]: + """Parses the output of `dbt ls` into a dictionary of `DbtNode` instances.""" + nodes = {} + for line in ls_stdout.split("\n"): + try: + node_dict = json.loads(line.strip()) + except json.decoder.JSONDecodeError: + logger.debug("Skipped dbt ls line: %s", line) + else: + node = DbtNode( + name=node_dict.get("alias", node_dict["name"]), + unique_id=node_dict["unique_id"], + resource_type=DbtResourceType(node_dict["resource_type"]), + depends_on=node_dict.get("depends_on", {}).get("nodes", []), + file_path=project_path / node_dict["original_file_path"], + tags=node_dict["tags"], + config=node_dict["config"], + ) + nodes[node.unique_id] = node + logger.debug("Parsed dbt resource `%s` of type `%s`", node.unique_id, node.resource_type) + return nodes + + class DbtGraph: """ A dbt project graph (represented by `nodes` and `filtered_nodes`). @@ -64,9 +122,8 @@ class DbtGraph: dbt_graph = DbtGraph( project=ProjectConfig(dbt_project_path=DBT_PROJECT_PATH), - exclude=["*orders*"], - select=[], - dbt_cmd="/usr/local/bin/dbt", + render_config=RenderConfig(exclude=["*orders*"], select=[]), + dbt_cmd="/usr/local/bin/dbt" ) dbt_graph.load(method=LoadMode.DBT_LS, execution_mode=ExecutionMode.LOCAL) """ @@ -77,22 +134,17 @@ class DbtGraph: def __init__( self, project: ProjectConfig, + render_config: RenderConfig = RenderConfig(), + execution_config: ExecutionConfig = ExecutionConfig(), profile_config: ProfileConfig | None = None, - exclude: list[str] | None = None, - select: list[str] | None = None, dbt_cmd: str = get_system_dbt(), operator_args: dict[str, Any] | None = None, - dbt_deps: bool | None = True, ): self.project = project - self.exclude = exclude or [] - self.select = select or [] + self.render_config = render_config self.profile_config = profile_config + self.execution_config = execution_config self.operator_args = operator_args or {} - self.dbt_deps = dbt_deps - - # specific to loading using ls - self.dbt_deps = dbt_deps self.dbt_cmd = dbt_cmd def load( @@ -131,6 +183,31 @@ def load( else: load_method[method]() + def run_dbt_ls(self, project_path: Path, tmp_dir: Path, env_vars: dict[str, str]) -> dict[str, DbtNode]: + """Runs dbt ls command and returns the parsed nodes.""" + ls_command = [self.dbt_cmd, "ls", "--output", "json"] + + if self.render_config.exclude: + ls_command.extend(["--exclude", *self.render_config.exclude]) + + if self.render_config.select: + ls_command.extend(["--select", *self.render_config.select]) + + ls_command.extend(self.local_flags) + + stdout = run_command(ls_command, tmp_dir, env_vars) + + logger.debug("dbt ls output: %s", stdout) + log_filepath = self.log_dir / DBT_LOG_FILENAME + logger.debug("dbt logs available in: %s", log_filepath) + if log_filepath.exists(): + with open(log_filepath) as logfile: + for line in logfile: + logger.debug(line.strip()) + + nodes = parse_dbt_ls_output(project_path, stdout) + return nodes + def load_via_dbt_ls(self) -> None: """ This is the most accurate way of loading `dbt` projects and filtering them out, since it uses the `dbt` command @@ -143,40 +220,31 @@ def load_via_dbt_ls(self) -> None: * self.nodes * self.filtered_nodes """ - logger.info( - "Trying to parse the dbt project `%s` in `%s` using dbt ls...", - self.project.project_name, - self.project.dbt_project_path, - ) + logger.info(f"Trying to parse the dbt project in `{self.render_config.project_path}` using dbt ls...") + if not self.render_config.project_path or not self.execution_config.project_path: + raise CosmosLoadDbtException( + "Unable to load project via dbt ls without RenderConfig.dbt_project_path and ExecutionConfig.dbt_project_path" + ) - if not self.project.dbt_project_path or not self.profile_config: - raise CosmosLoadDbtException("Unable to load dbt project without project files and a profile config") + if not self.profile_config: + raise CosmosLoadDbtException("Unable to load project via dbt ls without a profile config.") if not shutil.which(self.dbt_cmd): raise CosmosLoadDbtException(f"Unable to find the dbt executable: {self.dbt_cmd}") - with self.profile_config.ensure_profile(use_mock_values=True) as profile_values: - (profile_path, env_vars) = profile_values - env = os.environ.copy() - env.update(env_vars) + with tempfile.TemporaryDirectory() as tmpdir: + logger.info( + f"Content of the dbt project dir {self.render_config.project_path}: `{os.listdir(self.render_config.project_path)}`" + ) + tmpdir_path = Path(tmpdir) + create_symlinks(self.render_config.project_path, tmpdir_path) - with tempfile.TemporaryDirectory() as tmpdir: - logger.info( - "Content of the dbt project dir <%s>: `%s`", - self.project.dbt_project_path, - os.listdir(self.project.dbt_project_path), - ) - logger.info("Creating symlinks from %s to `%s`", self.project.dbt_project_path, tmpdir) - # We create symbolic links to the original directory files and directories. - # This allows us to run the dbt command from within the temporary directory, outputting any necessary - # artifact and also allow us to run `dbt deps` - tmpdir_path = Path(tmpdir) - ignore_paths = (DBT_LOG_DIR_NAME, DBT_TARGET_DIR_NAME, "dbt_packages", "profiles.yml") - for child_name in os.listdir(self.project.dbt_project_path): - if child_name not in ignore_paths: - os.symlink(self.project.dbt_project_path / child_name, tmpdir_path / child_name) - - local_flags = [ + with self.profile_config.ensure_profile(use_mock_values=True) as profile_values: + (profile_path, env_vars) = profile_values + env = os.environ.copy() + env.update(env_vars) + + self.local_flags = [ "--project-dir", str(tmpdir), "--profiles-dir", @@ -186,92 +254,18 @@ def load_via_dbt_ls(self) -> None: "--target", self.profile_config.target_name, ] - log_dir = Path(env.get(DBT_LOG_PATH_ENVVAR) or tmpdir_path / DBT_LOG_DIR_NAME) - target_dir = Path(env.get(DBT_TARGET_PATH_ENVVAR) or tmpdir_path / DBT_TARGET_DIR_NAME) - env[DBT_LOG_PATH_ENVVAR] = str(log_dir) - env[DBT_TARGET_PATH_ENVVAR] = str(target_dir) + self.log_dir = Path(env.get(DBT_LOG_PATH_ENVVAR) or tmpdir_path / DBT_LOG_DIR_NAME) + self.target_dir = Path(env.get(DBT_TARGET_PATH_ENVVAR) or tmpdir_path / DBT_TARGET_DIR_NAME) + env[DBT_LOG_PATH_ENVVAR] = str(self.log_dir) + env[DBT_TARGET_PATH_ENVVAR] = str(self.target_dir) - if self.dbt_deps: + if self.render_config.dbt_deps: deps_command = [self.dbt_cmd, "deps"] - deps_command.extend(local_flags) - logger.info("Running command: `%s`", " ".join(deps_command)) - logger.info("Environment variable keys: %s", env.keys()) - process = Popen( - deps_command, - stdout=PIPE, - stderr=PIPE, - cwd=tmpdir, - universal_newlines=True, - env=env, - ) - stdout, stderr = process.communicate() - returncode = process.returncode + deps_command.extend(self.local_flags) + stdout = run_command(deps_command, tmpdir_path, env) logger.debug("dbt deps output: %s", stdout) - if returncode or "Error" in stdout: - details = stderr or stdout - raise CosmosLoadDbtException(f"Unable to run dbt deps command due to the error:\n{details}") - - ls_command = [self.dbt_cmd, "ls", "--output", "json"] - - if self.exclude: - ls_command.extend(["--exclude", *self.exclude]) - - if self.select: - ls_command.extend(["--select", *self.select]) - - ls_command.extend(local_flags) - - logger.info("Running command: `%s`", " ".join(ls_command)) - logger.info("Environment variable keys: %s", env.keys()) - - process = Popen( - ls_command, - stdout=PIPE, - stderr=PIPE, - cwd=tmpdir, - universal_newlines=True, - env=env, - ) - - stdout, stderr = process.communicate() - returncode = process.returncode - - logger.debug("dbt output: %s", stdout) - log_filepath = log_dir / DBT_LOG_FILENAME - logger.debug("dbt logs available in: %s", log_filepath) - if log_filepath.exists(): - with open(log_filepath) as logfile: - for line in logfile: - logger.debug(line.strip()) - - if 'Run "dbt deps" to install package dependencies' in stdout: - raise CosmosLoadDbtException( - "Unable to run dbt ls command due to missing dbt_packages. Set render_config.dbt_deps=True." - ) - - if returncode or "Error" in stdout: - details = stderr or stdout - raise CosmosLoadDbtException(f"Unable to run dbt ls command due to the error:\n{details}") - - nodes = {} - for line in stdout.split("\n"): - try: - node_dict = json.loads(line.strip()) - except json.decoder.JSONDecodeError: - logger.debug("Skipped dbt ls line: %s", line) - else: - node = DbtNode( - name=node_dict.get("alias", node_dict["name"]), - unique_id=node_dict["unique_id"], - resource_type=DbtResourceType(node_dict["resource_type"]), - depends_on=node_dict.get("depends_on", {}).get("nodes", []), - file_path=self.project.dbt_project_path / node_dict["original_file_path"], - tags=node_dict["tags"], - config=node_dict["config"], - ) - nodes[node.unique_id] = node - logger.debug("Parsed dbt resource `%s` of type `%s`", node.unique_id, node.resource_type) + nodes = self.run_dbt_ls(self.execution_config.project_path, tmpdir_path, env) self.nodes = nodes self.filtered_nodes = nodes @@ -295,14 +289,16 @@ def load_via_custom_parser(self) -> None: """ logger.info("Trying to parse the dbt project `%s` using a custom Cosmos method...", self.project.project_name) - if not self.project.dbt_project_path or not self.project.models_path or not self.project.seeds_path: - raise CosmosLoadDbtException("Unable to load dbt project without project files") + if not self.render_config.project_path or not self.execution_config.project_path: + raise CosmosLoadDbtException( + "Unable to load dbt project without RenderConfig.dbt_project_path and ExecutionConfig.dbt_project_path" + ) project = LegacyDbtProject( - project_name=self.project.dbt_project_path.stem, - dbt_root_path=self.project.dbt_project_path.parent.as_posix(), - dbt_models_dir=self.project.models_path.stem, - dbt_seeds_dir=self.project.seeds_path.stem, + project_name=self.render_config.project_path.stem, + dbt_root_path=self.render_config.project_path.parent.as_posix(), + dbt_models_dir=self.project.models_path.stem if self.project.models_path else "models", + dbt_seeds_dir=self.project.seeds_path.stem if self.project.seeds_path else "seeds", operator_args=self.operator_args, ) nodes = {} @@ -316,7 +312,11 @@ def load_via_custom_parser(self) -> None: unique_id=model_name, resource_type=DbtResourceType(model.type.value), depends_on=list(model.config.upstream_models), - file_path=model.path, + file_path=Path( + model.path.as_posix().replace( + self.render_config.project_path.as_posix(), self.execution_config.project_path.as_posix() + ) + ), tags=[], config=config, ) @@ -324,7 +324,10 @@ def load_via_custom_parser(self) -> None: self.nodes = nodes self.filtered_nodes = select_nodes( - project_dir=self.project.dbt_project_path, nodes=nodes, select=self.select, exclude=self.exclude + project_dir=self.execution_config.project_path, + nodes=nodes, + select=self.render_config.select, + exclude=self.render_config.exclude, ) self.update_node_dependency() @@ -351,6 +354,9 @@ def load_from_dbt_manifest(self) -> None: if not self.project.is_manifest_available(): raise CosmosLoadDbtException(f"Unable to load manifest using {self.project.manifest_path}") + if not self.execution_config.project_path: + raise CosmosLoadDbtException("Unable to load manifest without ExecutionConfig.dbt_project_path") + nodes = {} with open(self.project.manifest_path) as fp: # type: ignore[arg-type] manifest = json.load(fp) @@ -362,9 +368,7 @@ def load_from_dbt_manifest(self) -> None: unique_id=unique_id, resource_type=DbtResourceType(node_dict["resource_type"]), depends_on=node_dict.get("depends_on", {}).get("nodes", []), - file_path=self.project.dbt_project_path / Path(node_dict["original_file_path"]) - if self.project.dbt_project_path - else Path(node_dict["original_file_path"]), + file_path=self.execution_config.project_path / Path(node_dict["original_file_path"]), tags=node_dict["tags"], config=node_dict["config"], ) @@ -373,7 +377,10 @@ def load_from_dbt_manifest(self) -> None: self.nodes = nodes self.filtered_nodes = select_nodes( - project_dir=self.project.dbt_project_path, nodes=nodes, select=self.select, exclude=self.exclude + project_dir=self.execution_config.project_path, + nodes=nodes, + select=self.render_config.select, + exclude=self.render_config.exclude, ) self.update_node_dependency() diff --git a/cosmos/dbt/parser/project.py b/cosmos/dbt/parser/project.py index e154bb0ed4..278b1a0f73 100644 --- a/cosmos/dbt/parser/project.py +++ b/cosmos/dbt/parser/project.py @@ -137,17 +137,14 @@ def __post_init__(self) -> None: """ Parses the file and extracts metadata (dependencies, tags, etc) """ - # first, get an empty config - config = DbtModelConfig() - var_args: Dict[str, Any] = self.operator_args.get("vars", {}) + if self.type == DbtModelType.DBT_SEED or self.type == DbtModelType.DBT_TEST: + return - if self.type == DbtModelType.DBT_MODEL: - # get the code from the file - code = self.path.read_text() + config = DbtModelConfig() + self.var_args: Dict[str, Any] = self.operator_args.get("vars", {}) + code = self.path.read_text() - # we remove first and last line if the code is a snapshot - elif self.type == DbtModelType.DBT_SNAPSHOT: - code = self.path.read_text() + if self.type == DbtModelType.DBT_SNAPSHOT: snapshot_name = code.split("{%")[1] snapshot_name = snapshot_name.split("%}")[0] snapshot_name = snapshot_name.split(" ")[2] @@ -156,56 +153,74 @@ def __post_init__(self) -> None: code = code.split("%}")[1] code = code.split("{%")[0] - elif self.type == DbtModelType.DBT_SEED or self.type == DbtModelType.DBT_TEST: - return - if self.path.suffix == PYTHON_FILE_SUFFIX: config.upstream_models = config.upstream_models.union(set(extract_python_file_upstream_requirements(code))) else: - # get the dependencies - env = jinja2.Environment() - jinja2_ast = env.parse(code) - # iterate over the jinja nodes to extract info - for base_node in jinja2_ast.find_all(jinja2.nodes.Call): - if hasattr(base_node.node, "name"): - try: - # check we have a ref - this indicates a dependency - if base_node.node.name == "ref": - # if it is, get the first argument - first_arg = base_node.args[0] - # if it contains vars, render the value of the var - if isinstance(first_arg, jinja2.nodes.Concat): - value = "" - for node in first_arg.nodes: - if isinstance(node, jinja2.nodes.Const): - value += node.value - elif ( - isinstance(node, jinja2.nodes.Call) - and isinstance(node.node, jinja2.nodes.Name) - and isinstance(node.args[0], jinja2.nodes.Const) - and node.node.name == "var" - ): - value += var_args[node.args[0].value] - config.upstream_models.add(value) - elif isinstance(first_arg, jinja2.nodes.Const): - # and add it to the config - config.upstream_models.add(first_arg.value) - - # check if we have a config - this could contain tags - if base_node.node.name == "config": - # if it is, check if any kwargs are tags - for kwarg in base_node.kwargs: - for selector in self.config.config_types: - extracted_config = self._extract_config(kwarg=kwarg, config_name=selector) - config.config_selectors |= ( - set(extracted_config) if isinstance(extracted_config, (str, List)) else set() - ) - except KeyError as e: - logger.warning(f"Could not add upstream model for config in {self.path}: {e}") - - # set the config and set the parsed file flag to true + upstream_models, extracted_config = self.extract_sql_file_requirements(code) + config.upstream_models = config.upstream_models.union(set(upstream_models)) + config.config_selectors |= extracted_config + self.config = config + def extract_sql_file_requirements(self, code: str) -> tuple[list[str], set[str]]: + """Extracts upstream models and config selectors from a dbt sql file.""" + # get the dependencies + env = jinja2.Environment() + jinja2_ast = env.parse(code) + upstream_models = [] + config_selectors = set() + # iterate over the jinja nodes to extract info + for base_node in jinja2_ast.find_all(jinja2.nodes.Call): + if hasattr(base_node.node, "name"): + try: + # check we have a ref - this indicates a dependency + if base_node.node.name == "ref": + upstream_model = self._parse_jinja_ref_node(base_node) + if upstream_model: + upstream_models.append(upstream_model) + # check if we have a config - this could contain tags + if base_node.node.name == "config": + config_selectors |= self._parse_jinja_config_node(base_node) + except KeyError as e: + logger.warning(f"Could not add upstream model for config in {self.path}: {e}") + + return upstream_models, config_selectors + + def _parse_jinja_ref_node(self, base_node: jinja2.nodes.Call) -> str | None: + """Parses a jinja ref node.""" + # get the first argument + first_arg = base_node.args[0] + value = None + # if it contains vars, render the value of the var + if isinstance(first_arg, jinja2.nodes.Concat): + value = "" + for node in first_arg.nodes: + if isinstance(node, jinja2.nodes.Const): + value += node.value + elif ( + isinstance(node, jinja2.nodes.Call) + and isinstance(node.node, jinja2.nodes.Name) + and isinstance(node.args[0], jinja2.nodes.Const) + and node.node.name == "var" + ): + value += self.var_args[node.args[0].value] + elif isinstance(first_arg, jinja2.nodes.Const): + # and add it to the config + value = first_arg.value + + return value + + def _parse_jinja_config_node(self, base_node: jinja2.nodes.Call) -> set[str]: + """Parses a jinja config node.""" + # check if any kwargs are tags + selector_config = set() + for kwarg in base_node.kwargs: + for config_name in self.config.config_types: + if hasattr(kwarg, "key") and kwarg.key == config_name: + extracted_config = self._extract_config(kwarg, config_name) + selector_config |= set(extracted_config) if isinstance(extracted_config, (str, List)) else set() + return selector_config + # TODO following needs coverage: def _extract_config(self, kwarg: Any, config_name: str) -> Any: if hasattr(kwarg, "key") and kwarg.key == config_name: @@ -354,47 +369,21 @@ def _handle_config_file(self, path: Path) -> None: if not config_dict: return - for model in config_dict.get("models", []): - model_name = model.get("name") + for model_config in config_dict.get("models", []): + model_name = model_config.get("name") # if the model doesn't exist, we can't do anything if not model_name: continue - # tests - for column in model.get("columns", []): - for test in column.get("tests", []): - if not column.get("name"): - continue - - # Get the test name - if not isinstance(test, str): - test = list(test.keys())[0] - - test_model = DbtModel( - name=f"{test}_{column['name']}_{model_name}", - type=DbtModelType.DBT_TEST, - path=path, - operator_args=self.operator_args, - config=DbtModelConfig(upstream_models=set({model_name})), - ) - - self.tests[test_model.name] = test_model + model_tests = self._extract_model_tests(model_name, model_config, path) + self.tests.update(model_tests) # config_selectors if model_name not in self.models: continue - config_selectors = [] - for selector in DbtModelConfig.config_types: - config_value = model.get("config", {}).get(selector) - if config_value: - if isinstance(config_value, str): - config_selectors.append(f"{selector}:{config_value}") - else: - for item in config_value: - if item: - config_selectors.append(f"{selector}:{item}") + config_selectors = self._extract_config_selectors(model_config) # dbt default ensures "materialized:view" is set for all models if nothing is specified so that it will # work in a select/exclude list @@ -407,3 +396,40 @@ def _handle_config_file(self, path: Path) -> None: # then, get the model and merge the configs model = self.models[model_name] model.config = model.config + DbtModelConfig(config_selectors=set(config_selectors)) + + def _extract_model_tests( + self, model_name: str, model_config: dict[str, list[dict[str, dict[str, list[str]]]]], path: Path + ) -> dict[str, DbtModel]: + """Extracts tests from a dbt config file model.""" + tests = {} + for column in model_config.get("columns", []): + for test in column.get("tests", []): + if not column.get("name"): + continue + # Get the test name + if not isinstance(test, str): + test = list(test.keys())[0] + + test_model = DbtModel( + name=f"{test}_{column['name']}_{model_name}", + type=DbtModelType.DBT_TEST, + path=path, + operator_args=self.operator_args, + config=DbtModelConfig(upstream_models=set({model_name})), + ) + tests[test_model.name] = test_model + return tests + + def _extract_config_selectors(self, model_config: dict[str, dict[str, str | list[str]]]) -> list[str]: + """Extracts config selectors from a dbt config file model.""" + config_selectors = [] + for selector in DbtModelConfig.config_types: + config_value = model_config.get("config", {}).get(selector) + if config_value: + if isinstance(config_value, str): + config_selectors.append(f"{selector}:{config_value}") + else: + for item in config_value: + if item: + config_selectors.append(f"{selector}:{item}") + return config_selectors diff --git a/cosmos/dbt/selector.py b/cosmos/dbt/selector.py index 0e6034a0b2..c7316dc75e 100644 --- a/cosmos/dbt/selector.py +++ b/cosmos/dbt/selector.py @@ -2,7 +2,7 @@ from pathlib import Path import copy -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from cosmos.constants import DbtResourceType from cosmos.exceptions import CosmosValueError @@ -84,69 +84,96 @@ def __repr__(self) -> str: return f"SelectorConfig(paths={self.paths}, tags={self.tags}, config={self.config}, other={self.other})" -def select_nodes_ids_by_intersection(nodes: dict[str, DbtNode], config: SelectorConfig) -> set[str]: +class NodeSelector: """ - Return a list of node ids which matches the configuration defined in config. + Class to select nodes based on a selector config. :param nodes: Dictionary mapping dbt nodes (node.unique_id to node) :param config: User-defined select statements - - References: - https://docs.getdbt.com/reference/node-selection/syntax - https://docs.getdbt.com/reference/node-selection/yaml-selectors """ - if config.is_empty: - return set(nodes.keys()) - selected_nodes = set() - visited_nodes = set() + def __init__(self, nodes: dict[str, DbtNode], config: SelectorConfig) -> None: + self.nodes = nodes + self.config = config + + def select_nodes_ids_by_intersection(self) -> set[str]: + """ + Return a list of node ids which matches the configuration defined in config. + + References: + https://docs.getdbt.com/reference/node-selection/syntax + https://docs.getdbt.com/reference/node-selection/yaml-selectors + """ + if self.config.is_empty: + return set(self.nodes.keys()) + + self.selected_nodes: set[str] = set() + self.visited_nodes: set[str] = set() - def should_include_node(node_id: str, node: DbtNode) -> bool: + for node_id, node in self.nodes.items(): + if self._should_include_node(node_id, node): + self.selected_nodes.add(node_id) + + return self.selected_nodes + + def _should_include_node(self, node_id: str, node: DbtNode) -> bool: "Checks if a single node should be included. Only runs once per node with caching." - if node_id in visited_nodes: - return node_id in selected_nodes + if node_id in self.visited_nodes: + return node_id in self.selected_nodes + + self.visited_nodes.add(node_id) - visited_nodes.add(node_id) + if node.resource_type == DbtResourceType.TEST: + node.tags = getattr(self.nodes.get(node.depends_on[0]), "tags", []) - if config.tags: - if not (set(config.tags) <= set(node.tags)): - return False + if not self._is_tags_subset(node): + return False node_config = {key: value for key, value in node.config.items() if key in SUPPORTED_CONFIG} - config_tags = config.config.get("tags") - if config_tags and config_tags not in node_config.get("tags", []): + + if not self._is_config_subset(node_config): return False # Remove 'tags' as they've already been filtered for - config_copy = copy.deepcopy(config.config) + config_copy = copy.deepcopy(self.config.config) config_copy.pop("tags", None) node_config.pop("tags", None) if not (config_copy.items() <= node_config.items()): return False - if config.paths: - for filter_path in config.paths: - if filter_path in node.file_path.parents or filter_path == node.file_path: - return True + if self.config.paths and not self._is_path_matching(node): + return False - # if it's a test coming from a schema.yml file, check the model's file_path - if node.resource_type == DbtResourceType.TEST and node.file_path.name == "schema.yml": - # try to get the corresponding model from node.depends_on - if len(node.depends_on) == 1: - model_node = nodes.get(node.depends_on[0]) - if model_node: - return should_include_node(node.depends_on[0], model_node) + return True + def _is_tags_subset(self, node: DbtNode) -> bool: + """Checks if the node's tags are a subset of the config's tags.""" + if not (set(self.config.tags) <= set(node.tags)): return False + return True + def _is_config_subset(self, node_config: dict[str, Any]) -> bool: + """Checks if the node's config is a subset of the config's config.""" + config_tags = self.config.config.get("tags") + if config_tags and config_tags not in node_config.get("tags", []): + return False return True - for node_id, node in nodes.items(): - if should_include_node(node_id, node): - selected_nodes.add(node_id) + def _is_path_matching(self, node: DbtNode) -> bool: + """Checks if the node's path is a subset of the config's paths.""" + for filter_path in self.config.paths: + if filter_path in node.file_path.parents or filter_path == node.file_path: + return True - return selected_nodes + # if it's a test coming from a schema.yml file, check the model's file_path + if node.resource_type == DbtResourceType.TEST and node.file_path.name == "schema.yml": + # try to get the corresponding model from node.depends_on + if len(node.depends_on) == 1: + model_node = self.nodes.get(node.depends_on[0]) + if model_node: + return self._should_include_node(node.depends_on[0], model_node) + return False def retrieve_by_label(statement_list: list[str], label: str) -> set[str]: @@ -201,7 +228,8 @@ def select_nodes( for statement in select: config = SelectorConfig(project_dir, statement) - select_ids = select_nodes_ids_by_intersection(nodes, config) + node_selector = NodeSelector(nodes, config) + select_ids = node_selector.select_nodes_ids_by_intersection() subset_ids = subset_ids.union(set(select_ids)) if select: @@ -212,7 +240,8 @@ def select_nodes( exclude_ids: set[str] = set() for statement in exclude: config = SelectorConfig(project_dir, statement) - exclude_ids = exclude_ids.union(set(select_nodes_ids_by_intersection(nodes, config))) + node_selector = NodeSelector(nodes, config) + exclude_ids = exclude_ids.union(set(node_selector.select_nodes_ids_by_intersection())) subset_ids = set(nodes_ids) - set(exclude_ids) return {id_: nodes[id_] for id_ in subset_ids} diff --git a/dev/dags/cosmos_manifest_example.py b/dev/dags/cosmos_manifest_example.py index 67a9cb3ce1..c94ea41a2e 100644 --- a/dev/dags/cosmos_manifest_example.py +++ b/dev/dags/cosmos_manifest_example.py @@ -6,7 +6,7 @@ from datetime import datetime from pathlib import Path -from cosmos import DbtDag, ProjectConfig, ProfileConfig, RenderConfig, LoadMode +from cosmos import DbtDag, ProjectConfig, ProfileConfig, RenderConfig, LoadMode, ExecutionConfig from cosmos.profiles import PostgresUserPasswordProfileMapping DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt" @@ -25,12 +25,12 @@ cosmos_manifest_example = DbtDag( # dbt/cosmos-specific parameters project_config=ProjectConfig( - dbt_project_path=DBT_ROOT_PATH / "jaffle_shop", manifest_path=DBT_ROOT_PATH / "jaffle_shop" / "target" / "manifest.json", project_name="jaffle_shop", ), profile_config=profile_config, - render_config=RenderConfig(load_method=LoadMode.DBT_MANIFEST, select=["path:models/customers.sql"]), + render_config=RenderConfig(load_method=LoadMode.DBT_MANIFEST, select=["path:seeds/raw_customers.csv"]), + execution_config=ExecutionConfig(dbt_project_path=DBT_ROOT_PATH / "jaffle_shop"), operator_args={"install_deps": True}, # normal dag parameters schedule_interval="@daily", diff --git a/docs/configuration/execution-config.rst b/docs/configuration/execution-config.rst index d1094107c7..c118590d85 100644 --- a/docs/configuration/execution-config.rst +++ b/docs/configuration/execution-config.rst @@ -1,5 +1,12 @@ Execution Config ================== -Cosmos supports multiple ways of executing your dbt models. -For more information, see the `execution modes <../getting_started/execution-modes.html>`_ page. +Cosmos aims to give you control over how your dbt project is executed when running in airflow. +It does this by exposing a ``cosmos.config.ExecutionConfig`` class that you can use to configure how your DAGs are executed. + +The ``ExecutionConfig`` class takes the following arguments: + +- ``execution_mode``: The way dbt is run when executing within airflow. For more information, see the `execution modes <../getting_started/execution-modes.html>`_ page. +- ``test_indirect_selection``: The mode to configure the test behavior when performing indirect selection. +- ``dbt_executable_path``: The path to the dbt executable for dag generation. Defaults to dbt if available on the path. +- ``dbt_project_path``: Configures the DBT project location accessible on their airflow controller for DAG rendering - Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM`` diff --git a/docs/configuration/render-config.rst b/docs/configuration/render-config.rst index 04e7f4c025..de0a08cdbe 100644 --- a/docs/configuration/render-config.rst +++ b/docs/configuration/render-config.rst @@ -11,7 +11,10 @@ The ``RenderConfig`` class takes the following arguments: - ``test_behavior``: how to run tests. Defaults to running a model's tests immediately after the model is run. For more information, see the `Testing Behavior `_ section. - ``load_method``: how to load your dbt project. See `Parsing Methods `_ for more information. - ``select`` and ``exclude``: which models to include or exclude from your DAGs. See `Selecting & Excluding `_ for more information. +- ``dbt_deps``: A Boolean to run dbt deps when using dbt ls for dag parsing. Default True - ``node_converters``: a dictionary mapping a ``DbtResourceType`` into a callable. Users can control how to render dbt nodes in Airflow. Only supported when using ``load_method=LoadMode.DBT_MANIFEST`` or ``LoadMode.DBT_LS``. Find more information below. +- ``dbt_executable_path``: The path to the dbt executable for dag generation. Defaults to dbt if available on the path. +- ``dbt_project_path``: Configures the DBT project location accessible on their airflow controller for DAG rendering - Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM`` Customizing how nodes are rendered (experimental) ------------------------------------------------- diff --git a/docs/contributing.rst b/docs/contributing.rst index af4ffe5854..f875538839 100644 --- a/docs/contributing.rst +++ b/docs/contributing.rst @@ -111,6 +111,16 @@ To run the integration tests for the first time, use: export AIRFLOW_HOME=`pwd` export AIRFLOW_CONN_AIRFLOW_DB=postgres://postgres:postgres@0.0.0.0:5432/postgres + export DATABRICKS_HOST='' + export DATABRICKS_TOKEN='' + export DATABRICKS_WAREHOUSE_ID='' + export DATABRICKS_CLUSTER_ID='' + export POSTGRES_PORT=5432 + export POSTGRES_SCHEMA=public + export POSTGRES_DB=postgres + export POSTGRES_PASSWORD=postgres + export POSTGRES_USER=postgres + export POSTGRES_HOST=localhost hatch run tests.py3.8-2.5:test-integration-setup hatch run tests.py3.8-2.5:test-integration diff --git a/pyproject.toml b/pyproject.toml index a68253d15a..97c2ec66cf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -142,7 +142,8 @@ dependencies = [ "types-attrs", "types-requests", "types-python-dateutil", - "apache-airflow" + "apache-airflow", + "Werkzeug<3.0.0", ] [[tool.hatch.envs.tests.matrix]] @@ -245,3 +246,7 @@ line-length = 120 [tool.distutils.bdist_wheel] universal = true + +[tool.flake8] +max-complexity = 10 +select = "C" diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 3927bbfdd2..b108878fc9 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -5,9 +5,17 @@ import pytest -from cosmos.config import ProfileConfig, ProjectConfig +from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig from cosmos.constants import DbtResourceType, ExecutionMode -from cosmos.dbt.graph import CosmosLoadDbtException, DbtGraph, LoadMode +from cosmos.dbt.graph import ( + CosmosLoadDbtException, + DbtGraph, + DbtNode, + LoadMode, + create_symlinks, + run_command, + parse_dbt_ls_output, +) from cosmos.profiles import PostgresUserPasswordProfileMapping DBT_PROJECTS_ROOT_DIR = Path(__file__).parent.parent.parent / "dev/dags/dbt" @@ -48,7 +56,14 @@ def test_load_via_manifest_with_exclude(project_name, manifest_filepath, model_f target_name="test", profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", ) - dbt_graph = DbtGraph(project=project_config, profile_config=profile_config, exclude=["config.materialized:table"]) + render_config = RenderConfig(exclude=["config.materialized:table"]) + execution_config = ExecutionConfig(dbt_project_path=project_config.dbt_project_path) + dbt_graph = DbtGraph( + project=project_config, + execution_config=execution_config, + profile_config=profile_config, + render_config=render_config, + ) dbt_graph.load_from_dbt_manifest() assert len(dbt_graph.nodes) == 28 @@ -197,8 +212,12 @@ def test_load_via_dbt_ls_does_not_create_target_logs_in_original_folder(mock_pop assert not (tmp_dbt_project_dir / "logs").exists() project_config = ProjectConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) + render_config = RenderConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) + execution_config = ExecutionConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) dbt_graph = DbtGraph( project=project_config, + render_config=render_config, + execution_config=execution_config, profile_config=ProfileConfig( profile_name="default", target_name="default", @@ -220,10 +239,14 @@ def test_load_via_dbt_ls_does_not_create_target_logs_in_original_folder(mock_pop @pytest.mark.integration def test_load_via_dbt_ls_with_exclude(): project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + render_config = RenderConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, select=["*customers*"], exclude=["*orders*"] + ) + execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) dbt_graph = DbtGraph( project=project_config, - select=["*customers*"], - exclude=["*orders*"], + render_config=render_config, + execution_config=execution_config, profile_config=ProfileConfig( profile_name="default", target_name="default", @@ -265,8 +288,12 @@ def test_load_via_dbt_ls_with_exclude(): @pytest.mark.parametrize("project_name", ("jaffle_shop", "jaffle_shop_python")) def test_load_via_dbt_ls_without_exclude(project_name): project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name) + render_config = RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) dbt_graph = DbtGraph( project=project_config, + render_config=render_config, + execution_config=execution_config, profile_config=ProfileConfig( profile_name="default", target_name="default", @@ -284,30 +311,48 @@ def test_load_via_dbt_ls_without_exclude(project_name): def test_load_via_custom_without_project_path(): project_config = ProjectConfig(manifest_path=SAMPLE_MANIFEST, project_name="test") - dbt_graph = DbtGraph(dbt_cmd="/inexistent/dbt", project=project_config) + execution_config = ExecutionConfig() + render_config = RenderConfig() + dbt_graph = DbtGraph( + dbt_cmd="/inexistent/dbt", + project=project_config, + execution_config=execution_config, + render_config=render_config, + ) with pytest.raises(CosmosLoadDbtException) as err_info: dbt_graph.load_via_custom_parser() - expected = "Unable to load dbt project without project files" + expected = "Unable to load dbt project without RenderConfig.dbt_project_path and ExecutionConfig.dbt_project_path" assert err_info.value.args[0] == expected def test_load_via_dbt_ls_without_profile(): project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) - dbt_graph = DbtGraph(dbt_cmd="/inexistent/dbt", project=project_config) + execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + render_config = RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + dbt_graph = DbtGraph( + dbt_cmd="/inexistent/dbt", + project=project_config, + execution_config=execution_config, + render_config=render_config, + ) with pytest.raises(CosmosLoadDbtException) as err_info: dbt_graph.load_via_dbt_ls() - expected = "Unable to load dbt project without project files and a profile config" + expected = "Unable to load project via dbt ls without a profile config." assert err_info.value.args[0] == expected def test_load_via_dbt_ls_with_invalid_dbt_path(): project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + render_config = RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) with patch("pathlib.Path.exists", return_value=True): dbt_graph = DbtGraph( dbt_cmd="/inexistent/dbt", project=project_config, + execution_config=execution_config, + render_config=render_config, profile_config=ProfileConfig( profile_name="default", target_name="default", @@ -327,11 +372,12 @@ def test_load_via_dbt_ls_with_invalid_dbt_path(): def test_load_via_dbt_ls_with_sources(load_method): project_name = "simple" dbt_graph = DbtGraph( - dbt_deps=False, project=ProjectConfig( dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name, manifest_path=SAMPLE_MANIFEST_SOURCE if load_method == "load_from_dbt_manifest" else None, ), + render_config=RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name, dbt_deps=False), + execution_config=ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name), profile_config=ProfileConfig( profile_name="simple", target_name="dev", @@ -347,9 +393,12 @@ def test_load_via_dbt_ls_with_sources(load_method): @pytest.mark.integration def test_load_via_dbt_ls_without_dbt_deps(): project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + render_config = RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, dbt_deps=False) + execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) dbt_graph = DbtGraph( - dbt_deps=False, project=project_config, + render_config=render_config, + execution_config=execution_config, profile_config=ProfileConfig( profile_name="default", target_name="default", @@ -363,7 +412,7 @@ def test_load_via_dbt_ls_without_dbt_deps(): with pytest.raises(CosmosLoadDbtException) as err_info: dbt_graph.load_via_dbt_ls() - expected = "Unable to run dbt ls command due to missing dbt_packages. Set render_config.dbt_deps=True." + expected = "Unable to run dbt ls command due to missing dbt_packages. Set RenderConfig.dbt_deps=True." assert err_info.value.args[0] == expected @@ -374,8 +423,12 @@ def test_load_via_dbt_ls_with_zero_returncode_and_non_empty_stderr(mock_popen, t mock_popen().returncode = 0 project_config = ProjectConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) + render_config = RenderConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) + execution_config = ExecutionConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) dbt_graph = DbtGraph( project=project_config, + render_config=render_config, + execution_config=execution_config, profile_config=ProfileConfig( profile_name="default", target_name="default", @@ -396,8 +449,12 @@ def test_load_via_dbt_ls_with_non_zero_returncode(mock_popen): mock_popen().returncode = 1 project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + render_config = RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) dbt_graph = DbtGraph( project=project_config, + render_config=render_config, + execution_config=execution_config, profile_config=ProfileConfig( profile_name="default", target_name="default", @@ -407,20 +464,22 @@ def test_load_via_dbt_ls_with_non_zero_returncode(mock_popen): ), ), ) - with pytest.raises(CosmosLoadDbtException) as err_info: + expected = r"Unable to run \['.+dbt', 'deps', .*\] due to the error:\nSome stderr message" + with pytest.raises(CosmosLoadDbtException, match=expected): dbt_graph.load_via_dbt_ls() - expected = "Unable to run dbt deps command due to the error:\nSome stderr message" - assert err_info.value.args[0] == expected - @pytest.mark.integration @patch("cosmos.dbt.graph.Popen.communicate", return_value=("Some Runtime Error", "")) def test_load_via_dbt_ls_with_runtime_error_in_stdout(mock_popen_communicate): # It may seem strange, but at least until dbt 1.6.0, there are circumstances when it outputs errors to stdout project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + render_config = RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) dbt_graph = DbtGraph( project=project_config, + render_config=render_config, + execution_config=execution_config, profile_config=ProfileConfig( profile_name="default", target_name="default", @@ -430,23 +489,29 @@ def test_load_via_dbt_ls_with_runtime_error_in_stdout(mock_popen_communicate): ), ), ) - with pytest.raises(CosmosLoadDbtException) as err_info: + expected = r"Unable to run \['.+dbt', 'deps', .*\] due to the error:\nSome Runtime Error" + with pytest.raises(CosmosLoadDbtException, match=expected): dbt_graph.load_via_dbt_ls() - expected = "Unable to run dbt deps command due to the error:\nSome Runtime Error" - assert err_info.value.args[0] == expected mock_popen_communicate.assert_called_once() @pytest.mark.parametrize("project_name", ("jaffle_shop", "jaffle_shop_python")) def test_load_via_load_via_custom_parser(project_name): project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name) + execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + render_config = RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) profile_config = ProfileConfig( profile_name="test", target_name="test", profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", ) - dbt_graph = DbtGraph(project=project_config, profile_config=profile_config) + dbt_graph = DbtGraph( + project=project_config, + profile_config=profile_config, + render_config=render_config, + execution_config=execution_config, + ) dbt_graph.load_via_custom_parser() @@ -459,12 +524,13 @@ def test_update_node_dependency_called(mock_update_node_dependency): project_config = ProjectConfig( dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, manifest_path=SAMPLE_MANIFEST ) + execution_config = ExecutionConfig(dbt_project_path=project_config.dbt_project_path) profile_config = ProfileConfig( profile_name="test", target_name="test", profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", ) - dbt_graph = DbtGraph(project=project_config, profile_config=profile_config) + dbt_graph = DbtGraph(project=project_config, execution_config=execution_config, profile_config=profile_config) dbt_graph.load() assert mock_update_node_dependency.called @@ -479,7 +545,8 @@ def test_update_node_dependency_target_exist(): target_name="test", profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", ) - dbt_graph = DbtGraph(project=project_config, profile_config=profile_config) + execution_config = ExecutionConfig(dbt_project_path=project_config.dbt_project_path) + dbt_graph = DbtGraph(project=project_config, execution_config=execution_config, profile_config=profile_config) dbt_graph.load() for _, nodes in dbt_graph.nodes.items(): @@ -497,13 +564,47 @@ def test_update_node_dependency_test_not_exist(): target_name="test", profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", ) - dbt_graph = DbtGraph(project=project_config, profile_config=profile_config, exclude=["config.materialized:test"]) + render_config = RenderConfig(exclude=["config.materialized:test"]) + execution_config = ExecutionConfig(dbt_project_path=project_config.dbt_project_path) + dbt_graph = DbtGraph( + project=project_config, + execution_config=execution_config, + profile_config=profile_config, + render_config=render_config, + ) dbt_graph.load_from_dbt_manifest() for _, nodes in dbt_graph.filtered_nodes.items(): assert nodes.has_test is False +def test_tag_selected_node_test_exist(): + project_config = ProjectConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, manifest_path=SAMPLE_MANIFEST + ) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", + ) + render_config = RenderConfig(select=["tag:test_tag"]) + execution_config = ExecutionConfig(dbt_project_path=project_config.dbt_project_path) + dbt_graph = DbtGraph( + project=project_config, + execution_config=execution_config, + profile_config=profile_config, + render_config=render_config, + ) + dbt_graph.load_from_dbt_manifest() + + assert len(dbt_graph.filtered_nodes) > 0 + + for _, node in dbt_graph.filtered_nodes.items(): + assert node.tags == ["test_tag"] + if node.resource_type == DbtResourceType.MODEL: + assert node.has_test is True + + @pytest.mark.integration @pytest.mark.parametrize("load_method", ["load_via_dbt_ls", "load_from_dbt_manifest"]) def test_load_dbt_ls_and_manifest_with_model_version(load_method): @@ -512,6 +613,8 @@ def test_load_dbt_ls_and_manifest_with_model_version(load_method): dbt_project_path=DBT_PROJECTS_ROOT_DIR / "model_version", manifest_path=SAMPLE_MANIFEST_MODEL_VERSION if load_method == "load_from_dbt_manifest" else None, ), + render_config=RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / "model_version"), + execution_config=ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / "model_version"), profile_config=ProfileConfig( profile_name="default", target_name="default", @@ -554,3 +657,59 @@ def test_load_dbt_ls_and_manifest_with_model_version(load_method): "model.jaffle_shop.stg_orders.v1", "model.jaffle_shop.stg_payments", } == set(dbt_graph.nodes["model.jaffle_shop.orders"].depends_on) + + +def test_create_symlinks(tmp_path): + """Tests that symlinks are created for expected files in the dbt project directory.""" + tmp_dir = tmp_path / "dbt-project" + tmp_dir.mkdir() + + create_symlinks(DBT_PROJECTS_ROOT_DIR / "jaffle_shop", tmp_dir) + for child in tmp_dir.iterdir(): + assert child.is_symlink() + assert child.name not in ("logs", "target", "profiles.yml", "dbt_packages") + + +@pytest.mark.parametrize( + "stdout,returncode", + [ + ("all good", None), + pytest.param("fail", 599, marks=pytest.mark.xfail(raises=CosmosLoadDbtException)), + pytest.param("Error", None, marks=pytest.mark.xfail(raises=CosmosLoadDbtException)), + ], +) +@patch("cosmos.dbt.graph.Popen") +def test_run_command(mock_popen, stdout, returncode): + fake_command = ["fake", "command"] + fake_dir = Path("fake_dir") + env_vars = {"fake": "env_var"} + + mock_popen.return_value.communicate.return_value = (stdout, "") + mock_popen.return_value.returncode = returncode + + return_value = run_command(fake_command, fake_dir, env_vars) + args, kwargs = mock_popen.call_args + assert args[0] == fake_command + assert kwargs["cwd"] == fake_dir + assert kwargs["env"] == env_vars + + assert return_value == stdout + + +def test_parse_dbt_ls_output(): + fake_ls_stdout = '{"resource_type": "model", "name": "fake-name", "original_file_path": "fake-file-path.sql", "unique_id": "fake-unique-id", "tags": [], "config": {}}' + + expected_nodes = { + "fake-unique-id": DbtNode( + name="fake-name", + unique_id="fake-unique-id", + resource_type=DbtResourceType.MODEL, + file_path=Path("fake-project/fake-file-path.sql"), + tags=[], + config={}, + depends_on=[], + ), + } + nodes = parse_dbt_ls_output(Path("fake-project"), fake_ls_stdout) + + assert expected_nodes == nodes diff --git a/tests/sample/manifest.json b/tests/sample/manifest.json index 4150234b87..d0b19c7b6d 100644 --- a/tests/sample/manifest.json +++ b/tests/sample/manifest.json @@ -7576,7 +7576,9 @@ "resource_type": "model", "schema": "public", "sources": [], - "tags": [], + "tags": [ + "test_tag" + ], "unique_id": "model.jaffle_shop.customers", "unrendered_config": { "materialized": "table" @@ -7754,7 +7756,9 @@ "resource_type": "model", "schema": "public", "sources": [], - "tags": [], + "tags": [ + "test_tag" + ], "unique_id": "model.jaffle_shop.orders", "unrendered_config": { "materialized": "table" diff --git a/tests/test_config.py b/tests/test_config.py index c0ae321018..9eec48055d 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -36,17 +36,14 @@ def test_init_with_manifest_path_and_project_path_succeeds(): def test_init_with_no_params(): """ - The constructor now validates that the required base fields are present - As such, we should test here that the correct exception is raised if these are not correctly defined - This functionality has been moved from the validate method + With the implementation of dbt_project_path in RenderConfig and ExecutionConfig + dbt_project_path becomes optional here. The only requirement is that if one of + manifest_path or project_name is defined, they should both be defined. + We used to enforce dbt_project_path or manifest_path and project_name, but this is + No longer the case """ - with pytest.raises(CosmosValueError) as err_info: - ProjectConfig() - print(err_info.value.args[0]) - assert err_info.value.args[0] == ( - "ProjectConfig requires dbt_project_path and/or manifest_path to be defined." - "If only manifest_path is defined, project_name must also be defined." - ) + project_config = ProjectConfig() + assert project_config def test_init_with_manifest_path_and_not_project_path_and_not_project_name_fails(): @@ -55,11 +52,9 @@ def test_init_with_manifest_path_and_not_project_path_and_not_project_name_fails """ with pytest.raises(CosmosValueError) as err_info: ProjectConfig(manifest_path=DBT_PROJECTS_ROOT_DIR / "manifest.json") - print(err_info.value.args[0]) - assert err_info.value.args[0] == ( - "ProjectConfig requires dbt_project_path and/or manifest_path to be defined." - "If only manifest_path is defined, project_name must also be defined." - ) + assert err_info.value.args[0] == ( + "If ProjectConfig.dbt_project_path is not defined, ProjectConfig.manifest_path and ProjectConfig.project_name must be defined together, or both left undefined." + ) def test_validate_with_project_path_and_manifest_path_succeeds(): @@ -95,7 +90,7 @@ def test_validate_project_missing_fails(): project_config = ProjectConfig(dbt_project_path=Path("/tmp")) with pytest.raises(CosmosValueError) as err_info: assert project_config.validate_project() is None - assert err_info.value.args[0] == "Could not find dbt_project.yml at /tmp/dbt_project.yml" + assert err_info.value.args[0] == "Could not find dbt_project.yml at /tmp/dbt_project.yml" def test_is_manifest_available_is_true(): @@ -118,13 +113,11 @@ def test_project_name(): def test_profile_config_post_init(): with pytest.raises(CosmosValueError) as err_info: ProfileConfig(profiles_yml_filepath="/tmp/some-profile", profile_name="test", target_name="test") - assert err_info.value.args[0] == "The file /tmp/some-profile does not exist." + assert err_info.value.args[0] == "The file /tmp/some-profile does not exist." def test_profile_config_validate(): with pytest.raises(CosmosValueError) as err_info: profile_config = ProfileConfig(profile_name="test", target_name="test") assert profile_config.validate_profile() is None - assert ( - err_info.value.args[0] == "Either profiles_yml_filepath or profile_mapping must be set to render a profile" - ) + assert err_info.value.args[0] == "Either profiles_yml_filepath or profile_mapping must be set to render a profile" diff --git a/tests/test_converter.py b/tests/test_converter.py index 5fb50a00fc..5d89513b31 100644 --- a/tests/test_converter.py +++ b/tests/test_converter.py @@ -113,7 +113,7 @@ def test_converter_creates_dag_with_project_path_str(mock_load_dbt_graph, execut ) @patch("cosmos.converter.DbtGraph.filtered_nodes", nodes) @patch("cosmos.converter.DbtGraph.load") -def test_converter_fails_no_project_dir(mock_load_dbt_graph, execution_mode, operator_args): +def test_converter_fails_execution_config_no_project_dir(mock_load_dbt_graph, execution_mode, operator_args): """ This test validates that a project, given a manifest path and project name, with seeds is able to successfully generate a converter @@ -135,4 +135,83 @@ def test_converter_fails_no_project_dir(mock_load_dbt_graph, execution_mode, ope render_config=render_config, operator_args=operator_args, ) - assert err_info.value.args[0] == "A Project Path in ProjectConfig is required for generating a Task Operators." + assert ( + err_info.value.args[0] + == "ExecutionConfig.dbt_project_path is required for the execution of dbt tasks in all execution modes." + ) + + +@pytest.mark.parametrize( + "execution_mode,operator_args", + [ + (ExecutionMode.KUBERNETES, {}), + # (ExecutionMode.DOCKER, {"image": "sample-image"}), + ], +) +@patch("cosmos.converter.DbtGraph.filtered_nodes", nodes) +@patch("cosmos.converter.DbtGraph.load") +def test_converter_fails_project_config_path_and_execution_config_path( + mock_load_dbt_graph, execution_mode, operator_args +): + """ + This test ensures that we fail if we defined project path in ProjectConfig and ExecutionConfig + They are mutually exclusive, so this should be allowed. + """ + project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT.as_posix()) + execution_config = ExecutionConfig(execution_mode=execution_mode, dbt_project_path=SAMPLE_DBT_PROJECT.as_posix()) + render_config = RenderConfig(emit_datasets=True) + profile_config = ProfileConfig( + profile_name="my_profile_name", + target_name="my_target_name", + profiles_yml_filepath=SAMPLE_PROFILE_YML, + ) + with pytest.raises(CosmosValueError) as err_info: + DbtToAirflowConverter( + nodes=nodes, + project_config=project_config, + profile_config=profile_config, + execution_config=execution_config, + render_config=render_config, + operator_args=operator_args, + ) + assert ( + err_info.value.args[0] + == "ProjectConfig.dbt_project_path is mutually exclusive with RenderConfig.dbt_project_path and ExecutionConfig.dbt_project_path.If using RenderConfig.dbt_project_path or ExecutionConfig.dbt_project_path, ProjectConfig.dbt_project_path should be None" + ) + + +@pytest.mark.parametrize( + "execution_mode,operator_args", + [ + (ExecutionMode.KUBERNETES, {}), + # (ExecutionMode.DOCKER, {"image": "sample-image"}), + ], +) +@patch("cosmos.converter.DbtGraph.filtered_nodes", nodes) +@patch("cosmos.converter.DbtGraph.load") +def test_converter_fails_no_manifest_no_render_config(mock_load_dbt_graph, execution_mode, operator_args): + """ + This test ensures that we fail if we define project path in ProjectConfig and ExecutionConfig + They are mutually exclusive, so this should be allowed. + """ + project_config = ProjectConfig() + execution_config = ExecutionConfig(execution_mode=execution_mode, dbt_project_path=SAMPLE_DBT_PROJECT.as_posix()) + render_config = RenderConfig(emit_datasets=True) + profile_config = ProfileConfig( + profile_name="my_profile_name", + target_name="my_target_name", + profiles_yml_filepath=SAMPLE_PROFILE_YML, + ) + with pytest.raises(CosmosValueError) as err_info: + DbtToAirflowConverter( + nodes=nodes, + project_config=project_config, + profile_config=profile_config, + execution_config=execution_config, + render_config=render_config, + operator_args=operator_args, + ) + assert ( + err_info.value.args[0] + == "RenderConfig.dbt_project_path is required for rendering an airflow DAG from a DBT Graph if no manifest is provided." + )