From 8c9dbb7ec18ca7012f09f2e31c0d89b84b2aae53 Mon Sep 17 00:00:00 2001 From: Pankaj Date: Tue, 18 Jun 2024 15:27:20 +0530 Subject: [PATCH 1/4] Fix AKS permission error in restricted env --- cosmos/cache.py | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/cosmos/cache.py b/cosmos/cache.py index 563c4fd703..c401e29bc8 100644 --- a/cosmos/cache.py +++ b/cosmos/cache.py @@ -1,5 +1,6 @@ from __future__ import annotations +import os import shutil from pathlib import Path @@ -96,6 +97,27 @@ def _get_latest_partial_parse(dbt_project_path: Path, cache_dir: Path) -> Path | return None +def _copy(src, dst, *, follow_symlinks=True): + """ + Copy a file from `src` to `dst` and preserve metadata if possible. + """ + # Handle destination as directory + if os.path.isdir(dst): + dst = os.path.join(dst, os.path.basename(src)) + # shutil.copy includes permission copying via chmod. + # If the user lacks permission to run chmod, a PermissionError occurs. + # To avoid this, we split the operation into two steps: + # first, copy the file contents; then, copy metadata if feasible without raising exceptions. + # Step 1: Copy file contents (no metadata) + shutil.copyfile(src, dst, follow_symlinks=follow_symlinks) + + try: + # Step 2: Copy file metadata (permission bits and other metadata) + shutil.copystat(src, dst, follow_symlinks=follow_symlinks) + except PermissionError: + logger.info("Failed to copy the partial parse file metadata") + + def _update_partial_parse_cache(latest_partial_parse_filepath: Path, cache_dir: Path) -> None: """ Update the cache to have the latest partial parse file contents. @@ -107,8 +129,8 @@ def _update_partial_parse_cache(latest_partial_parse_filepath: Path, cache_dir: manifest_path = get_partial_parse_path(cache_dir).parent / DBT_MANIFEST_FILE_NAME latest_manifest_filepath = latest_partial_parse_filepath.parent / DBT_MANIFEST_FILE_NAME - shutil.copy(str(latest_partial_parse_filepath), str(cache_path)) - shutil.copy(str(latest_manifest_filepath), str(manifest_path)) + _copy(str(latest_partial_parse_filepath), str(cache_path)) + _copy(str(latest_manifest_filepath), str(manifest_path)) def patch_partial_parse_content(partial_parse_filepath: Path, project_path: Path) -> bool: From 20dcb5f64edde7e3056d0cc1360f8b6978cd992f Mon Sep 17 00:00:00 2001 From: Pankaj Date: Tue, 18 Jun 2024 20:42:23 +0530 Subject: [PATCH 2/4] Add tests --- cosmos/cache.py | 6 ++---- tests/test_cache.py | 23 ++++++++++++++++++++++- 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/cosmos/cache.py b/cosmos/cache.py index c401e29bc8..c1e38cabef 100644 --- a/cosmos/cache.py +++ b/cosmos/cache.py @@ -97,10 +97,8 @@ def _get_latest_partial_parse(dbt_project_path: Path, cache_dir: Path) -> Path | return None -def _copy(src, dst, *, follow_symlinks=True): - """ - Copy a file from `src` to `dst` and preserve metadata if possible. - """ +def _copy(src: str, dst: str, *, follow_symlinks: bool = True) -> None: + """Copy a file from `src` to `dst` and preserve metadata if possible.""" # Handle destination as directory if os.path.isdir(dst): dst = os.path.join(dst, os.path.basename(src)) diff --git a/tests/test_cache.py b/tests/test_cache.py index d75bc439b8..80cd562d43 100644 --- a/tests/test_cache.py +++ b/tests/test_cache.py @@ -1,4 +1,5 @@ import logging +import os import shutil import tempfile import time @@ -10,7 +11,7 @@ from airflow import DAG from airflow.utils.task_group import TaskGroup -from cosmos.cache import _copy_partial_parse_to_project, _create_cache_identifier, _get_latest_partial_parse +from cosmos.cache import _copy, _copy_partial_parse_to_project, _create_cache_identifier, _get_latest_partial_parse from cosmos.constants import DBT_PARTIAL_PARSE_FILE_NAME, DBT_TARGET_DIR_NAME START_DATE = datetime(2024, 4, 16) @@ -86,3 +87,23 @@ def test__copy_partial_parse_to_project_msg_fails_msgpack(mock_unpack, tmp_path, _copy_partial_parse_to_project(partial_parse_filepath, Path(tmp_dir)) assert "Unable to patch the partial_parse.msgpack file due to ValueError()" in caplog.text + + +def test_copy_file_to_directory(): + """Test copying a file to an existing directory.""" + with tempfile.TemporaryDirectory() as tmpdir: + src_file = os.path.join(tmpdir, "source.txt") + dst_dir = os.path.join(tmpdir, "destination") + os.makedirs(dst_dir) + + # Create a source file + with open(src_file, "w") as f: + f.write("Test content") + + # Call _copy function + _copy(src_file, dst_dir) + + # Verify that the file was copied to the destination directory + assert os.path.isfile(os.path.join(dst_dir, "source.txt")) + with open(Path(dst_dir) / "source.txt") as f: + assert f.read() == "Test content" From 91d2778e8454bc04593050841a2ee916f45798d2 Mon Sep 17 00:00:00 2001 From: Pankaj Date: Wed, 19 Jun 2024 23:01:30 +0530 Subject: [PATCH 3/4] Apply review suggestions --- cosmos/cache.py | 24 ++---------------------- tests/test_cache.py | 23 +---------------------- 2 files changed, 3 insertions(+), 44 deletions(-) diff --git a/cosmos/cache.py b/cosmos/cache.py index c1e38cabef..b101366a01 100644 --- a/cosmos/cache.py +++ b/cosmos/cache.py @@ -1,6 +1,5 @@ from __future__ import annotations -import os import shutil from pathlib import Path @@ -97,25 +96,6 @@ def _get_latest_partial_parse(dbt_project_path: Path, cache_dir: Path) -> Path | return None -def _copy(src: str, dst: str, *, follow_symlinks: bool = True) -> None: - """Copy a file from `src` to `dst` and preserve metadata if possible.""" - # Handle destination as directory - if os.path.isdir(dst): - dst = os.path.join(dst, os.path.basename(src)) - # shutil.copy includes permission copying via chmod. - # If the user lacks permission to run chmod, a PermissionError occurs. - # To avoid this, we split the operation into two steps: - # first, copy the file contents; then, copy metadata if feasible without raising exceptions. - # Step 1: Copy file contents (no metadata) - shutil.copyfile(src, dst, follow_symlinks=follow_symlinks) - - try: - # Step 2: Copy file metadata (permission bits and other metadata) - shutil.copystat(src, dst, follow_symlinks=follow_symlinks) - except PermissionError: - logger.info("Failed to copy the partial parse file metadata") - - def _update_partial_parse_cache(latest_partial_parse_filepath: Path, cache_dir: Path) -> None: """ Update the cache to have the latest partial parse file contents. @@ -127,8 +107,8 @@ def _update_partial_parse_cache(latest_partial_parse_filepath: Path, cache_dir: manifest_path = get_partial_parse_path(cache_dir).parent / DBT_MANIFEST_FILE_NAME latest_manifest_filepath = latest_partial_parse_filepath.parent / DBT_MANIFEST_FILE_NAME - _copy(str(latest_partial_parse_filepath), str(cache_path)) - _copy(str(latest_manifest_filepath), str(manifest_path)) + shutil.copyfile(str(latest_partial_parse_filepath), str(cache_path)) + shutil.copyfile(str(latest_manifest_filepath), str(manifest_path)) def patch_partial_parse_content(partial_parse_filepath: Path, project_path: Path) -> bool: diff --git a/tests/test_cache.py b/tests/test_cache.py index 80cd562d43..d75bc439b8 100644 --- a/tests/test_cache.py +++ b/tests/test_cache.py @@ -1,5 +1,4 @@ import logging -import os import shutil import tempfile import time @@ -11,7 +10,7 @@ from airflow import DAG from airflow.utils.task_group import TaskGroup -from cosmos.cache import _copy, _copy_partial_parse_to_project, _create_cache_identifier, _get_latest_partial_parse +from cosmos.cache import _copy_partial_parse_to_project, _create_cache_identifier, _get_latest_partial_parse from cosmos.constants import DBT_PARTIAL_PARSE_FILE_NAME, DBT_TARGET_DIR_NAME START_DATE = datetime(2024, 4, 16) @@ -87,23 +86,3 @@ def test__copy_partial_parse_to_project_msg_fails_msgpack(mock_unpack, tmp_path, _copy_partial_parse_to_project(partial_parse_filepath, Path(tmp_dir)) assert "Unable to patch the partial_parse.msgpack file due to ValueError()" in caplog.text - - -def test_copy_file_to_directory(): - """Test copying a file to an existing directory.""" - with tempfile.TemporaryDirectory() as tmpdir: - src_file = os.path.join(tmpdir, "source.txt") - dst_dir = os.path.join(tmpdir, "destination") - os.makedirs(dst_dir) - - # Create a source file - with open(src_file, "w") as f: - f.write("Test content") - - # Call _copy function - _copy(src_file, dst_dir) - - # Verify that the file was copied to the destination directory - assert os.path.isfile(os.path.join(dst_dir, "source.txt")) - with open(Path(dst_dir) / "source.txt") as f: - assert f.read() == "Test content" From 298c8c202b9eb200ad0cceca63bf9347bbb9ef48 Mon Sep 17 00:00:00 2001 From: Pankaj Date: Fri, 21 Jun 2024 15:24:20 +0530 Subject: [PATCH 4/4] Add tests --- tests/test_cache.py | 32 +++++++++++++++++++++++++++++--- 1 file changed, 29 insertions(+), 3 deletions(-) diff --git a/tests/test_cache.py b/tests/test_cache.py index d75bc439b8..7d6a2d36c8 100644 --- a/tests/test_cache.py +++ b/tests/test_cache.py @@ -4,13 +4,18 @@ import time from datetime import datetime from pathlib import Path -from unittest.mock import patch +from unittest.mock import call, patch import pytest from airflow import DAG from airflow.utils.task_group import TaskGroup -from cosmos.cache import _copy_partial_parse_to_project, _create_cache_identifier, _get_latest_partial_parse +from cosmos.cache import ( + _copy_partial_parse_to_project, + _create_cache_identifier, + _get_latest_partial_parse, + _update_partial_parse_cache, +) from cosmos.constants import DBT_PARTIAL_PARSE_FILE_NAME, DBT_TARGET_DIR_NAME START_DATE = datetime(2024, 4, 16) @@ -74,7 +79,6 @@ def test_get_latest_partial_parse(tmp_path): @patch("cosmos.cache.msgpack.unpack", side_effect=ValueError) def test__copy_partial_parse_to_project_msg_fails_msgpack(mock_unpack, tmp_path, caplog): - # setup caplog.set_level(logging.INFO) source_dir = tmp_path / DBT_TARGET_DIR_NAME source_dir.mkdir() @@ -86,3 +90,25 @@ def test__copy_partial_parse_to_project_msg_fails_msgpack(mock_unpack, tmp_path, _copy_partial_parse_to_project(partial_parse_filepath, Path(tmp_dir)) assert "Unable to patch the partial_parse.msgpack file due to ValueError()" in caplog.text + + +@patch("cosmos.cache.shutil.copyfile") +@patch("cosmos.cache.get_partial_parse_path") +def test_update_partial_parse_cache(mock_get_partial_parse_path, mock_copyfile): + mock_get_partial_parse_path.side_effect = lambda cache_dir: cache_dir / "partial_parse.yml" + + latest_partial_parse_filepath = Path("/path/to/latest_partial_parse.yml") + cache_dir = Path("/path/to/cache_directory") + + # Expected paths + cache_path = cache_dir / "partial_parse.yml" + manifest_path = cache_dir / "manifest.json" + + _update_partial_parse_cache(latest_partial_parse_filepath, cache_dir) + + # Assert shutil.copyfile was called twice with the correct arguments + calls = [ + call(str(latest_partial_parse_filepath), str(cache_path)), + call(str(latest_partial_parse_filepath.parent / "manifest.json"), str(manifest_path)), + ] + mock_copyfile.assert_has_calls(calls)