Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cosmos/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from cosmos import settings

__version__ = "1.10.1"
__version__ = "1.10.2a1"

if not settings.enable_memory_optimised_imports:
from cosmos.airflow.dag import DbtDag
Expand Down
3 changes: 3 additions & 0 deletions cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,9 @@ def store_freshness_json(self, tmp_project_dir: str, context: Context) -> None:
self.freshness = ""

def _override_rtif(self, context: Context) -> None:
Comment thread
pankajkoti marked this conversation as resolved.
if not self.should_store_compiled_sql:
return

if AIRFLOW_VERSION.major == _AIRFLOW3_MAJOR_VERSION:
self.overwrite_rtif_after_execution = True
return
Expand Down
31 changes: 31 additions & 0 deletions tests/operators/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -1920,3 +1920,34 @@ def test_delete_sql_files_no_remote_target_configured(mock_configure_remote):
with patch.object(operator.log, "warning") as mock_log_warning:
operator._delete_sql_files()
mock_log_warning.assert_called_once_with(expected_log_message)


@pytest.mark.skipif(version.parse(airflow_version).major == 3, reason="Test only applies to Airflow 2")
def test_override_rtif_airflow2_with_should_store_compiled_sql_false():
"""Test that _override_rtif does nothing in Airflow 2 when should_store_compiled_sql is False"""
operator = DbtRunLocalOperator(
task_id="test",
profile_config=None,
project_dir="my/dir",
should_store_compiled_sql=False,
)

context = {"task_instance": MagicMock()}

operator._override_rtif(context)
assert not context["task_instance"].called


@pytest.mark.skipif(version.parse(airflow_version).major == 2, reason="Test only applies to Airflow 3")
def test_override_rtif_airflow3_with_should_store_compiled_sql_false():
"""Test that _override_rtif sets overwrite_rtif_after_execution to False in Airflow 3 when should_store_compiled_sql is False"""
operator = DbtRunLocalOperator(
task_id="test",
profile_config=None,
project_dir="my/dir",
should_store_compiled_sql=False,
)

context = {"task_instance": MagicMock()}
operator._override_rtif(context)
assert operator.overwrite_rtif_after_execution is False
Loading