diff --git a/tests/operators/test_watcher_kubernetes_unit.py b/tests/operators/test_watcher_kubernetes_unit.py index aece178fb3..a0fddd9e60 100644 --- a/tests/operators/test_watcher_kubernetes_unit.py +++ b/tests/operators/test_watcher_kubernetes_unit.py @@ -224,6 +224,123 @@ def test_use_event_returns_false(): assert sensor.use_event() is False +class TestCallbacksNormalization: + """Tests for the callbacks normalization logic in DbtProducerWatcherKubernetesOperator.""" + + def test_callbacks_none_adds_watcher_callback(self): + """ + Test that when callbacks is None, WatcherKubernetesCallback is added. + """ + from cosmos.operators.watcher_kubernetes import WatcherKubernetesCallback + + op = DbtProducerWatcherKubernetesOperator( + project_dir=".", + profile_config=None, + image="dbt-image:latest", + callbacks=None, + ) + assert op.callbacks == [WatcherKubernetesCallback] + + def test_callbacks_not_provided_adds_watcher_callback(self): + """ + Test that when callbacks is not provided, WatcherKubernetesCallback is added. + """ + from cosmos.operators.watcher_kubernetes import WatcherKubernetesCallback + + op = DbtProducerWatcherKubernetesOperator( + project_dir=".", + profile_config=None, + image="dbt-image:latest", + ) + assert op.callbacks == [WatcherKubernetesCallback] + + def test_callbacks_list_appends_watcher_callback(self): + """ + Test that when callbacks is a list, WatcherKubernetesCallback is appended. + """ + from cosmos.operators.watcher_kubernetes import WatcherKubernetesCallback + + class CustomCallback: + pass + + op = DbtProducerWatcherKubernetesOperator( + project_dir=".", + profile_config=None, + image="dbt-image:latest", + callbacks=[CustomCallback], + ) + assert op.callbacks == [CustomCallback, WatcherKubernetesCallback] + + def test_callbacks_tuple_appends_watcher_callback(self): + """ + Test that when callbacks is a tuple, WatcherKubernetesCallback is appended. + """ + from cosmos.operators.watcher_kubernetes import WatcherKubernetesCallback + + class CustomCallback: + pass + + op = DbtProducerWatcherKubernetesOperator( + project_dir=".", + profile_config=None, + image="dbt-image:latest", + callbacks=(CustomCallback,), + ) + assert op.callbacks == [CustomCallback, WatcherKubernetesCallback] + + def test_callbacks_single_value_wraps_and_appends_watcher_callback(self): + """ + Test that when callbacks is a single value (not list/tuple), it is wrapped in a list + and WatcherKubernetesCallback is appended. + """ + from cosmos.operators.watcher_kubernetes import WatcherKubernetesCallback + + class CustomCallback: + pass + + op = DbtProducerWatcherKubernetesOperator( + project_dir=".", + profile_config=None, + image="dbt-image:latest", + callbacks=CustomCallback, + ) + assert op.callbacks == [CustomCallback, WatcherKubernetesCallback] + + def test_callbacks_empty_list_adds_watcher_callback(self): + """ + Test that when callbacks is an empty list, WatcherKubernetesCallback is added. + """ + from cosmos.operators.watcher_kubernetes import WatcherKubernetesCallback + + op = DbtProducerWatcherKubernetesOperator( + project_dir=".", + profile_config=None, + image="dbt-image:latest", + callbacks=[], + ) + assert op.callbacks == [WatcherKubernetesCallback] + + def test_callbacks_multiple_values_appends_watcher_callback(self): + """ + Test that when callbacks contains multiple values, WatcherKubernetesCallback is appended. + """ + from cosmos.operators.watcher_kubernetes import WatcherKubernetesCallback + + class CustomCallback1: + pass + + class CustomCallback2: + pass + + op = DbtProducerWatcherKubernetesOperator( + project_dir=".", + profile_config=None, + image="dbt-image:latest", + callbacks=[CustomCallback1, CustomCallback2], + ) + assert op.callbacks == [CustomCallback1, CustomCallback2, WatcherKubernetesCallback] + + def test_callbacks_included_in_producer_operator(): """ Test that the WatcherKubernetesCallback is included in the callbacks of the DbtProducerWatcherKubernetesOperator.