Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 117 additions & 0 deletions tests/operators/test_watcher_kubernetes_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,123 @@ def test_use_event_returns_false():
assert sensor.use_event() is False


class TestCallbacksNormalization:
"""Tests for the callbacks normalization logic in DbtProducerWatcherKubernetesOperator."""

def test_callbacks_none_adds_watcher_callback(self):
"""
Test that when callbacks is None, WatcherKubernetesCallback is added.
"""
from cosmos.operators.watcher_kubernetes import WatcherKubernetesCallback

op = DbtProducerWatcherKubernetesOperator(
project_dir=".",
profile_config=None,
image="dbt-image:latest",
callbacks=None,
)
assert op.callbacks == [WatcherKubernetesCallback]

def test_callbacks_not_provided_adds_watcher_callback(self):
"""
Test that when callbacks is not provided, WatcherKubernetesCallback is added.
"""
from cosmos.operators.watcher_kubernetes import WatcherKubernetesCallback

op = DbtProducerWatcherKubernetesOperator(
project_dir=".",
profile_config=None,
image="dbt-image:latest",
)
assert op.callbacks == [WatcherKubernetesCallback]

def test_callbacks_list_appends_watcher_callback(self):
"""
Test that when callbacks is a list, WatcherKubernetesCallback is appended.
"""
from cosmos.operators.watcher_kubernetes import WatcherKubernetesCallback

class CustomCallback:
pass

op = DbtProducerWatcherKubernetesOperator(
project_dir=".",
profile_config=None,
image="dbt-image:latest",
callbacks=[CustomCallback],
)
assert op.callbacks == [CustomCallback, WatcherKubernetesCallback]

def test_callbacks_tuple_appends_watcher_callback(self):
"""
Test that when callbacks is a tuple, WatcherKubernetesCallback is appended.
"""
from cosmos.operators.watcher_kubernetes import WatcherKubernetesCallback

class CustomCallback:
pass

op = DbtProducerWatcherKubernetesOperator(
project_dir=".",
profile_config=None,
image="dbt-image:latest",
callbacks=(CustomCallback,),
)
assert op.callbacks == [CustomCallback, WatcherKubernetesCallback]

def test_callbacks_single_value_wraps_and_appends_watcher_callback(self):
"""
Test that when callbacks is a single value (not list/tuple), it is wrapped in a list
and WatcherKubernetesCallback is appended.
"""
from cosmos.operators.watcher_kubernetes import WatcherKubernetesCallback

class CustomCallback:
pass

op = DbtProducerWatcherKubernetesOperator(
project_dir=".",
profile_config=None,
image="dbt-image:latest",
callbacks=CustomCallback,
)
assert op.callbacks == [CustomCallback, WatcherKubernetesCallback]

def test_callbacks_empty_list_adds_watcher_callback(self):
"""
Test that when callbacks is an empty list, WatcherKubernetesCallback is added.
"""
from cosmos.operators.watcher_kubernetes import WatcherKubernetesCallback

op = DbtProducerWatcherKubernetesOperator(
project_dir=".",
profile_config=None,
image="dbt-image:latest",
callbacks=[],
)
assert op.callbacks == [WatcherKubernetesCallback]

def test_callbacks_multiple_values_appends_watcher_callback(self):
"""
Test that when callbacks contains multiple values, WatcherKubernetesCallback is appended.
"""
from cosmos.operators.watcher_kubernetes import WatcherKubernetesCallback

class CustomCallback1:
pass

class CustomCallback2:
pass

op = DbtProducerWatcherKubernetesOperator(
project_dir=".",
profile_config=None,
image="dbt-image:latest",
callbacks=[CustomCallback1, CustomCallback2],
)
assert op.callbacks == [CustomCallback1, CustomCallback2, WatcherKubernetesCallback]


def test_callbacks_included_in_producer_operator():
"""
Test that the WatcherKubernetesCallback is included in the callbacks of the DbtProducerWatcherKubernetesOperator.
Expand Down