diff --git a/cosmos/operators/_asynchronous/bigquery.py b/cosmos/operators/_asynchronous/bigquery.py index 1d28f5da3e..007ea47145 100644 --- a/cosmos/operators/_asynchronous/bigquery.py +++ b/cosmos/operators/_asynchronous/bigquery.py @@ -5,7 +5,15 @@ from typing import TYPE_CHECKING, Any, Sequence import airflow -from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator + +try: + from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator +except ImportError: + raise ImportError( + "Could not import BigQueryInsertJobOperator. Ensure you've installed the Google Cloud provider separately or " + "with with `pip install apache-airflow-providers-google`." + ) + from airflow.utils.context import Context from airflow.utils.session import NEW_SESSION, provide_session from packaging.version import Version @@ -158,7 +166,12 @@ def _store_template_fields(self, context: Context, session: Session = NEW_SESSIO self.log.debug("Executed SQL is: %s", sql) self.compiled_sql = sql - profile = self.profile_config.profile_mapping.profile + if self.profile_config.profile_mapping is not None: + profile = self.profile_config.profile_mapping.profile + else: + raise CosmosValueError( + "The `profile_config.profile`_mapping attribute must be defined to use `ExecutionMode.AIRFLOW_ASYNC`" + ) self.gcp_project = profile["project"] self.dataset = profile["dataset"]