From be86e8596f59950164d2ea23c1cfba7490b0988c Mon Sep 17 00:00:00 2001 From: Jax Liu Date: Wed, 25 Dec 2024 17:04:08 +0800 Subject: [PATCH 1/3] implement workaround for empty json result --- ibis-server/app/model/connector.py | 42 +++++++++++++++++++ .../routers/v2/connector/test_bigquery.py | 15 +++++++ 2 files changed, 57 insertions(+) diff --git a/ibis-server/app/model/connector.py b/ibis-server/app/model/connector.py index a433860b2..86e944950 100644 --- a/ibis-server/app/model/connector.py +++ b/ibis-server/app/model/connector.py @@ -1,12 +1,18 @@ +import base64 import importlib from functools import cache +from json import loads from typing import Any import ibis +import ibis.backends.bigquery import ibis.expr.datatypes as dt import ibis.expr.schema as sch +import ibis.formats import pandas as pd import sqlglot.expressions as sge +from google.cloud import bigquery +from google.oauth2 import service_account from ibis import BaseBackend from ibis.backends.sql.compilers.postgres import compiler as postgres_compiler @@ -23,6 +29,8 @@ def __init__(self, data_source: DataSource, connection_info: ConnectionInfo): self._connector = MSSqlConnector(connection_info) elif data_source == DataSource.canner: self._connector = CannerConnector(connection_info) + elif data_source == DataSource.bigquery: + self._connector = BigQueryConnector(connection_info) else: self._connector = SimpleConnector(data_source, connection_info) @@ -100,6 +108,40 @@ def _to_ibis_type(type_name: str) -> dt.DataType: return postgres_compiler.type_mapper.from_string(type_name) +class BigQueryConnector(SimpleConnector): + def __init__(self, connection_info: ConnectionInfo): + super().__init__(DataSource.bigquery, connection_info) + self.connection_info = connection_info + + def query(self, sql: str, limit: int) -> pd.DataFrame: + try: + return super().query(sql, limit) + except ValueError as e: + # Try to match the error message from the google cloud bigquery library matching Arrow type error. + # If the error message matches, requries to get the schema from the result and generate a empty pandas dataframe with the mapped schema + # + # It's a workaround for the issue that the ibis library does not support empty result for some special types (e.g. JSON or Interval) + # see details: + # - https://github.com/Canner/wren-engine/issues/909 + # - https://github.com/ibis-project/ibis/issues/10612 + if "Must pass schema" in str(e): + credits_json = loads( + base64.b64decode( + self.connection_info.credentials.get_secret_value() + ).decode("utf-8") + ) + credentials = service_account.Credentials.from_service_account_info( + credits_json + ) + client = bigquery.Client(credentials=credentials) + ibis_schema_mapper = ibis.backends.bigquery.BigQuerySchema() + bq_fields = client.query(sql).result() + ibis_fields = ibis_schema_mapper.to_ibis(bq_fields.schema) + return pd.DataFrame(columns=ibis_fields.names) + else: + raise e + + @cache def _get_pg_type_names(connection: BaseBackend) -> dict[int, str]: cur = connection.raw_sql("SELECT oid, typname FROM pg_type") diff --git a/ibis-server/tests/routers/v2/connector/test_bigquery.py b/ibis-server/tests/routers/v2/connector/test_bigquery.py index b1062a238..d3accb20f 100644 --- a/ibis-server/tests/routers/v2/connector/test_bigquery.py +++ b/ibis-server/tests/routers/v2/connector/test_bigquery.py @@ -198,6 +198,21 @@ async def test_query_values(client, manifest_str): assert response.status_code == 204 +async def test_query_empty_json(client, manifest_str): + response = await client.post( + url=f"{base_url}/query", + json={ + "manifestStr": manifest_str, + "connectionInfo": connection_info, + "sql": "select json_object('a', 1, 'b', 2) limit 0", + }, + ) + assert response.status_code == 200 + result = response.json() + assert len(result["data"]) == 0 + assert result["dtypes"] == {"f0_": "object"} + + async def test_interval(client, manifest_str): response = await client.post( url=f"{base_url}/query", From af5c332ddd4b444124db084ee17f9e32fbeee6a2 Mon Sep 17 00:00:00 2001 From: Jax Liu Date: Wed, 25 Dec 2024 17:35:17 +0800 Subject: [PATCH 2/3] add test for null json column --- .../tests/routers/v2/connector/test_bigquery.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/ibis-server/tests/routers/v2/connector/test_bigquery.py b/ibis-server/tests/routers/v2/connector/test_bigquery.py index d3accb20f..106400cde 100644 --- a/ibis-server/tests/routers/v2/connector/test_bigquery.py +++ b/ibis-server/tests/routers/v2/connector/test_bigquery.py @@ -199,6 +199,7 @@ async def test_query_values(client, manifest_str): async def test_query_empty_json(client, manifest_str): + # Test the empty result with json column response = await client.post( url=f"{base_url}/query", json={ @@ -212,6 +213,22 @@ async def test_query_empty_json(client, manifest_str): assert len(result["data"]) == 0 assert result["dtypes"] == {"f0_": "object"} + # Test only the json column is null + response = await client.post( + url=f"{base_url}/query", + json={ + "manifestStr": manifest_str, + "connectionInfo": connection_info, + "sql": "select cast(null as JSON), 1", + }, + ) + assert response.status_code == 200 + result = response.json() + assert len(result["data"]) == 1 + assert result["data"][0][0] is None + assert result["data"][0][1] == 1 + assert result["dtypes"] == {"f0_": "object", "f1_": "int64"} + async def test_interval(client, manifest_str): response = await client.post( From ff2257f62d745a6f19f26c0dc73473376381a728 Mon Sep 17 00:00:00 2001 From: Jax Liu Date: Wed, 25 Dec 2024 18:13:38 +0800 Subject: [PATCH 3/3] fix the custom datatype --- ibis-server/app/model/connector.py | 4 ++- .../routers/v2/connector/test_bigquery.py | 34 +++++++++++++++++-- 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/ibis-server/app/model/connector.py b/ibis-server/app/model/connector.py index 86e944950..0640f5825 100644 --- a/ibis-server/app/model/connector.py +++ b/ibis-server/app/model/connector.py @@ -5,7 +5,6 @@ from typing import Any import ibis -import ibis.backends.bigquery import ibis.expr.datatypes as dt import ibis.expr.schema as sch import ibis.formats @@ -117,6 +116,9 @@ def query(self, sql: str, limit: int) -> pd.DataFrame: try: return super().query(sql, limit) except ValueError as e: + # Import here to avoid override the custom datatypes + import ibis.backends.bigquery + # Try to match the error message from the google cloud bigquery library matching Arrow type error. # If the error message matches, requries to get the schema from the result and generate a empty pandas dataframe with the mapped schema # diff --git a/ibis-server/tests/routers/v2/connector/test_bigquery.py b/ibis-server/tests/routers/v2/connector/test_bigquery.py index 106400cde..81c7ccf48 100644 --- a/ibis-server/tests/routers/v2/connector/test_bigquery.py +++ b/ibis-server/tests/routers/v2/connector/test_bigquery.py @@ -199,7 +199,7 @@ async def test_query_values(client, manifest_str): async def test_query_empty_json(client, manifest_str): - # Test the empty result with json column + """Test the empty result with json column.""" response = await client.post( url=f"{base_url}/query", json={ @@ -213,7 +213,7 @@ async def test_query_empty_json(client, manifest_str): assert len(result["data"]) == 0 assert result["dtypes"] == {"f0_": "object"} - # Test only the json column is null + """Test only the json column is null.""" response = await client.post( url=f"{base_url}/query", json={ @@ -260,6 +260,36 @@ async def test_avg_interval(client, manifest_str): assert result["dtypes"] == {"col": "object"} +async def test_custom_datatypes_no_overrides(client, manifest_str): + # Trigger import the official BigQueryType + response = await client.post( + url=f"{base_url}/query", + json={ + "manifestStr": manifest_str, + "connectionInfo": connection_info, + "sql": "select json_object('a', 1, 'b', 2) limit 0", + }, + ) + assert response.status_code == 200 + result = response.json() + assert len(result["data"]) == 0 + assert result["dtypes"] == {"f0_": "object"} + + # Should use back the custom BigQueryType + response = await client.post( + url=f"{base_url}/query", + json={ + "connectionInfo": connection_info, + "manifestStr": manifest_str, + "sql": "SELECT INTERVAL '1' YEAR + INTERVAL '100' MONTH + INTERVAL '100' DAY + INTERVAL '1' HOUR AS col", + }, + ) + assert response.status_code == 200 + result = response.json() + assert result["data"][0] == ["112 months 100 days 3600000000 microseconds"] + assert result["dtypes"] == {"col": "object"} + + async def test_validate_with_unknown_rule(client, manifest_str): response = await client.post( url=f"{base_url}/validate/unknown_rule",