Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 51 additions & 1 deletion cosmos/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
from sqlalchemy.orm import Session

from cosmos import settings
from cosmos.constants import DBT_MANIFEST_FILE_NAME, DBT_TARGET_DIR_NAME
from cosmos.constants import DBT_MANIFEST_FILE_NAME, DBT_TARGET_DIR_NAME, DEFAULT_PROFILES_FILE_NAME
from cosmos.dbt.project import get_partial_parse_path
from cosmos.log import get_logger
from cosmos.settings import cache_dir, dbt_profile_cache_dir_name, enable_cache, enable_cache_profile

logger = get_logger(__name__)
VAR_KEY_CACHE_PREFIX = "cosmos_cache__"
Expand Down Expand Up @@ -346,3 +347,52 @@ def delete_unused_dbt_ls_cache(
f"Deleted {deleted_cosmos_variables}/{total_cosmos_variables} Airflow Variables used to store Cosmos cache. "
)
return deleted_cosmos_variables


def is_profile_cache_enabled() -> bool:
"""Return True if global and profile cache is enable"""
return enable_cache and enable_cache_profile


def _get_or_create_profile_cache_dir() -> Path:
"""
Get or create the directory path for caching DBT profiles.

- Constructs the profile cache directory path based on cache_dir and dbt_profile_cache_dir.
- Checks if the directory exists; if not, creates it
- Return profile cache directory
"""
profile_cache_dir = cache_dir / dbt_profile_cache_dir_name
if not profile_cache_dir.exists():
profile_cache_dir.mkdir(parents=True, exist_ok=True)
return profile_cache_dir


def get_cached_profile(version: str) -> Path | None:
"""
Retrieve the path to a cached DBT profile YML file if it exists for the given version.

- Constructs the DBT profile YML Path based on version and profile cache directory
- Checks if the profile YML exists
- Return the profile YML Path
"""
profile_yml_path = _get_or_create_profile_cache_dir() / version / DEFAULT_PROFILES_FILE_NAME
if profile_yml_path.exists() and profile_yml_path.is_file():
return profile_yml_path
return None


def create_cache_profile(version: str, profile_content: str) -> Path:
"""
Create a cached DBT profile YAML file with the provided content for the given version.

- Constructs the path for profile YML based on the version in the profile cache directory
- Creates the profile directory if it does not exist
- Writes the profile content to the profile YML file
- Return the profile YML Path
"""
profile_yml_dir = _get_or_create_profile_cache_dir() / version
profile_yml_dir.mkdir(parents=True, exist_ok=True)
profile_yml_path = profile_yml_dir / DEFAULT_PROFILES_FILE_NAME
profile_yml_path.write_text(profile_content)
return profile_yml_path
70 changes: 48 additions & 22 deletions cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
from pathlib import Path
from typing import Any, Callable, Iterator

