diff --git a/cosmos/operators/_asynchronous/bigquery.py b/cosmos/operators/_asynchronous/bigquery.py index e6a604e5f6..a7e3909868 100644 --- a/cosmos/operators/_asynchronous/bigquery.py +++ b/cosmos/operators/_asynchronous/bigquery.py @@ -33,9 +33,12 @@ from cosmos.constants import AIRFLOW_VERSION from cosmos.dataset import get_dataset_alias_name from cosmos.exceptions import CosmosValueError +from cosmos.log import get_logger from cosmos.operators.local import AbstractDbtLocalBase from cosmos.settings import remote_target_path, remote_target_path_conn_id +logger = get_logger(__name__) + DEFAULT_PRODUCER_ASYNC_TASK_ID = "dbt_setup_async" @@ -117,6 +120,11 @@ def __init__( # BigQueryInsertJobOperator parameters and hence fails to initialise the operator due to missing arguments. # To fix this, we temporarily set the base class to only BigQueryInsertJobOperator during initialization, # then restore the full inheritance chain afterward. + if kwargs.pop("deferrable", True) is False: + logger.warning( + "DbtRunAirflowAsyncBigqueryOperator requires deferrable=True. " + "The provided value of False has been ignored." + ) DbtRunAirflowAsyncBigqueryOperator.__bases__ = (BigQueryInsertJobOperator,) super().__init__( gcp_conn_id=self.gcp_conn_id, diff --git a/tests/operators/_asynchronous/test_bigquery.py b/tests/operators/_asynchronous/test_bigquery.py index 700f17f0fb..2ede7dead2 100644 --- a/tests/operators/_asynchronous/test_bigquery.py +++ b/tests/operators/_asynchronous/test_bigquery.py @@ -45,6 +45,33 @@ def test_dbt_run_airflow_async_bigquery_operator_init(profile_config_mock): assert operator.full_refresh is False # Default value should be False +def test_dbt_run_airflow_async_bigquery_operator_init_with_deferrable_false_warns(profile_config_mock, caplog): + """Passing deferrable=False must not raise TypeError, be overridden to True, and log a warning.""" + import logging + + with caplog.at_level(logging.WARNING, logger="cosmos.operators._asynchronous.bigquery"): + operator = DbtRunAirflowAsyncBigqueryOperator( + task_id="test_task", + project_dir="/path/to/project", + profile_config=profile_config_mock, + dbt_kwargs={"task_id": "test_task"}, + deferrable=False, + ) + assert operator.deferrable is True + assert "deferrable=True" in caplog.text + + +def test_dbt_run_airflow_async_bigquery_operator_init_deferrable_defaults_true(profile_config_mock): + """deferrable defaults to True when not supplied.""" + operator = DbtRunAirflowAsyncBigqueryOperator( + task_id="test_task", + project_dir="/path/to/project", + profile_config=profile_config_mock, + dbt_kwargs={"task_id": "test_task"}, + ) + assert operator.deferrable is True + + def test_dbt_run_airflow_async_bigquery_operator_base_cmd(profile_config_mock): """Test base_cmd property returns the correct dbt command.""" operator = DbtRunAirflowAsyncBigqueryOperator(