def _cleanup_dbt_adapters() -> None:
    """
    Reset dbt adapters and force a garbage collection pass.

    Intended to run after each dbt command. Per the linked issue, dbt adapters
    maintain internal state that holds onto semaphores; resetting the adapters
    and then collecting garbage prevents "leaked semaphore objects" warnings.

    The cleanup is best-effort:

    - if ``dbt.adapters.factory`` cannot be imported, the reset is skipped;
    - ``ImportError``, ``RuntimeError``, ``KeyError`` and ``AttributeError``
      raised by ``reset_adapters()`` are logged at debug level and suppressed;
    - any other exception propagates to the caller.

    ``gc.collect()`` runs on every exit path, including when an exception
    propagates out of this function.

    See: https://github.com/astronomer/astronomer-cosmos/issues/2334
    """
    try:
        # Keep the import guard separate from the call so that a missing dbt
        # installation is not confused with a failure inside reset_adapters().
        try:
            from dbt.adapters.factory import reset_adapters
        except ImportError:
            # Nothing to reset; the ``finally`` below still collects garbage.
            return

        try:
            reset_adapters()
        except (ImportError, RuntimeError, KeyError, AttributeError):
            logger.debug("Error resetting dbt adapters", exc_info=True)
    finally:
        # Collect unconditionally: this must not be skipped when the reset
        # fails with an exception type that is not handled above.
        gc.collect()
def test_cleanup_dbt_adapters_calls_reset_adapters_and_gc():
    """With a factory module present, cleanup resets the adapters once and collects garbage once."""
    stub_factory = MagicMock()
    modules_patch = patch.dict("sys.modules", {"dbt.adapters.factory": stub_factory})
    with patch("cosmos.dbt.runner.gc") as gc_mock, modules_patch:
        dbt_runner._cleanup_dbt_adapters()

    stub_factory.reset_adapters.assert_called_once()
    gc_mock.collect.assert_called_once()


def test_cleanup_dbt_adapters_handles_import_error():
    """Cleanup swallows the ImportError from the factory import and still collects garbage."""

    # A module object whose attribute lookups raise ImportError makes
    # "from dbt.adapters.factory import ..." fail without patching
    # builtins.__import__ globally.
    class ImportFailingModule(types.ModuleType):
        def __getattr__(self, name: str):
            raise ImportError("No module named 'dbt.adapters.factory'")

    broken_factory = ImportFailingModule("dbt.adapters.factory")
    modules_patch = patch.dict("sys.modules", {"dbt.adapters.factory": broken_factory})
    with patch("cosmos.dbt.runner.gc") as gc_mock, modules_patch:
        dbt_runner._cleanup_dbt_adapters()

    gc_mock.collect.assert_called_once()


def test_cleanup_dbt_adapters_handles_reset_exception():
    """A RuntimeError from reset_adapters is logged at debug level and garbage is still collected."""
    stub_factory = MagicMock()
    stub_factory.reset_adapters.side_effect = RuntimeError("adapter error")
    modules_patch = patch.dict("sys.modules", {"dbt.adapters.factory": stub_factory})
    with patch("cosmos.dbt.runner.gc") as gc_mock, patch("cosmos.dbt.runner.logger") as logger_mock, modules_patch:
        dbt_runner._cleanup_dbt_adapters()

    gc_mock.collect.assert_called_once()
    logger_mock.debug.assert_called_once_with("Error resetting dbt adapters", exc_info=True)


def test_run_command_calls_cleanup_dbt_adapters():
    """run_command returns the runner's result and invokes the adapter cleanup exactly once."""
    invoke_result = MagicMock()
    invoke_result.success = True
    invoke_result.exception = None
    invoke_result.result = None

    stub_runner = MagicMock()
    stub_runner.invoke.return_value = invoke_result

    with (
        patch.object(dbt_runner, "get_runner", return_value=stub_runner),
        patch.object(dbt_runner, "_cleanup_dbt_adapters") as cleanup_mock,
        patch.object(dbt_runner, "change_working_directory"),
        patch.object(dbt_runner, "environ"),
        patch.object(dbt_runner, "logger"),
    ):
        returned = dbt_runner.run_command(command=["dbt", "deps"], env={}, cwd="/tmp/project")

    assert returned is invoke_result
    stub_runner.invoke.assert_called_once()
    cleanup_mock.assert_called_once()


def test_run_command_calls_cleanup_dbt_adapters_when_invoke_raises():
    """The adapter cleanup still runs once when runner.invoke raises, and the error propagates."""
    stub_runner = MagicMock()
    stub_runner.invoke.side_effect = RuntimeError("invoke failed")

    with (
        patch.object(dbt_runner, "get_runner", return_value=stub_runner),
        patch.object(dbt_runner, "_cleanup_dbt_adapters") as cleanup_mock,
        patch.object(dbt_runner, "change_working_directory"),
        patch.object(dbt_runner, "environ"),
        patch.object(dbt_runner, "logger"),
    ):
        with pytest.raises(RuntimeError, match="invoke failed"):
            dbt_runner.run_command(command=["dbt", "deps"], env={}, cwd="/tmp/project")

    stub_runner.invoke.assert_called_once()
    cleanup_mock.assert_called_once()