From 3f7468d76ed4f1e967445f2425d0d551464989a8 Mon Sep 17 00:00:00 2001 From: Jax Liu Date: Thu, 13 Nov 2025 11:04:28 +0800 Subject: [PATCH 1/3] use bigquery python client directly --- ibis-server/app/model/connector.py | 55 +++++++--------------- ibis-server/app/model/metadata/bigquery.py | 7 +-- 2 files changed, 20 insertions(+), 42 deletions(-) diff --git a/ibis-server/app/model/connector.py b/ibis-server/app/model/connector.py index ab27fb4c7..d09150449 100644 --- a/ibis-server/app/model/connector.py +++ b/ibis-server/app/model/connector.py @@ -430,45 +430,22 @@ def __init__(self, connection_info: ConnectionInfo): self.connection_info = connection_info def query(self, sql: str, limit: int | None = None) -> pa.Table: - try: - return super().query(sql, limit) - except ValueError as e: - # Import here to avoid override the custom datatypes - import ibis.backends.bigquery # noqa: PLC0415 - - # Try to match the error message from the google cloud bigquery library matching Arrow type error. - # If the error message matches, requries to get the schema from the result and generate a empty pandas dataframe with the mapped schema - # - # It's a workaround for the issue that the ibis library does not support empty result for some special types (e.g. JSON or Interval) - # see details: - # - https://github.com/Canner/wren-engine/issues/909 - # - https://github.com/ibis-project/ibis/issues/10612 - if "Must pass schema" in str(e): - with tracer.start_as_current_span( - "get_schema", kind=trace.SpanKind.CLIENT - ): - credits_json = loads( - base64.b64decode( - self.connection_info.credentials.get_secret_value() - ).decode("utf-8") - ) - credentials = service_account.Credentials.from_service_account_info( - credits_json - ) - credentials = credentials.with_scopes( - [ - "https://www.googleapis.com/auth/drive", - "https://www.googleapis.com/auth/cloud-platform", - ] - ) - client = bigquery.Client(credentials=credentials) - ibis_schema_mapper = ibis.backends.bigquery.BigQuerySchema() - bq_fields = client.query(sql).result() - ibis_fields = ibis_schema_mapper.to_ibis(bq_fields.schema) - return pd.DataFrame(columns=ibis_fields.names) - else: - raise e - + credits_json = loads( + base64.b64decode( + self.connection_info.credentials.get_secret_value() + ).decode("utf-8") + ) + credentials = service_account.Credentials.from_service_account_info( + credits_json + ) + credentials = credentials.with_scopes( + [ + "https://www.googleapis.com/auth/drive", + "https://www.googleapis.com/auth/cloud-platform", + ] + ) + client = bigquery.Client(credentials=credentials) + return client.query(sql).result(max_results=limit).to_arrow() class DuckDBConnector: def __init__(self, connection_info: ConnectionInfo): diff --git a/ibis-server/app/model/metadata/bigquery.py b/ibis-server/app/model/metadata/bigquery.py index 654abbc37..2e108c969 100644 --- a/ibis-server/app/model/metadata/bigquery.py +++ b/ibis-server/app/model/metadata/bigquery.py @@ -1,3 +1,4 @@ +from app.model.connector import BigQueryConnector from loguru import logger from app.model import BigQueryConnectionInfo @@ -35,7 +36,7 @@ class BigQueryMetadata(Metadata): def __init__(self, connection_info: BigQueryConnectionInfo): super().__init__(connection_info) - self.connection = DataSource.bigquery.get_connection(connection_info) + self.connection = BigQueryConnector(connection_info) def get_table_list(self) -> list[Table]: dataset_id = self.connection_info.dataset_id.get_secret_value() @@ -74,7 +75,7 @@ def get_table_list(self) -> list[Table]: AND cf.data_type NOT LIKE 'RANGE%' ORDER BY cf.field_path ASC """ - response = self.connection.sql(sql).to_pandas().to_dict(orient="records") + response = self.connection.query(sql).to_pandas().to_dict(orient="records") def get_column(row) -> Column: return Column( @@ -175,7 +176,7 @@ def get_constraints(self) -> list[Constraint]: ON ccu.constraint_name = tc.constraint_name WHERE tc.constraint_type = 'FOREIGN KEY' """ - response = self.connection.sql(sql).to_pandas().to_dict(orient="records") + response = self.connection.query(sql).to_pandas().to_dict(orient="records") constraints = [] for row in response: From 7afcfd931455ca740c33dfb4a5fd7a8758345b4b Mon Sep 17 00:00:00 2001 From: Jax Liu Date: Thu, 13 Nov 2025 11:10:10 +0800 Subject: [PATCH 2/3] fix fmt --- ibis-server/app/model/connector.py | 1 + ibis-server/app/model/metadata/bigquery.py | 3 +-- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ibis-server/app/model/connector.py b/ibis-server/app/model/connector.py index d09150449..eccb90193 100644 --- a/ibis-server/app/model/connector.py +++ b/ibis-server/app/model/connector.py @@ -447,6 +447,7 @@ def query(self, sql: str, limit: int | None = None) -> pa.Table: client = bigquery.Client(credentials=credentials) return client.query(sql).result(max_results=limit).to_arrow() + class DuckDBConnector: def __init__(self, connection_info: ConnectionInfo): import duckdb # noqa: PLC0415 diff --git a/ibis-server/app/model/metadata/bigquery.py b/ibis-server/app/model/metadata/bigquery.py index 2e108c969..4db2d4b8c 100644 --- a/ibis-server/app/model/metadata/bigquery.py +++ b/ibis-server/app/model/metadata/bigquery.py @@ -1,8 +1,7 @@ -from app.model.connector import BigQueryConnector from loguru import logger from app.model import BigQueryConnectionInfo -from app.model.data_source import DataSource +from app.model.connector import BigQueryConnector from app.model.metadata.dto import ( Column, Constraint, From 173dafb1b45a606e14863869141ce9264f9f955d Mon Sep 17 00:00:00 2001 From: Jax Liu Date: Thu, 13 Nov 2025 11:25:22 +0800 Subject: [PATCH 3/3] add tracing span --- ibis-server/app/model/connector.py | 1 + 1 file changed, 1 insertion(+) diff --git a/ibis-server/app/model/connector.py b/ibis-server/app/model/connector.py index eccb90193..9c00115c2 100644 --- a/ibis-server/app/model/connector.py +++ b/ibis-server/app/model/connector.py @@ -429,6 +429,7 @@ def __init__(self, connection_info: ConnectionInfo): super().__init__(DataSource.bigquery, connection_info) self.connection_info = connection_info + @tracer.start_as_current_span("connector_query", kind=trace.SpanKind.CLIENT) def query(self, sql: str, limit: int | None = None) -> pa.Table: credits_json = loads( base64.b64decode(