Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ def __init__(

cache_dir = None
cache_identifier = None

if settings.enable_cache:
cache_identifier = cache._create_cache_identifier(dag, task_group)
cache_dir = cache._obtain_cache_dir_path(cache_identifier=cache_identifier)
Expand Down Expand Up @@ -303,6 +304,7 @@ def __init__(
"env": env_vars,
"vars": dbt_vars,
"cache_dir": cache_dir,
"manifest_filepath": project_config.manifest_path,
}

validate_arguments(
Expand Down
21 changes: 19 additions & 2 deletions cosmos/dbt/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
DBT_DEFAULT_PACKAGES_FOLDER,
DBT_DEPENDENCIES_FILE_NAMES,
DBT_LOG_DIR_NAME,
DBT_MANIFEST_FILE_NAME,
DBT_PARTIAL_PARSE_FILE_NAME,
DBT_PROJECT_FILENAME,
DBT_TARGET_DIR_NAME,
Expand Down Expand Up @@ -70,8 +71,8 @@ def copy_dbt_packages(source_folder: Path, target_folder: Path) -> None:
"""
Copies the dbt packages related files and directories from source_folder to target_folder.

:param: source_folder: The base directory where paths are sourced from.
:param: target_folder: The directory where paths will be copied to.
:param source_folder: The base directory where paths are sourced from.
:param target_folder: The directory where paths will be copied to.
"""
logger.info("Copying dbt packages to temporary folder...")

Expand All @@ -92,6 +93,22 @@ def copy_dbt_packages(source_folder: Path, target_folder: Path) -> None:
logger.info("Completed copying dbt packages to temporary folder.")


def copy_manifest_file_if_exists(source_manifest: str | Path, dbt_project_folder: str | Path) -> None:
Comment thread
tatiana marked this conversation as resolved.
"""
Copies the source manifest.json file, if available, to the given desired dbt project folder.

:param source_manifest: manifest.json filepath
:param dbt_project_folder: destination dbt project folder (it will be copied to the target folder)
"""
dbt_project_folder = Path(dbt_project_folder)
if source_manifest and Path(source_manifest).exists():
logger.info(f"Copying the manifest from {source_manifest}...")
target_folder_path = dbt_project_folder / DBT_TARGET_DIR_NAME
tmp_manifest_filepath = target_folder_path / DBT_MANIFEST_FILE_NAME
Path(target_folder_path).mkdir(parents=True, exist_ok=True)
shutil.copy(source_manifest, tmp_manifest_filepath)
Comment thread
tatiana marked this conversation as resolved.


def create_symlinks(project_path: Path, tmp_dir: Path, ignore_dbt_packages: bool) -> None:
"""Helper function to create symlinks to the dbt project files."""
ignore_paths = [DBT_LOG_DIR_NAME, DBT_TARGET_DIR_NAME, PACKAGE_LOCKFILE_YML, "profiles.yml"]
Expand Down
14 changes: 13 additions & 1 deletion cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,12 @@
InvocationMode,
)
from cosmos.dataset import get_dataset_alias_name
from cosmos.dbt.project import copy_dbt_packages, get_partial_parse_path, has_non_empty_dependencies_file
from cosmos.dbt.project import (
copy_dbt_packages,
copy_manifest_file_if_exists,
get_partial_parse_path,
has_non_empty_dependencies_file,
)
from cosmos.exceptions import AirflowCompatibilityError, CosmosDbtRunError, CosmosValueError
from cosmos.settings import (
remote_target_path,
Expand Down Expand Up @@ -148,6 +153,7 @@ class AbstractDbtLocalBase(AbstractDbtBase):
:param install_deps (deprecated): If true, install dependencies before running the command
:param copy_dbt_packages: If true, copy pre-existing `dbt_packages` (before running dbt deps)
:param callback: A callback function called on after a dbt run with a path to the dbt project directory.
:param manifest_filepath: The path to the user-defined Manifest file. It's "" by default.
:param target_name: A name to use for the dbt target. If not provided, and no target is found
in your project's dbt_project.yml, "cosmos_target" is used.
:param should_store_compiled_sql: If true, store the compiled SQL in the compiled_sql rendered template.
Expand All @@ -171,6 +177,7 @@ def __init__(
invocation_mode: InvocationMode | None = None,
install_deps: bool = True,
copy_dbt_packages: bool = settings.default_copy_dbt_packages,
manifest_filepath: str = "",
Comment thread
tatiana marked this conversation as resolved.
callback: Callable[[str], None] | list[Callable[[str], None]] | None = None,
callback_args: dict[str, Any] | None = None,
should_store_compiled_sql: bool = True,
Expand Down Expand Up @@ -202,6 +209,8 @@ def __init__(
self.install_deps = install_deps and has_non_empty_dependencies_file(Path(self.project_dir))
self.copy_dbt_packages = copy_dbt_packages

self.manifest_filepath = manifest_filepath

@cached_property
def subprocess_hook(self) -> FullOutputSubprocessHook:
"""Returns hook for running the bash command."""
Expand Down Expand Up @@ -457,6 +466,8 @@ def _clone_project(self, tmp_dir_path: Path) -> None:
copy_dbt_packages(Path(self.project_dir), tmp_dir_path)
self.log.info("Completed copying dbt packages to temporary folder.")

copy_manifest_file_if_exists(self.manifest_filepath, Path(tmp_dir_path))

def _handle_partial_parse(self, tmp_dir_path: Path) -> None:
if self.cache_dir is None:
return
Expand Down Expand Up @@ -575,6 +586,7 @@ def run_command(

tmp_dir_path = Path(tmp_project_dir)
env = {k: str(v) for k, v in env.items()}

self._clone_project(tmp_dir_path)

if self.partial_parse:
Expand Down
Loading