Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions cosmos/operators/_asynchronous/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,15 @@
from typing import TYPE_CHECKING, Any, Sequence

import airflow
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

try:
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
except ImportError:
raise ImportError(
"Could not import BigQueryInsertJobOperator. Ensure you've installed the Google Cloud provider separately or "
"with with `pip install apache-airflow-providers-google`."
)

from airflow.utils.context import Context
from airflow.utils.session import NEW_SESSION, provide_session
from packaging.version import Version
Expand Down Expand Up @@ -158,7 +166,12 @@ def _store_template_fields(self, context: Context, session: Session = NEW_SESSIO
self.log.debug("Executed SQL is: %s", sql)
self.compiled_sql = sql

profile = self.profile_config.profile_mapping.profile
if self.profile_config.profile_mapping is not None:
profile = self.profile_config.profile_mapping.profile
else:
raise CosmosValueError(
"The `profile_config.profile`_mapping attribute must be defined to use `ExecutionMode.AIRFLOW_ASYNC`"
)
self.gcp_project = profile["project"]
self.dataset = profile["dataset"]

Expand Down