Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 17 additions & 38 deletions ibis-server/app/model/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,45 +429,24 @@ def __init__(self, connection_info: ConnectionInfo):
super().__init__(DataSource.bigquery, connection_info)
self.connection_info = connection_info

@tracer.start_as_current_span("connector_query", kind=trace.SpanKind.CLIENT)
def query(self, sql: str, limit: int | None = None) -> pa.Table:
    """Execute *sql* against BigQuery and return the result as a pyarrow Table.

    Queries through the google-cloud-bigquery client directly instead of the
    ibis backend, so results containing types ibis cannot round-trip when
    empty (e.g. JSON or Interval) are converted by BigQuery's own Arrow path.
    See:
      - https://github.com/Canner/wren-engine/issues/909
      - https://github.com/ibis-project/ibis/issues/10612

    Args:
        sql: The SQL statement to execute.
        limit: Optional cap on the number of fetched rows; ``None`` fetches all
            rows (``max_results=None`` means "no limit" to the BigQuery client).

    Returns:
        The query result as a ``pa.Table``.
    """
    client = self._build_client()
    return client.query(sql).result(max_results=limit).to_arrow()

def _build_client(self) -> bigquery.Client:
    """Build a BigQuery client from the connection's service-account credentials."""
    # The connection info carries a base64-encoded service-account JSON blob.
    credits_json = loads(
        base64.b64decode(
            self.connection_info.credentials.get_secret_value()
        ).decode("utf-8")
    )
    credentials = service_account.Credentials.from_service_account_info(
        credits_json
    )
    # NOTE(review): the Drive scope is presumably needed for external tables
    # backed by Google Sheets/Drive — confirm before removing.
    credentials = credentials.with_scopes(
        [
            "https://www.googleapis.com/auth/drive",
            "https://www.googleapis.com/auth/cloud-platform",
        ]
    )
    return bigquery.Client(credentials=credentials)


class DuckDBConnector:
Expand Down
8 changes: 4 additions & 4 deletions ibis-server/app/model/metadata/bigquery.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from loguru import logger

from app.model import BigQueryConnectionInfo
from app.model.data_source import DataSource
from app.model.connector import BigQueryConnector
from app.model.metadata.dto import (
Column,
Constraint,
Expand Down Expand Up @@ -35,7 +35,7 @@
class BigQueryMetadata(Metadata):
def __init__(self, connection_info: BigQueryConnectionInfo):
    """Initialize metadata access for a BigQuery data source.

    Args:
        connection_info: BigQuery connection settings (credentials, dataset).
    """
    super().__init__(connection_info)
    # Query information_schema through BigQueryConnector (the google-cloud-bigquery
    # client) rather than an ibis DataSource connection, so metadata queries use
    # the same code path as data queries.
    self.connection = BigQueryConnector(connection_info)

def get_table_list(self) -> list[Table]:
dataset_id = self.connection_info.dataset_id.get_secret_value()
Expand Down Expand Up @@ -74,7 +74,7 @@ def get_table_list(self) -> list[Table]:
AND cf.data_type NOT LIKE 'RANGE%'
ORDER BY cf.field_path ASC
"""
response = self.connection.sql(sql).to_pandas().to_dict(orient="records")
response = self.connection.query(sql).to_pandas().to_dict(orient="records")

def get_column(row) -> Column:
return Column(
Expand Down Expand Up @@ -175,7 +175,7 @@ def get_constraints(self) -> list[Constraint]:
ON ccu.constraint_name = tc.constraint_name
WHERE tc.constraint_type = 'FOREIGN KEY'
"""
response = self.connection.sql(sql).to_pandas().to_dict(orient="records")
response = self.connection.query(sql).to_pandas().to_dict(orient="records")

constraints = []
for row in response:
Expand Down
Loading