Skip to content
This repository was archived by the owner on May 7, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions ibis-server/app/model/connector.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
import base64
import importlib
from functools import cache
from json import loads
from typing import Any

import ibis
import ibis.backends.bigquery
import ibis.expr.datatypes as dt
import ibis.expr.schema as sch
import ibis.formats
import pandas as pd
import sqlglot.expressions as sge
from google.cloud import bigquery
from google.oauth2 import service_account
from ibis import BaseBackend
from ibis.backends.sql.compilers.postgres import compiler as postgres_compiler

Expand All @@ -23,6 +29,8 @@ def __init__(self, data_source: DataSource, connection_info: ConnectionInfo):
self._connector = MSSqlConnector(connection_info)
elif data_source == DataSource.canner:
self._connector = CannerConnector(connection_info)
elif data_source == DataSource.bigquery:
self._connector = BigQueryConnector(connection_info)
else:
self._connector = SimpleConnector(data_source, connection_info)

Expand Down Expand Up @@ -100,6 +108,40 @@ def _to_ibis_type(type_name: str) -> dt.DataType:
return postgres_compiler.type_mapper.from_string(type_name)


class BigQueryConnector(SimpleConnector):
def __init__(self, connection_info: ConnectionInfo):
super().__init__(DataSource.bigquery, connection_info)
self.connection_info = connection_info

def query(self, sql: str, limit: int) -> pd.DataFrame:
try:
return super().query(sql, limit)
except ValueError as e:
# Try to match the error message from the google cloud bigquery library matching Arrow type error.
# If the error message matches, requries to get the schema from the result and generate a empty pandas dataframe with the mapped schema
#
# It's a workaround for the issue that the ibis library does not support empty result for some special types (e.g. JSON or Interval)
# see details:
# - https://github.com/Canner/wren-engine/issues/909
# - https://github.com/ibis-project/ibis/issues/10612
if "Must pass schema" in str(e):
credits_json = loads(
base64.b64decode(
self.connection_info.credentials.get_secret_value()
).decode("utf-8")
)
credentials = service_account.Credentials.from_service_account_info(
credits_json
)
client = bigquery.Client(credentials=credentials)
ibis_schema_mapper = ibis.backends.bigquery.BigQuerySchema()
bq_fields = client.query(sql).result()
ibis_fields = ibis_schema_mapper.to_ibis(bq_fields.schema)
return pd.DataFrame(columns=ibis_fields.names)
else:
raise e


@cache
def _get_pg_type_names(connection: BaseBackend) -> dict[int, str]:
cur = connection.raw_sql("SELECT oid, typname FROM pg_type")
Expand Down
32 changes: 32 additions & 0 deletions ibis-server/tests/routers/v2/connector/test_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,38 @@ async def test_query_values(client, manifest_str):
assert response.status_code == 204


async def test_query_empty_json(client, manifest_str):
# Test the empty result with json column
response = await client.post(
url=f"{base_url}/query",
json={
"manifestStr": manifest_str,
"connectionInfo": connection_info,
"sql": "select json_object('a', 1, 'b', 2) limit 0",
},
)
assert response.status_code == 200
result = response.json()
assert len(result["data"]) == 0
assert result["dtypes"] == {"f0_": "object"}

# Test only the json column is null
response = await client.post(
url=f"{base_url}/query",
json={
"manifestStr": manifest_str,
"connectionInfo": connection_info,
"sql": "select cast(null as JSON), 1",
},
)
assert response.status_code == 200
result = response.json()
assert len(result["data"]) == 1
assert result["data"][0][0] is None
assert result["data"][0][1] == 1
assert result["dtypes"] == {"f0_": "object", "f1_": "int64"}


async def test_interval(client, manifest_str):
response = await client.post(
url=f"{base_url}/query",
Expand Down