diff --git a/cosmos/cache.py b/cosmos/cache.py index 16ca7709d1..98cca40543 100644 --- a/cosmos/cache.py +++ b/cosmos/cache.py @@ -42,6 +42,7 @@ remote_cache_dir_conn_id, ) from cosmos.settings import remote_cache_dir as settings_remote_cache_dir +from cosmos.versioning import _create_folder_version_hash logger = get_logger(__name__) VAR_KEY_CACHE_PREFIX = "cosmos_cache__" @@ -52,7 +53,7 @@ def _configure_remote_cache_dir() -> Path | None: if not settings_remote_cache_dir: return None - _configured_cache_dir = None + _configured_cache_dir: Path | None = None cache_dir_str = str(settings_remote_cache_dir) @@ -267,37 +268,6 @@ def _copy_partial_parse_to_project(partial_parse_filepath: Path, project_path: P shutil.copy(str(source_manifest_filepath), str(target_manifest_filepath)) -def _create_folder_version_hash(dir_path: Path) -> str: - """ - Given a directory, iterate through its content and create a hash that will change in case the - contents of the directory change. The value should not change if the values of the directory do not change, even if - the command is run from different Airflow instances. - - This method output must be concise and it currently changes based on operating system. - """ - # This approach is less efficient than using modified time - # sum([path.stat().st_mtime for path in dir_path.glob("**/*")]) - # unfortunately, the modified time approach does not work well for dag-only deployments - # where DAGs are constantly synced to the deployed Airflow - # for 5k files, this seems to take 0.14 - hasher = hashlib.md5() - filepaths = [] - - for root_dir, dirs, files in os.walk(dir_path): - paths = [os.path.join(root_dir, filepath) for filepath in files] - filepaths.extend(paths) - - for filepath in sorted(filepaths): - try: - with open(str(filepath), "rb") as fp: - buf = fp.read() - hasher.update(buf) - except FileNotFoundError: - logger.warning(f"The dbt project folder contains a symbolic link to a non-existent file: {filepath}") - - return hasher.hexdigest() - - def _calculate_dbt_ls_cache_current_version(cache_identifier: str, project_dir: Path, cmd_args: list[str]) -> str: """ Taking into account the project directory contents and the command arguments, calculate the diff --git a/cosmos/converter.py b/cosmos/converter.py index bf3fa12644..b3f804807d 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -23,6 +23,7 @@ from cosmos.dbt.selector import retrieve_by_label from cosmos.exceptions import CosmosValueError from cosmos.log import get_logger +from cosmos.versioning import _create_folder_version_hash logger = get_logger(__name__) @@ -286,6 +287,8 @@ def __init__( ) self.dbt_graph.load(method=render_config.load_method, execution_mode=execution_config.execution_mode) + self._add_dbt_project_hash_to_dag_docs(dag) + current_time = time.perf_counter() elapsed_time = current_time - previous_time logger.info( @@ -336,3 +339,28 @@ def __init__( logger.info( f"Cosmos performance ({cache_identifier}) - [{platform.node()}|{os.getpid()}]: It took {elapsed_time:.3}s to build the Airflow DAG." ) + + def _add_dbt_project_hash_to_dag_docs(self, dag: DAG | None) -> None: + """ + Add dbt project content hash to DAG documentation for Airflow 3 dag versioning support. + + This enables Airflow 3's automatic DAG versioning to detect when dbt project + files change, ensuring proper DAG version updates. + + :param dag: The Airflow DAG to add versioning information to. If None, no action is taken. + """ + if dag is None: + return + + try: + dbt_project_hash = _create_folder_version_hash(self.dbt_graph.project_path) + hash_suffix = f"\n\n**dbt project hash:** `{dbt_project_hash}`" + + if dag.doc_md: + dag.doc_md += hash_suffix + else: + dag.doc_md = f"**dbt project hash:** `{dbt_project_hash}`" + + logger.debug(f"Appended dbt project hash {dbt_project_hash} to DAG {dag.dag_id} documentation") + except Exception as e: + logger.warning(f"Failed to append dbt project hash to DAG documentation: {e}") diff --git a/cosmos/versioning.py b/cosmos/versioning.py new file mode 100644 index 0000000000..8c3a361703 --- /dev/null +++ b/cosmos/versioning.py @@ -0,0 +1,40 @@ +from __future__ import annotations + +import hashlib +import os +from pathlib import Path + +from cosmos.log import get_logger + +logger = get_logger(__name__) + + +def _create_folder_version_hash(dir_path: Path) -> str: + """ + Given a directory, iterate through its content and create a hash that will change in case the + contents of the directory change. The value should not change if the values of the directory do not change, even if + the command is run from different Airflow instances. + + This method output must be concise and it currently changes based on operating system. + """ + # This approach is less efficient than using modified time + # sum([path.stat().st_mtime for path in dir_path.glob("**/*")]) + # unfortunately, the modified time approach does not work well for dag-only deployments + # where DAGs are constantly synced to the deployed Airflow + # for 5k files, this seems to take 0.14 + hasher = hashlib.md5() + filepaths = [] + + for root_dir, dirs, files in os.walk(dir_path): + paths = [os.path.join(root_dir, filepath) for filepath in files] + filepaths.extend(paths) + + for filepath in sorted(filepaths): + try: + with open(str(filepath), "rb") as fp: + buf = fp.read() + hasher.update(buf) + except FileNotFoundError: + logger.warning(f"The dbt project folder contains a symbolic link to a non-existent file: {filepath}") + + return hasher.hexdigest() diff --git a/tests/test_cache.py b/tests/test_cache.py index 403daccbd9..8734ab2ec5 100644 --- a/tests/test_cache.py +++ b/tests/test_cache.py @@ -28,7 +28,6 @@ _configure_remote_cache_dir, _copy_partial_parse_to_project, _create_cache_identifier, - _create_folder_version_hash, _get_latest_cached_package_lockfile, _get_latest_partial_parse, _get_or_create_profile_cache_dir, @@ -123,38 +122,6 @@ def test__copy_partial_parse_to_project_msg_fails_msgpack(mock_unpack, tmp_path, assert "Unable to patch the partial_parse.msgpack file due to ValueError()" in caplog.text -def test__create_folder_version_hash(tmp_path, caplog): - """ - Test that Cosmos is still able to create the hash of a dbt project folder even when - there is a symbolic link referencing a no longer existing file. - - This test addresses the issue: - https://github.com/astronomer/astronomer-cosmos/issues/1096 - """ - caplog.set_level(logging.INFO) - - # Create a source folder with two files - source_dir = tmp_path / "original_dbt_folder" - source_dir.mkdir() - file_1 = Path(source_dir / "file_1.sql") - file_1.touch() - file_2 = Path(source_dir / "file_2.sql") - file_2.touch() - - # Create a target folder with symbolic links to the two files in the source folder - target_dir = tmp_path / "cosmos_dbt_folder" - target_dir.mkdir() - file_1_symlink = Path(target_dir / "file_1.sql") - file_1_symlink.symlink_to(file_1) - file_2_symlink = Path(target_dir / "file_2.sql") - file_2_symlink.symlink_to(file_2) - - # Delete one of the original files from the source folder - file_1.unlink() - - _create_folder_version_hash(target_dir) - - @patch("cosmos.cache.shutil.copyfile") @patch("cosmos.cache.get_partial_parse_path") def test_update_partial_parse_cache(mock_get_partial_parse_path, mock_copyfile): diff --git a/tests/test_converter.py b/tests/test_converter.py index c14c7c2629..98b07e07d6 100644 --- a/tests/test_converter.py +++ b/tests/test_converter.py @@ -968,3 +968,147 @@ def test_converter_creates_model_with_pre_dbt_fusion(mock_load_dbt_graph): assert isinstance(converter.tasks_map, dict) assert converter.tasks_map["sample_model"].models == "sample.sample_model" assert converter.tasks_map["sample_model"].select is None + + +@patch("cosmos.converter._create_folder_version_hash") +@patch("cosmos.converter.DbtGraph.load") +def test_dag_versioning_hash_appended_to_empty_doc_md(mock_load_dbt_graph, mock_hash_func): + """Test that dbt project hash is appended to DAG doc_md when doc_md is initially empty.""" + mock_hash_func.return_value = "abc123def456" + dag = DAG("test_dag", start_date=datetime(2024, 1, 1)) + assert dag.doc_md is None # Initially empty + + project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT) + profile_config = ProfileConfig( + profile_name="my_profile_name", + target_name="my_target_name", + profiles_yml_filepath=SAMPLE_PROFILE_YML, + ) + execution_config = ExecutionConfig(execution_mode=ExecutionMode.LOCAL) + + DbtToAirflowConverter( + dag=dag, + project_config=project_config, + profile_config=profile_config, + execution_config=execution_config, + ) + + assert dag.doc_md == "**dbt project hash:** `abc123def456`" + mock_hash_func.assert_called_once() + + +@patch("cosmos.converter._create_folder_version_hash") +@patch("cosmos.converter.DbtGraph.load") +def test_dag_versioning_hash_appended_to_existing_doc_md(mock_load_dbt_graph, mock_hash_func): + """Test that dbt project hash is appended to existing DAG doc_md.""" + mock_hash_func.return_value = "xyz789abc123" + existing_doc = "This is my existing DAG documentation.\n\nIt has multiple lines." + dag = DAG("test_dag", start_date=datetime(2024, 1, 1), doc_md=existing_doc) + + project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profile_mapping=PostgresUserPasswordProfileMapping(conn_id="test", profile_args={}), + ) + execution_config = ExecutionConfig(execution_mode=ExecutionMode.LOCAL) + + DbtToAirflowConverter( + dag=dag, + project_config=project_config, + profile_config=profile_config, + execution_config=execution_config, + ) + + expected_doc = existing_doc + "\n\n**dbt project hash:** `xyz789abc123`" + assert dag.doc_md == expected_doc + mock_hash_func.assert_called_once() + + +@patch("cosmos.converter.logger") +@patch("cosmos.converter._create_folder_version_hash") +@patch("cosmos.converter.DbtGraph.load") +def test_dag_versioning_hash_error_handling(mock_load_dbt_graph, mock_hash_func, mock_logger): + """Test that hash creation errors are properly handled and logged.""" + mock_hash_func.side_effect = Exception("File system error") + dag = DAG("test_dag", start_date=datetime(2024, 1, 1)) + original_doc_md = dag.doc_md # Should be None + + project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profile_mapping=PostgresUserPasswordProfileMapping(conn_id="test", profile_args={}), + ) + execution_config = ExecutionConfig(execution_mode=ExecutionMode.LOCAL) + + DbtToAirflowConverter( + dag=dag, + project_config=project_config, + profile_config=profile_config, + execution_config=execution_config, + ) + + # DAG doc_md should remain unchanged when error occurs + assert dag.doc_md == original_doc_md + + # Error should be logged as warning + mock_logger.warning.assert_called_once() + warning_call = mock_logger.warning.call_args[0][0] + assert "Failed to append dbt project hash to DAG documentation" in warning_call + assert "File system error" in warning_call + + +@patch("cosmos.converter._create_folder_version_hash") +@patch("cosmos.converter.DbtGraph.load") +def test_dag_versioning_hash_with_special_characters(mock_load_dbt_graph, mock_hash_func): + """Test hash appending works correctly with special characters in existing doc_md.""" + mock_hash_func.return_value = "hash_with_special_chars!@#$%" + existing_doc = "DAG with **markdown**, `code`, and [links](http://example.com)" + dag = DAG("test_dag", start_date=datetime(2024, 1, 1), doc_md=existing_doc) + + project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profile_mapping=PostgresUserPasswordProfileMapping(conn_id="test", profile_args={}), + ) + execution_config = ExecutionConfig(execution_mode=ExecutionMode.LOCAL) + + DbtToAirflowConverter( + dag=dag, + project_config=project_config, + profile_config=profile_config, + execution_config=execution_config, + ) + + expected_doc = existing_doc + "\n\n**dbt project hash:** `hash_with_special_chars!@#$%`" + assert dag.doc_md == expected_doc + + +@patch("cosmos.converter.logger") +@patch("cosmos.converter._create_folder_version_hash") +@patch("cosmos.converter.DbtGraph.load") +def test_dag_versioning_successful_logging(mock_load_dbt_graph, mock_hash_func, mock_logger): + """Test that successful hash appending is logged at debug level.""" + mock_hash_func.return_value = "test_hash_123" + dag = DAG("test_dag_logging", start_date=datetime(2024, 1, 1)) + + project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profile_mapping=PostgresUserPasswordProfileMapping(conn_id="test", profile_args={}), + ) + execution_config = ExecutionConfig(execution_mode=ExecutionMode.LOCAL) + + DbtToAirflowConverter( + dag=dag, + project_config=project_config, + profile_config=profile_config, + execution_config=execution_config, + ) + + mock_logger.debug.assert_called_once_with( + "Appended dbt project hash test_hash_123 to DAG test_dag_logging documentation" + ) diff --git a/tests/test_versioning.py b/tests/test_versioning.py new file mode 100644 index 0000000000..e6d91316c7 --- /dev/null +++ b/tests/test_versioning.py @@ -0,0 +1,36 @@ +import logging +from pathlib import Path + +from cosmos.versioning import _create_folder_version_hash + + +def test__create_folder_version_hash(tmp_path, caplog): + """ + Test that Cosmos is still able to create the hash of a dbt project folder even when + there is a symbolic link referencing a no longer existing file. + + This test addresses the issue: + https://github.com/astronomer/astronomer-cosmos/issues/1096 + """ + caplog.set_level(logging.INFO) + + # Create a source folder with two files + source_dir = tmp_path / "original_dbt_folder" + source_dir.mkdir() + file_1 = Path(source_dir / "file_1.sql") + file_1.touch() + file_2 = Path(source_dir / "file_2.sql") + file_2.touch() + + # Create a target folder with symbolic links to the two files in the source folder + target_dir = tmp_path / "cosmos_dbt_folder" + target_dir.mkdir() + file_1_symlink = Path(target_dir / "file_1.sql") + file_1_symlink.symlink_to(file_1) + file_2_symlink = Path(target_dir / "file_2.sql") + file_2_symlink.symlink_to(file_2) + + # Delete one of the original files from the source folder + file_1.unlink() + + _create_folder_version_hash(target_dir)