Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 2 additions & 32 deletions cosmos/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
remote_cache_dir_conn_id,
)
from cosmos.settings import remote_cache_dir as settings_remote_cache_dir
from cosmos.versioning import _create_folder_version_hash

logger = get_logger(__name__)
VAR_KEY_CACHE_PREFIX = "cosmos_cache__"
Expand All @@ -52,7 +53,7 @@ def _configure_remote_cache_dir() -> Path | None:
if not settings_remote_cache_dir:
return None

_configured_cache_dir = None
_configured_cache_dir: Path | None = None

cache_dir_str = str(settings_remote_cache_dir)

Expand Down Expand Up @@ -267,37 +268,6 @@ def _copy_partial_parse_to_project(partial_parse_filepath: Path, project_path: P
shutil.copy(str(source_manifest_filepath), str(target_manifest_filepath))


def _create_folder_version_hash(dir_path: Path) -> str:
Comment thread
pankajkoti marked this conversation as resolved.
"""
Given a directory, iterate through its content and create a hash that will change in case the
contents of the directory change. The value should not change if the values of the directory do not change, even if
the command is run from different Airflow instances.

This method output must be concise and it currently changes based on operating system.
"""
# This approach is less efficient than using modified time
# sum([path.stat().st_mtime for path in dir_path.glob("**/*")])
# unfortunately, the modified time approach does not work well for dag-only deployments
# where DAGs are constantly synced to the deployed Airflow
# for 5k files, this seems to take 0.14
hasher = hashlib.md5()
filepaths = []

for root_dir, dirs, files in os.walk(dir_path):
paths = [os.path.join(root_dir, filepath) for filepath in files]
filepaths.extend(paths)

for filepath in sorted(filepaths):
try:
with open(str(filepath), "rb") as fp:
buf = fp.read()
hasher.update(buf)
except FileNotFoundError:
logger.warning(f"The dbt project folder contains a symbolic link to a non-existent file: {filepath}")

return hasher.hexdigest()


def _calculate_dbt_ls_cache_current_version(cache_identifier: str, project_dir: Path, cmd_args: list[str]) -> str:
"""
Taking into account the project directory contents and the command arguments, calculate the
Expand Down
28 changes: 28 additions & 0 deletions cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from cosmos.dbt.selector import retrieve_by_label
from cosmos.exceptions import CosmosValueError
from cosmos.log import get_logger
from cosmos.versioning import _create_folder_version_hash

logger = get_logger(__name__)

Expand Down Expand Up @@ -286,6 +287,8 @@ def __init__(
)
self.dbt_graph.load(method=render_config.load_method, execution_mode=execution_config.execution_mode)

self._add_dbt_project_hash_to_dag_docs(dag)

current_time = time.perf_counter()
elapsed_time = current_time - previous_time
logger.info(
Expand Down Expand Up @@ -336,3 +339,28 @@ def __init__(
logger.info(
f"Cosmos performance ({cache_identifier}) - [{platform.node()}|{os.getpid()}]: It took {elapsed_time:.3}s to build the Airflow DAG."
)

def _add_dbt_project_hash_to_dag_docs(self, dag: DAG | None) -> None:
"""
Add dbt project content hash to DAG documentation for Airflow 3 dag versioning support.

This enables Airflow 3's automatic DAG versioning to detect when dbt project
files change, ensuring proper DAG version updates.

:param dag: The Airflow DAG to add versioning information to. If None, no action is taken.
"""
if dag is None:
return

try:
dbt_project_hash = _create_folder_version_hash(self.dbt_graph.project_path)
hash_suffix = f"\n\n**dbt project hash:** `{dbt_project_hash}`"

if dag.doc_md:
dag.doc_md += hash_suffix
else:
dag.doc_md = f"**dbt project hash:** `{dbt_project_hash}`"

logger.debug(f"Appended dbt project hash {dbt_project_hash} to DAG {dag.dag_id} documentation")
except Exception as e:
logger.warning(f"Failed to append dbt project hash to DAG documentation: {e}")
40 changes: 40 additions & 0 deletions cosmos/versioning.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from __future__ import annotations

import hashlib
import os
from pathlib import Path

from cosmos.log import get_logger

logger = get_logger(__name__)


def _create_folder_version_hash(dir_path: Path) -> str:
"""
Given a directory, iterate through its content and create a hash that will change in case the
contents of the directory change. The value should not change if the values of the directory do not change, even if
the command is run from different Airflow instances.

This method output must be concise and it currently changes based on operating system.
"""
# This approach is less efficient than using modified time
# sum([path.stat().st_mtime for path in dir_path.glob("**/*")])
# unfortunately, the modified time approach does not work well for dag-only deployments
# where DAGs are constantly synced to the deployed Airflow
# for 5k files, this seems to take 0.14
hasher = hashlib.md5()
filepaths = []

for root_dir, dirs, files in os.walk(dir_path):
paths = [os.path.join(root_dir, filepath) for filepath in files]
filepaths.extend(paths)

for filepath in sorted(filepaths):
try:
with open(str(filepath), "rb") as fp:
buf = fp.read()
hasher.update(buf)
except FileNotFoundError:
logger.warning(f"The dbt project folder contains a symbolic link to a non-existent file: {filepath}")

return hasher.hexdigest()
33 changes: 0 additions & 33 deletions tests/test_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
_configure_remote_cache_dir,
_copy_partial_parse_to_project,
_create_cache_identifier,
_create_folder_version_hash,
_get_latest_cached_package_lockfile,
_get_latest_partial_parse,
_get_or_create_profile_cache_dir,
Expand Down Expand Up @@ -123,38 +122,6 @@ def test__copy_partial_parse_to_project_msg_fails_msgpack(mock_unpack, tmp_path,
assert "Unable to patch the partial_parse.msgpack file due to ValueError()" in caplog.text


def test__create_folder_version_hash(tmp_path, caplog):
Comment thread
pankajkoti marked this conversation as resolved.
"""
Test that Cosmos is still able to create the hash of a dbt project folder even when
there is a symbolic link referencing a no longer existing file.

This test addresses the issue:
https://github.com/astronomer/astronomer-cosmos/issues/1096
"""
caplog.set_level(logging.INFO)

# Create a source folder with two files
source_dir = tmp_path / "original_dbt_folder"
source_dir.mkdir()
file_1 = Path(source_dir / "file_1.sql")
file_1.touch()
file_2 = Path(source_dir / "file_2.sql")
file_2.touch()

# Create a target folder with symbolic links to the two files in the source folder
target_dir = tmp_path / "cosmos_dbt_folder"
target_dir.mkdir()
file_1_symlink = Path(target_dir / "file_1.sql")
file_1_symlink.symlink_to(file_1)
file_2_symlink = Path(target_dir / "file_2.sql")
file_2_symlink.symlink_to(file_2)

# Delete one of the original files from the source folder
file_1.unlink()

_create_folder_version_hash(target_dir)


@patch("cosmos.cache.shutil.copyfile")
@patch("cosmos.cache.get_partial_parse_path")
def test_update_partial_parse_cache(mock_get_partial_parse_path, mock_copyfile):
Expand Down
144 changes: 144 additions & 0 deletions tests/test_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -968,3 +968,147 @@ def test_converter_creates_model_with_pre_dbt_fusion(mock_load_dbt_graph):
assert isinstance(converter.tasks_map, dict)
assert converter.tasks_map["sample_model"].models == "sample.sample_model"
assert converter.tasks_map["sample_model"].select is None


@patch("cosmos.converter._create_folder_version_hash")
@patch("cosmos.converter.DbtGraph.load")
def test_dag_versioning_hash_appended_to_empty_doc_md(mock_load_dbt_graph, mock_hash_func):
"""Test that dbt project hash is appended to DAG doc_md when doc_md is initially empty."""
mock_hash_func.return_value = "abc123def456"
dag = DAG("test_dag", start_date=datetime(2024, 1, 1))
assert dag.doc_md is None # Initially empty

project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT)
profile_config = ProfileConfig(
profile_name="my_profile_name",
target_name="my_target_name",
profiles_yml_filepath=SAMPLE_PROFILE_YML,
)
execution_config = ExecutionConfig(execution_mode=ExecutionMode.LOCAL)

DbtToAirflowConverter(
dag=dag,
project_config=project_config,
profile_config=profile_config,
execution_config=execution_config,
)

assert dag.doc_md == "**dbt project hash:** `abc123def456`"
mock_hash_func.assert_called_once()


@patch("cosmos.converter._create_folder_version_hash")
@patch("cosmos.converter.DbtGraph.load")
def test_dag_versioning_hash_appended_to_existing_doc_md(mock_load_dbt_graph, mock_hash_func):
"""Test that dbt project hash is appended to existing DAG doc_md."""
mock_hash_func.return_value = "xyz789abc123"
existing_doc = "This is my existing DAG documentation.\n\nIt has multiple lines."
dag = DAG("test_dag", start_date=datetime(2024, 1, 1), doc_md=existing_doc)

project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT)
profile_config = ProfileConfig(
profile_name="test",
target_name="test",
profile_mapping=PostgresUserPasswordProfileMapping(conn_id="test", profile_args={}),
)
execution_config = ExecutionConfig(execution_mode=ExecutionMode.LOCAL)

DbtToAirflowConverter(
dag=dag,
project_config=project_config,
profile_config=profile_config,
execution_config=execution_config,
)

expected_doc = existing_doc + "\n\n**dbt project hash:** `xyz789abc123`"
assert dag.doc_md == expected_doc
mock_hash_func.assert_called_once()


@patch("cosmos.converter.logger")
@patch("cosmos.converter._create_folder_version_hash")
@patch("cosmos.converter.DbtGraph.load")
def test_dag_versioning_hash_error_handling(mock_load_dbt_graph, mock_hash_func, mock_logger):
"""Test that hash creation errors are properly handled and logged."""
mock_hash_func.side_effect = Exception("File system error")
dag = DAG("test_dag", start_date=datetime(2024, 1, 1))
original_doc_md = dag.doc_md # Should be None

project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT)
profile_config = ProfileConfig(
profile_name="test",
target_name="test",
profile_mapping=PostgresUserPasswordProfileMapping(conn_id="test", profile_args={}),
)
execution_config = ExecutionConfig(execution_mode=ExecutionMode.LOCAL)

DbtToAirflowConverter(
dag=dag,
project_config=project_config,
profile_config=profile_config,
execution_config=execution_config,
)

# DAG doc_md should remain unchanged when error occurs
assert dag.doc_md == original_doc_md

# Error should be logged as warning
mock_logger.warning.assert_called_once()
warning_call = mock_logger.warning.call_args[0][0]
assert "Failed to append dbt project hash to DAG documentation" in warning_call
assert "File system error" in warning_call


@patch("cosmos.converter._create_folder_version_hash")
@patch("cosmos.converter.DbtGraph.load")
def test_dag_versioning_hash_with_special_characters(mock_load_dbt_graph, mock_hash_func):
"""Test hash appending works correctly with special characters in existing doc_md."""
mock_hash_func.return_value = "hash_with_special_chars!@#$%"
existing_doc = "DAG with **markdown**, `code`, and [links](http://example.com)"
dag = DAG("test_dag", start_date=datetime(2024, 1, 1), doc_md=existing_doc)

project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT)
profile_config = ProfileConfig(
profile_name="test",
target_name="test",
profile_mapping=PostgresUserPasswordProfileMapping(conn_id="test", profile_args={}),
)
execution_config = ExecutionConfig(execution_mode=ExecutionMode.LOCAL)

DbtToAirflowConverter(
dag=dag,
project_config=project_config,
profile_config=profile_config,
execution_config=execution_config,
)

expected_doc = existing_doc + "\n\n**dbt project hash:** `hash_with_special_chars!@#$%`"
assert dag.doc_md == expected_doc


@patch("cosmos.converter.logger")
@patch("cosmos.converter._create_folder_version_hash")
@patch("cosmos.converter.DbtGraph.load")
def test_dag_versioning_successful_logging(mock_load_dbt_graph, mock_hash_func, mock_logger):
"""Test that successful hash appending is logged at debug level."""
mock_hash_func.return_value = "test_hash_123"
dag = DAG("test_dag_logging", start_date=datetime(2024, 1, 1))

project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT)
profile_config = ProfileConfig(
profile_name="test",
target_name="test",
profile_mapping=PostgresUserPasswordProfileMapping(conn_id="test", profile_args={}),
)
execution_config = ExecutionConfig(execution_mode=ExecutionMode.LOCAL)

DbtToAirflowConverter(
dag=dag,
project_config=project_config,
profile_config=profile_config,
execution_config=execution_config,
)

mock_logger.debug.assert_called_once_with(
"Appended dbt project hash test_hash_123 to DAG test_dag_logging documentation"
)
36 changes: 36 additions & 0 deletions tests/test_versioning.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import logging
from pathlib import Path

from cosmos.versioning import _create_folder_version_hash


def test__create_folder_version_hash(tmp_path, caplog):
"""
Test that Cosmos is still able to create the hash of a dbt project folder even when
there is a symbolic link referencing a no longer existing file.

This test addresses the issue:
https://github.com/astronomer/astronomer-cosmos/issues/1096
"""
caplog.set_level(logging.INFO)

# Create a source folder with two files
source_dir = tmp_path / "original_dbt_folder"
source_dir.mkdir()
file_1 = Path(source_dir / "file_1.sql")
file_1.touch()
file_2 = Path(source_dir / "file_2.sql")
file_2.touch()

# Create a target folder with symbolic links to the two files in the source folder
target_dir = tmp_path / "cosmos_dbt_folder"
target_dir.mkdir()
file_1_symlink = Path(target_dir / "file_1.sql")
file_1_symlink.symlink_to(file_1)
file_2_symlink = Path(target_dir / "file_2.sql")
file_2_symlink.symlink_to(file_2)

# Delete one of the original files from the source folder
file_1.unlink()

_create_folder_version_hash(target_dir)