Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions cosmos/operators/_asynchronous/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,12 @@
from cosmos.constants import AIRFLOW_VERSION
from cosmos.dataset import get_dataset_alias_name
from cosmos.exceptions import CosmosValueError
from cosmos.log import get_logger
from cosmos.operators.local import AbstractDbtLocalBase
from cosmos.settings import remote_target_path, remote_target_path_conn_id

logger = get_logger(__name__)

DEFAULT_PRODUCER_ASYNC_TASK_ID = "dbt_setup_async"


Expand Down Expand Up @@ -117,6 +120,11 @@ def __init__(
# BigQueryInsertJobOperator parameters and hence fails to initialise the operator due to missing arguments.
# To fix this, we temporarily set the base class to only BigQueryInsertJobOperator during initialization,
# then restore the full inheritance chain afterward.
if kwargs.pop("deferrable", True) is False:
logger.warning(
"DbtRunAirflowAsyncBigqueryOperator requires deferrable=True. "
"The provided value of False has been ignored."
)
DbtRunAirflowAsyncBigqueryOperator.__bases__ = (BigQueryInsertJobOperator,)
super().__init__(
gcp_conn_id=self.gcp_conn_id,
Expand Down
27 changes: 27 additions & 0 deletions tests/operators/_asynchronous/test_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,33 @@ def test_dbt_run_airflow_async_bigquery_operator_init(profile_config_mock):
assert operator.full_refresh is False # Default value should be False


def test_dbt_run_airflow_async_bigquery_operator_init_with_deferrable_false_warns(profile_config_mock, caplog):
"""Passing deferrable=False must not raise TypeError, be overridden to True, and log a warning."""
import logging

with caplog.at_level(logging.WARNING, logger="cosmos.operators._asynchronous.bigquery"):
operator = DbtRunAirflowAsyncBigqueryOperator(
task_id="test_task",
project_dir="/path/to/project",
profile_config=profile_config_mock,
dbt_kwargs={"task_id": "test_task"},
deferrable=False,
)
assert operator.deferrable is True
assert "deferrable=True" in caplog.text


def test_dbt_run_airflow_async_bigquery_operator_init_deferrable_defaults_true(profile_config_mock):
"""deferrable defaults to True when not supplied."""
operator = DbtRunAirflowAsyncBigqueryOperator(
task_id="test_task",
project_dir="/path/to/project",
profile_config=profile_config_mock,
dbt_kwargs={"task_id": "test_task"},
)
assert operator.deferrable is True


def test_dbt_run_airflow_async_bigquery_operator_base_cmd(profile_config_mock):
"""Test base_cmd property returns the correct dbt command."""
operator = DbtRunAirflowAsyncBigqueryOperator(
Expand Down
Loading