from cosmos.cache import create_cache_profile, get_cached_profile, is_profile_cache_enabled
from cosmos.constants import (
DEFAULT_PROFILES_FILE_NAME,
DbtResourceType,
ExecutionMode,
InvocationMode,
Expand All @@ -25,8 +27,6 @@

logger = get_logger(__name__)

DEFAULT_PROFILES_FILE_NAME = "profiles.yml"


class CosmosConfigException(Exception):
"""
Expand Down Expand Up @@ -258,6 +258,27 @@ def validate_profiles_yml(self) -> None:
if self.profiles_yml_filepath and not Path(self.profiles_yml_filepath).exists():
raise CosmosValueError(f"The file {self.profiles_yml_filepath} does not exist.")

def _get_profile_path(self, use_mock_values: bool = False) -> Path:
"""
Handle the profile caching mechanism.

Check if profile object version is exist then reuse it
Otherwise, create profile yml for requested object and return the profile path
"""
assert self.profile_mapping # To satisfy MyPy
current_profile_version = self.profile_mapping.version(self.profile_name, self.target_name, use_mock_values)
cached_profile_path = get_cached_profile(current_profile_version)
if cached_profile_path:
logger.info("Profile found in cache using profile: %s.", cached_profile_path)
return cached_profile_path
else:
profile_contents = self.profile_mapping.get_profile_file_contents(
profile_name=self.profile_name, target_name=self.target_name, use_mock_values=use_mock_values
)
profile_path = create_cache_profile(current_profile_version, profile_contents)
logger.info("Profile not found in cache storing and using profile: %s.", profile_path)
return profile_path

@contextlib.contextmanager
def ensure_profile(
self, desired_profile_path: Path | None = None, use_mock_values: bool = False
Expand All @@ -268,35 +289,40 @@ def ensure_profile(
yield Path(self.profiles_yml_filepath), {}

elif self.profile_mapping:
profile_contents = self.profile_mapping.get_profile_file_contents(
profile_name=self.profile_name, target_name=self.target_name, use_mock_values=use_mock_values
)

if use_mock_values:
env_vars = {}
else:
env_vars = self.profile_mapping.env_vars

if desired_profile_path:
logger.info(
"Writing profile to %s with the following contents:\n%s",
desired_profile_path,
profile_contents,
)
# write profile_contents to desired_profile_path using yaml library
desired_profile_path.write_text(profile_contents)
yield desired_profile_path, env_vars
if is_profile_cache_enabled():
logger.info("Profile caching is enable.")
cached_profile_path = self._get_profile_path(use_mock_values)
yield cached_profile_path, env_vars
else:
with tempfile.TemporaryDirectory() as temp_dir:
temp_file = Path(temp_dir) / DEFAULT_PROFILES_FILE_NAME
profile_contents = self.profile_mapping.get_profile_file_contents(
profile_name=self.profile_name, target_name=self.target_name, use_mock_values=use_mock_values
)

if desired_profile_path:
logger.info(
"Creating temporary profiles.yml with use_mock_values=%s at %s with the following contents:\n%s",
use_mock_values,
temp_file,
"Writing profile to %s with the following contents:\n%s",
desired_profile_path,
profile_contents,
)
temp_file.write_text(profile_contents)
yield temp_file, env_vars
# write profile_contents to desired_profile_path using yaml library
desired_profile_path.write_text(profile_contents)
yield desired_profile_path, env_vars
else:
with tempfile.TemporaryDirectory() as temp_dir:
temp_file = Path(temp_dir) / DEFAULT_PROFILES_FILE_NAME
logger.info(
"Creating temporary profiles.yml with use_mock_values=%s at %s with the following contents:\n%s",
use_mock_values,
temp_file,
profile_contents,
)
temp_file.write_text(profile_contents)
yield temp_file, env_vars


@dataclass
Expand Down
1 change: 1 addition & 0 deletions cosmos/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
DBT_DEPENDENCIES_FILE_NAMES = {"packages.yml", "dependencies.yml"}
DBT_LOG_FILENAME = "dbt.log"
DBT_BINARY_NAME = "dbt"
DEFAULT_PROFILES_FILE_NAME = "profiles.yml"

DEFAULT_OPENLINEAGE_NAMESPACE = "cosmos"
OPENLINEAGE_PRODUCER = "https://github.com/astronomer/astronomer-cosmos/"
Expand Down
19 changes: 19 additions & 0 deletions cosmos/profiles/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

from __future__ import annotations

import hashlib
import json
import warnings
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any, Dict, Literal, Optional
Expand Down Expand Up @@ -82,6 +84,23 @@ def __init__(
self.dbt_config_vars = dbt_config_vars
self._validate_disable_event_tracking()

def version(self, profile_name: str, target_name: str, mock_profile: bool = False) -> str:
"""
Generate SHA-256 hash digest based on the provided profile, profile and target names.

:param profile_name: Name of the DBT profile.
:param target_name: Name of the DBT target
:param mock_profile: If True, use a mock profile.
"""
if mock_profile:
profile = self.mock_profile
else:
profile = self.profile
profile["profile_name"] = profile_name
profile["target_name"] = target_name
hash_object = hashlib.sha256(json.dumps(profile, sort_keys=True).encode())
return hash_object.hexdigest()

def _validate_profile_args(self) -> None:
"""
Check if profile_args contains keys that should not be overridden from the
Expand Down
2 changes: 2 additions & 0 deletions cosmos/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
dbt_docs_dir = conf.get("cosmos", "dbt_docs_dir", fallback=None)
dbt_docs_conn_id = conf.get("cosmos", "dbt_docs_conn_id", fallback=None)
dbt_docs_index_file_name = conf.get("cosmos", "dbt_docs_index_file_name", fallback="index.html")
enable_cache_profile = conf.getboolean("cosmos", "enable_cache_profile", fallback=True)
dbt_profile_cache_dir_name = conf.get("cosmos", "profile_cache_dir_name", fallback="profile")

try:
LINEAGE_NAMESPACE = conf.get("openlineage", "namespace")
Expand Down
14 changes: 14 additions & 0 deletions docs/configuration/caching.rst
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,17 @@ Users can customize where to store the cache using the setting ``AIRFLOW__COSMOS
It is possible to switch off this feature by exporting the environment variable ``AIRFLOW__COSMOS__ENABLE_CACHE_PARTIAL_PARSE=0``.

For more information, read the `Cosmos partial parsing documentation <./partial-parsing.html>`_


Caching the profiles
~~~~~~~~~~~~~~~~~~~~~~~~

(Introduced in Cosmos 1.5)

Cosmos 1.5 introduced `support to profile caching <https://github.com/astronomer/astronomer-cosmos/pull/1046>`_,
enabling caching for the profile mapping in the path specified by env ``AIRFLOW__COSMOS__CACHE_DIR`` and ``AIRFLOW__COSMOS__PROFILE_CACHE_DIR_NAME``.
This feature facilitates the reuse of Airflow connections and ``profiles.yml``.

Users have the flexibility to customize the cache storage location using the settings ``AIRFLOW__COSMOS__CACHE_DIR`` and ``AIRFLOW__COSMOS__PROFILE_CACHE_DIR_NAME``.

To disable this feature, users can set the environment variable ``AIRFLOW__COSMOS__ENABLE_CACHE_PROFILE=False``
17 changes: 17 additions & 0 deletions docs/configuration/cosmos-conf.rst
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,23 @@ This page lists all available Airflow configurations that affect ``astronomer-co
- Default: ``None``
- Environment Variable: ``AIRFLOW__COSMOS__DBT_DOCS_CONN_ID``

.. _enable_cache_profile:

`enable_cache_profile`_:
Enable caching for the DBT profile.

- Default: ``True``
- Environment Variable: ``AIRFLOW__COSMOS__ENABLE_CACHE_PROFILE``

.. _profile_cache_dir_name:

`profile_cache_dir_name`_:
Folder name to store the DBT cached profiles. This will be a sub-folder of ``cache_dir``

- Default: ``profile``
- Environment Variable: ``AIRFLOW__COSMOS__PROFILE_CACHE_DIR_NAME``


[openlineage]
~~~~~~~~~~~~~

Expand Down
15 changes: 12 additions & 3 deletions tests/dbt/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,10 +405,13 @@ def test_load(


@pytest.mark.integration
@pytest.mark.parametrize("enable_cache_profile", [True, False])
@patch("cosmos.config.is_profile_cache_enabled")
@patch("cosmos.dbt.graph.Popen")
def test_load_via_dbt_ls_does_not_create_target_logs_in_original_folder(
mock_popen, tmp_dbt_project_dir, postgres_profile_config
mock_popen, is_profile_cache_enabled, enable_cache_profile, tmp_dbt_project_dir, postgres_profile_config
):
is_profile_cache_enabled.return_value = enable_cache_profile
mock_popen().communicate.return_value = ("", "")
mock_popen().returncode = 0
assert not (tmp_dbt_project_dir / "target").exists()
Expand All @@ -427,7 +430,7 @@ def test_load_via_dbt_ls_does_not_create_target_logs_in_original_folder(
assert not (tmp_dbt_project_dir / "target").exists()
assert not (tmp_dbt_project_dir / "logs").exists()

used_cwd = Path(mock_popen.call_args[0][0][-5])
used_cwd = Path(mock_popen.call_args[0][0][5])
assert used_cwd != project_config.dbt_project_path
assert not used_cwd.exists()

Expand Down Expand Up @@ -638,7 +641,11 @@ def test_load_via_dbt_ls_without_dbt_deps_and_preinstalled_dbt_packages(


@pytest.mark.integration
def test_load_via_dbt_ls_caching_partial_parsing(tmp_dbt_project_dir, postgres_profile_config, caplog, tmp_path):
@pytest.mark.parametrize("enable_cache_profile", [True, False])
@patch("cosmos.config.is_profile_cache_enabled")
def test_load_via_dbt_ls_caching_partial_parsing(
is_profile_cache_enabled, enable_cache_profile, tmp_dbt_project_dir, postgres_profile_config, caplog, tmp_path
):
"""
When using RenderConfig.enable_mock_profile=False and defining DbtGraph.cache_dir,
Cosmos should leverage dbt partial parsing.
Expand All @@ -647,6 +654,8 @@ def test_load_via_dbt_ls_caching_partial_parsing(tmp_dbt_project_dir, postgres_p

caplog.set_level(logging.DEBUG)

is_profile_cache_enabled.return_value = enable_cache_profile

project_config = ProjectConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME)
render_config = RenderConfig(
dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, dbt_deps=True, enable_mock_profile=False
Expand Down
6 changes: 6 additions & 0 deletions tests/profiles/test_base_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,3 +162,9 @@ def test_profile_config_validate_dbt_config_vars_check_values(dbt_config_var: st
conn_id="fake_conn_id",
dbt_config_vars=DbtProfileConfigVars(**dbt_config_vars),
)


def test_profile_version_sha_consistency():
profile_mapping = TestProfileMapping(conn_id="fake_conn_id")
version = profile_mapping.version(profile_name="dev", target_name="dev")
assert version == "ea3bf1f70b033405ba9ff9cafe65af873fd7a868cac840cdbfd5e8e9a1da9650"
Loading