From 66af9340679d350b50c7633053357251a2379336 Mon Sep 17 00:00:00 2001 From: Jax Liu Date: Tue, 10 Jun 2025 15:25:59 +0800 Subject: [PATCH 01/26] change to arrow type --- ibis-server/app/model/connector.py | 14 ++++++++++++-- ibis-server/app/util.py | 15 +++++++++++++-- ibis-server/tools/query_local_run.py | 11 +++++++++-- 3 files changed, 34 insertions(+), 6 deletions(-) diff --git a/ibis-server/app/model/connector.py b/ibis-server/app/model/connector.py index 1c2bb4a59..8f916c5e2 100644 --- a/ibis-server/app/model/connector.py +++ b/ibis-server/app/model/connector.py @@ -76,7 +76,12 @@ def __init__(self, data_source: DataSource, connection_info: ConnectionInfo): @tracer.start_as_current_span("connector_query", kind=trace.SpanKind.CLIENT) def query(self, sql: str, limit: int) -> pd.DataFrame: - return self.connection.sql(sql).limit(limit).to_pandas() + return ( + self.connection.sql(sql) + .limit(limit) + .to_pyarrow() + .to_pandas(types_mapper=pd.ArrowDtype) + ) @tracer.start_as_current_span("connector_dry_run", kind=trace.SpanKind.CLIENT) def dry_run(self, sql: str) -> None: @@ -118,7 +123,12 @@ def __init__(self, connection_info: ConnectionInfo): def query(self, sql: str, limit: int) -> pd.DataFrame: # Canner enterprise does not support `CREATE TEMPORARY VIEW` for getting schema schema = self._get_schema(sql) - return self.connection.sql(sql, schema=schema).limit(limit).to_pandas() + return ( + self.connection.sql(sql, schema=schema) + .limit(limit) + .to_pyarrow() + .to_pandas(types_mapper=pd.ArrowDtype) + ) @tracer.start_as_current_span("connector_dry_run", kind=trace.SpanKind.CLIENT) def dry_run(self, sql: str) -> Any: diff --git a/ibis-server/app/util.py b/ibis-server/app/util.py index 31298b234..1b6a04da2 100644 --- a/ibis-server/app/util.py +++ b/ibis-server/app/util.py @@ -4,6 +4,7 @@ import orjson import pandas as pd +import pyarrow as pa import wren_core from fastapi import Header from opentelemetry import trace @@ -35,11 +36,19 @@ def 
base64_to_dict(base64_str: str) -> dict: @tracer.start_as_current_span("to_json", kind=trace.SpanKind.INTERNAL) def to_json(df: pd.DataFrame) -> dict: for column in df.columns: - if is_datetime64_any_dtype(df[column].dtype): + if _is_arrow_datetime(df[column]) and is_datetime64_any_dtype(df[column].dtype): df[column] = _to_datetime_and_format(df[column]) return _to_json_obj(df) +def _is_arrow_datetime(series: pd.Series) -> bool: + dtype = series.dtype + if hasattr(dtype, "pyarrow_dtype"): + pa_type = dtype.pyarrow_dtype + return pa.types.is_timestamp(pa_type) + return False + + def _to_datetime_and_format(series: pd.Series) -> pd.Series: return series.apply( lambda d: d.strftime( @@ -87,7 +96,9 @@ def default(obj): default=default, ) ) - json_obj["dtypes"] = df.dtypes.astype(str).to_dict() + json_obj["dtypes"] = df.dtypes.map( + lambda x: str(x.pyarrow_dtype) if hasattr(x, "pyarrow_dtype") else str(x) + ).to_dict() return json_obj diff --git a/ibis-server/tools/query_local_run.py b/ibis-server/tools/query_local_run.py index 4692e1cff..18d5bd5e5 100644 --- a/ibis-server/tools/query_local_run.py +++ b/ibis-server/tools/query_local_run.py @@ -15,8 +15,10 @@ import json import os from app.model import MySqlConnectionInfo, PostgresConnectionInfo +from app.util import to_json import sqlglot import sys +import pandas as pd from dotenv import load_dotenv from wren_core import SessionContext @@ -89,7 +91,12 @@ else: raise Exception("Unsupported data source:", data_source) -df = connection.sql(dialect_sql).limit(10).to_pandas() +df = connection.sql(dialect_sql).limit(10).to_pyarrow().to_pandas(types_mapper=pd.ArrowDtype) print("### Result ###") print("") -print(df) \ No newline at end of file +print(df) + +json_str = to_json(df) +print("### Result JSON ###") +print("") +print(json_str) \ No newline at end of file From bf2b152bb1771fe534bd1b4fe2bbf650bf585431 Mon Sep 17 00:00:00 2001 From: Jax Liu Date: Wed, 11 Jun 2025 10:28:01 +0800 Subject: [PATCH 02/26] impl v1 --- 
ibis-server/app/query_cache/__init__.py | 34 ++++++++++++++++++------- ibis-server/app/routers/v2/connector.py | 16 ++++++++++-- ibis-server/app/util.py | 28 ++++++++++++++++---- 3 files changed, 62 insertions(+), 16 deletions(-) diff --git a/ibis-server/app/query_cache/__init__.py b/ibis-server/app/query_cache/__init__.py index 203035b47..3d3390a79 100644 --- a/ibis-server/app/query_cache/__init__.py +++ b/ibis-server/app/query_cache/__init__.py @@ -4,6 +4,8 @@ import ibis import opendal +import pandas as pd +import pyarrow as pa from loguru import logger from opentelemetry import trace @@ -24,10 +26,13 @@ def get(self, data_source: str, sql: str, info) -> Optional[Any]: # Check if cache file exists if op.exists(cache_file_name): try: - logger.info(f"\nReading query cache {cache_file_name}\n") - cache = ibis.read_parquet(full_path) - df = cache.execute() - logger.info("\nquery cache to dataframe\n") + logger.info(f"Reading query cache {cache_file_name}") + df = ( + ibis.read_parquet(full_path) + .to_pyarrow() + .to_pandas(types_mapper=pd.ArrowDtype) + ) + logger.info("query cache to dataframe") return df except Exception as e: logger.debug(f"Failed to read query cache {e}") @@ -36,19 +41,30 @@ def get(self, data_source: str, sql: str, info) -> Optional[Any]: return None @tracer.start_as_current_span("set_cache", kind=trace.SpanKind.INTERNAL) - def set(self, data_source: str, sql: str, result: Any, info) -> None: + def set( + self, + data_source: str, + sql: str, + result: pd.DataFrame, + info, + result_schema: pa.Schema, + ) -> None: cache_key = self._generate_cache_key(data_source, sql, info) cache_file_name = self._set_cache_file_name(cache_key) op = self._get_dal_operator() full_path = self._get_full_path(cache_file_name) - try: # Create cache directory if it doesn't exist with op.open(cache_file_name, mode="wb") as file: - cache = ibis.memtable(result) - logger.info(f"\nWriting query cache to {cache_file_name}\n") + cache = pa.Table.from_pandas( + result, + 
preserve_index=False, + schema=result_schema, + ) + df = cache.to_pandas(types_mapper=pd.ArrowDtype) + logger.info(f"Writing query cache to {cache_file_name}") if file.writable(): - cache.to_parquet(full_path) + df.to_parquet(full_path) except Exception as e: logger.debug(f"Failed to write query cache: {e}") return diff --git a/ibis-server/app/routers/v2/connector.py b/ibis-server/app/routers/v2/connector.py index 4dbcf98d1..9cffcf5be 100644 --- a/ibis-server/app/routers/v2/connector.py +++ b/ibis-server/app/routers/v2/connector.py @@ -25,6 +25,7 @@ from app.util import ( build_context, get_fallback_message, + pd_to_arrow_schema, pushdown_limit, set_attribute, to_json, @@ -126,6 +127,9 @@ async def query( ).rewrite(sql) connector = Connector(data_source, dto.connection_info) result = connector.query(rewritten_sql, limit=limit) + # the shcmea of the result would be changed after to_json + # so we need to keep the original schema for cache first + result_schema = pd_to_arrow_schema(result) response = ORJSONResponse(to_json(result)) # headers for all non-hit cases @@ -140,7 +144,11 @@ async def query( ) ) query_cache_manager.set( - data_source, dto.sql, result, dto.connection_info + data_source, + dto.sql, + result, + dto.connection_info, + result_schema, ) response.headers["X-Cache-Override"] = "true" @@ -153,7 +161,11 @@ async def query( # no matter the cache override or not, we need to create cache case (True, False, _): query_cache_manager.set( - data_source, dto.sql, result, dto.connection_info + data_source, + dto.sql, + result, + dto.connection_info, + result_schema, ) # case 5~8 Other cases (cache is not enabled) case (False, _, _): diff --git a/ibis-server/app/util.py b/ibis-server/app/util.py index 1b6a04da2..18fcdf085 100644 --- a/ibis-server/app/util.py +++ b/ibis-server/app/util.py @@ -35,10 +35,15 @@ def base64_to_dict(base64_str: str) -> dict: @tracer.start_as_current_span("to_json", kind=trace.SpanKind.INTERNAL) def to_json(df: pd.DataFrame) -> dict: + 
original_dtype = df.dtypes.map( + lambda x: str(x.pyarrow_dtype) if hasattr(x, "pyarrow_dtype") else str(x) + ).to_dict() for column in df.columns: if _is_arrow_datetime(df[column]) and is_datetime64_any_dtype(df[column].dtype): df[column] = _to_datetime_and_format(df[column]) - return _to_json_obj(df) + json_obj = _to_json_obj(df) + json_obj["dtypes"] = original_dtype + return json_obj def _is_arrow_datetime(series: pd.Series) -> bool: @@ -61,7 +66,10 @@ def _to_datetime_and_format(series: pd.Series) -> pd.Series: def _to_json_obj(df: pd.DataFrame) -> dict: def format_value(x): - if isinstance(x, float): + # Need to handle NaN first, as it can be a float or pd.NA + if pd.isna(x): + return None + elif isinstance(x, float): return f"{x:.9g}" elif isinstance(x, decimal.Decimal): if x == 0: @@ -96,9 +104,6 @@ def default(obj): default=default, ) ) - json_obj["dtypes"] = df.dtypes.map( - lambda x: str(x.pyarrow_dtype) if hasattr(x, "pyarrow_dtype") else str(x) - ).to_dict() return json_obj @@ -168,3 +173,16 @@ def get_fallback_message( def safe_strtobool(val: str) -> bool: return val.lower() in {"1", "true", "yes", "y"} + + +def pd_to_arrow_schema(df: pd.DataFrame) -> pa.Schema: + fields = [] + for column in df.columns: + dtype = df[column].dtype + if hasattr(dtype, "pyarrow_dtype"): + pa_type = dtype.pyarrow_dtype + else: + # Fallback to string type for unsupported dtypes + pa_type = pa.string() + fields.append(pa.field(column, pa_type)) + return pa.schema(fields) From 5b969063f2ad045321a0b8b868aa8ae9f7b8826f Mon Sep 17 00:00:00 2001 From: Jax Liu Date: Fri, 13 Jun 2025 10:20:53 +0800 Subject: [PATCH 03/26] get arrow result and fmt data using duckdb --- ibis-server/app/dependencies.py | 5 + ibis-server/app/model/connector.py | 21 +-- ibis-server/app/query_cache/__init__.py | 25 ++-- ibis-server/app/routers/v2/connector.py | 34 ++--- ibis-server/app/routers/v3/connector.py | 24 ++-- ibis-server/app/util.py | 168 ++++++++++++------------ 
ibis-server/tools/query_local_run.py | 5 +- 7 files changed, 140 insertions(+), 142 deletions(-) diff --git a/ibis-server/app/dependencies.py b/ibis-server/app/dependencies.py index 4900cae1d..3d9010b18 100644 --- a/ibis-server/app/dependencies.py +++ b/ibis-server/app/dependencies.py @@ -6,6 +6,11 @@ X_WREN_FALLBACK_DISABLE = "x-wren-fallback_disable" X_WREN_VARIABLE_PREFIX = "x-wren-variable-" +X_WREN_TIMEZONE = "x-wren-timezone" +X_CACHE_HIT = "X-Cache-Hit" +X_CACHE_CREATE_AT = "X-Cache-Create-At" +X_CACHE_OVERRIDE = "X-Cache-Override" +X_CACHE_OVERRIDE_AT = "X-Cache-Override-At" # Rebuild model to validate the dto is correct via validation of the pydantic diff --git a/ibis-server/app/model/connector.py b/ibis-server/app/model/connector.py index 8f916c5e2..5ec00b2d1 100644 --- a/ibis-server/app/model/connector.py +++ b/ibis-server/app/model/connector.py @@ -10,6 +10,7 @@ import ibis.expr.schema as sch import ibis.formats import pandas as pd +import pyarrow as pa import sqlglot.expressions as sge from duckdb import HTTPException, IOException from google.cloud import bigquery @@ -59,7 +60,7 @@ def __init__(self, data_source: DataSource, connection_info: ConnectionInfo): else: self._connector = SimpleConnector(data_source, connection_info) - def query(self, sql: str, limit: int) -> pd.DataFrame: + def query(self, sql: str, limit: int) -> pa.Table: return self._connector.query(sql, limit) def dry_run(self, sql: str) -> None: @@ -75,13 +76,8 @@ def __init__(self, data_source: DataSource, connection_info: ConnectionInfo): self.connection = self.data_source.get_connection(connection_info) @tracer.start_as_current_span("connector_query", kind=trace.SpanKind.CLIENT) - def query(self, sql: str, limit: int) -> pd.DataFrame: - return ( - self.connection.sql(sql) - .limit(limit) - .to_pyarrow() - .to_pandas(types_mapper=pd.ArrowDtype) - ) + def query(self, sql: str, limit: int) -> pa.Table: + return self.connection.sql(sql).limit(limit).to_pyarrow() 
@tracer.start_as_current_span("connector_dry_run", kind=trace.SpanKind.CLIENT) def dry_run(self, sql: str) -> None: @@ -123,12 +119,7 @@ def __init__(self, connection_info: ConnectionInfo): def query(self, sql: str, limit: int) -> pd.DataFrame: # Canner enterprise does not support `CREATE TEMPORARY VIEW` for getting schema schema = self._get_schema(sql) - return ( - self.connection.sql(sql, schema=schema) - .limit(limit) - .to_pyarrow() - .to_pandas(types_mapper=pd.ArrowDtype) - ) + return self.connection.sql(sql, schema=schema).limit(limit).to_pyarrow() @tracer.start_as_current_span("connector_dry_run", kind=trace.SpanKind.CLIENT) def dry_run(self, sql: str) -> Any: @@ -156,7 +147,7 @@ def __init__(self, connection_info: ConnectionInfo): super().__init__(DataSource.bigquery, connection_info) self.connection_info = connection_info - def query(self, sql: str, limit: int) -> pd.DataFrame: + def query(self, sql: str, limit: int) -> pa.Table: try: return super().query(sql, limit) except ValueError as e: diff --git a/ibis-server/app/query_cache/__init__.py b/ibis-server/app/query_cache/__init__.py index 3d3390a79..efd3aa8d2 100644 --- a/ibis-server/app/query_cache/__init__.py +++ b/ibis-server/app/query_cache/__init__.py @@ -4,8 +4,8 @@ import ibis import opendal -import pandas as pd import pyarrow as pa +from duckdb import DuckDBPyConnection, connect from loguru import logger from opentelemetry import trace @@ -27,11 +27,7 @@ def get(self, data_source: str, sql: str, info) -> Optional[Any]: if op.exists(cache_file_name): try: logger.info(f"Reading query cache {cache_file_name}") - df = ( - ibis.read_parquet(full_path) - .to_pyarrow() - .to_pandas(types_mapper=pd.ArrowDtype) - ) + df = ibis.read_parquet(full_path).to_pyarrow() logger.info("query cache to dataframe") return df except Exception as e: @@ -45,9 +41,8 @@ def set( self, data_source: str, sql: str, - result: pd.DataFrame, + result: pa.Table, info, - result_schema: pa.Schema, ) -> None: cache_key = 
self._generate_cache_key(data_source, sql, info) cache_file_name = self._set_cache_file_name(cache_key) @@ -56,15 +51,10 @@ def set( try: # Create cache directory if it doesn't exist with op.open(cache_file_name, mode="wb") as file: - cache = pa.Table.from_pandas( - result, - preserve_index=False, - schema=result_schema, - ) - df = cache.to_pandas(types_mapper=pd.ArrowDtype) - logger.info(f"Writing query cache to {cache_file_name}") + con = self._get_duckdb_connection() + arrow_table = con.from_arrow(result) if file.writable(): - df.to_parquet(full_path) + arrow_table.write_parquet(full_path) except Exception as e: logger.debug(f"Failed to write query cache: {e}") return @@ -119,3 +109,6 @@ def _get_full_path(self, path: str) -> str: def _get_dal_operator(self) -> Any: # Default implementation using local filesystem return opendal.Operator("fs", root=self.root) + + def _get_duckdb_connection(self) -> DuckDBPyConnection: + return connect() diff --git a/ibis-server/app/routers/v2/connector.py b/ibis-server/app/routers/v2/connector.py index 9cffcf5be..c5a351549 100644 --- a/ibis-server/app/routers/v2/connector.py +++ b/ibis-server/app/routers/v2/connector.py @@ -6,7 +6,14 @@ from opentelemetry import trace from starlette.datastructures import Headers -from app.dependencies import get_wren_headers, verify_query_dto +from app.dependencies import ( + X_CACHE_CREATE_AT, + X_CACHE_HIT, + X_CACHE_OVERRIDE, + X_CACHE_OVERRIDE_AT, + get_wren_headers, + verify_query_dto, +) from app.mdl.java_engine import JavaEngineConnector from app.mdl.rewriter import Rewriter from app.mdl.substitute import ModelSubstitute @@ -25,10 +32,10 @@ from app.util import ( build_context, get_fallback_message, - pd_to_arrow_schema, pushdown_limit, set_attribute, to_json, + update_response_headers, ) router = APIRouter(prefix="/connector", tags=["connector"]) @@ -108,12 +115,13 @@ async def query( ) cache_hit = cached_result is not None + cache_headers = {} # case 1: cache hit read if cache_enable and 
cache_hit and not override_cache: span.add_event("cache hit") - response = ORJSONResponse(to_json(cached_result)) - response.headers["X-Cache-Hit"] = "true" - response.headers["X-Cache-Create-At"] = str( + result = cached_result + cache_headers[X_CACHE_HIT] = "true" + cache_headers[X_CACHE_CREATE_AT] = str( query_cache_manager.get_cache_file_timestamp( data_source, dto.sql, dto.connection_info ) @@ -127,18 +135,14 @@ async def query( ).rewrite(sql) connector = Connector(data_source, dto.connection_info) result = connector.query(rewritten_sql, limit=limit) - # the shcmea of the result would be changed after to_json - # so we need to keep the original schema for cache first - result_schema = pd_to_arrow_schema(result) - response = ORJSONResponse(to_json(result)) # headers for all non-hit cases - response.headers["X-Cache-Hit"] = "false" + cache_headers[X_CACHE_HIT] = "false" match (cache_enable, cache_hit, override_cache): # case 2 cache hit but override cache case (True, True, True): - response.headers["X-Cache-Create-At"] = str( + cache_headers[X_CACHE_CREATE_AT] = str( query_cache_manager.get_cache_file_timestamp( data_source, dto.sql, dto.connection_info ) @@ -148,11 +152,10 @@ async def query( dto.sql, result, dto.connection_info, - result_schema, ) - response.headers["X-Cache-Override"] = "true" - response.headers["X-Cache-Override-At"] = str( + cache_headers[X_CACHE_OVERRIDE] = "true" + cache_headers[X_CACHE_OVERRIDE_AT] = str( query_cache_manager.get_cache_file_timestamp( data_source, dto.sql, dto.connection_info ) @@ -165,11 +168,12 @@ async def query( dto.sql, result, dto.connection_info, - result_schema, ) # case 5~8 Other cases (cache is not enabled) case (False, _, _): pass + response = ORJSONResponse(to_json(result, headers)) + update_response_headers(response, cache_headers) if is_fallback: get_fallback_message( diff --git a/ibis-server/app/routers/v3/connector.py b/ibis-server/app/routers/v3/connector.py index 078de6a81..78c0a91db 100644 --- 
a/ibis-server/app/routers/v3/connector.py +++ b/ibis-server/app/routers/v3/connector.py @@ -8,6 +8,10 @@ from app.config import get_config from app.dependencies import ( + X_CACHE_CREATE_AT, + X_CACHE_HIT, + X_CACHE_OVERRIDE, + X_CACHE_OVERRIDE_AT, X_WREN_FALLBACK_DISABLE, exist_wren_variables_header, get_wren_headers, @@ -36,6 +40,7 @@ safe_strtobool, set_attribute, to_json, + update_response_headers, ) router = APIRouter(prefix="/connector", tags=["connector"]) @@ -98,12 +103,14 @@ async def query( data_source, dto.sql, dto.connection_info ) cache_hit = cached_result is not None + + cache_headers = {} # case 1: cache hit read if cache_enable and cache_hit and not override_cache: span.add_event("cache hit") - response = ORJSONResponse(to_json(cached_result)) - response.headers["X-Cache-Hit"] = "true" - response.headers["X-Cache-Create-At"] = str( + result = cached_result + cache_headers[X_CACHE_HIT] = "true" + cache_headers[X_CACHE_CREATE_AT] = str( query_cache_manager.get_cache_file_timestamp( data_source, dto.sql, dto.connection_info ) @@ -119,15 +126,14 @@ async def query( ).rewrite(sql) connector = Connector(data_source, dto.connection_info) result = connector.query(rewritten_sql, limit=limit) - response = ORJSONResponse(to_json(result)) # headers for all non-hit cases - response.headers["X-Cache-Hit"] = "false" + cache_headers[X_CACHE_HIT] = "false" match (cache_enable, cache_hit, override_cache): # case 2: override existing cache case (True, True, True): - response.headers["X-Cache-Create-At"] = str( + cache_headers[X_CACHE_CREATE_AT] = str( query_cache_manager.get_cache_file_timestamp( data_source, dto.sql, dto.connection_info ) @@ -136,8 +142,8 @@ async def query( data_source, dto.sql, result, dto.connection_info ) - response.headers["X-Cache-Override"] = "true" - response.headers["X-Cache-Override-At"] = str( + cache_headers[X_CACHE_OVERRIDE] = "true" + cache_headers[X_CACHE_OVERRIDE_AT] = str( query_cache_manager.get_cache_file_timestamp( data_source, 
dto.sql, dto.connection_info ) @@ -152,6 +158,8 @@ async def query( case (False, _, _): pass + response = ORJSONResponse(to_json(result, headers)) + update_response_headers(response, cache_headers) return response except Exception as e: is_fallback_disable = bool( diff --git a/ibis-server/app/util.py b/ibis-server/app/util.py index 18fcdf085..345f2f3cc 100644 --- a/ibis-server/app/util.py +++ b/ibis-server/app/util.py @@ -1,12 +1,12 @@ import base64 -import datetime -import decimal +import duckdb import orjson import pandas as pd import pyarrow as pa import wren_core from fastapi import Header +from loguru import logger from opentelemetry import trace from opentelemetry.baggage.propagation import W3CBaggagePropagator from opentelemetry.context import Context @@ -16,9 +16,15 @@ set_span_in_context, ) from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator -from pandas.core.dtypes.common import is_datetime64_any_dtype from starlette.datastructures import Headers +from app.dependencies import ( + X_CACHE_CREATE_AT, + X_CACHE_HIT, + X_CACHE_OVERRIDE, + X_CACHE_OVERRIDE_AT, + X_WREN_TIMEZONE, +) from app.model.data_source import DataSource tracer = trace.get_tracer(__name__) @@ -34,94 +40,54 @@ def base64_to_dict(base64_str: str) -> dict: @tracer.start_as_current_span("to_json", kind=trace.SpanKind.INTERNAL) -def to_json(df: pd.DataFrame) -> dict: - original_dtype = df.dtypes.map( - lambda x: str(x.pyarrow_dtype) if hasattr(x, "pyarrow_dtype") else str(x) - ).to_dict() - for column in df.columns: - if _is_arrow_datetime(df[column]) and is_datetime64_any_dtype(df[column].dtype): - df[column] = _to_datetime_and_format(df[column]) - json_obj = _to_json_obj(df) - json_obj["dtypes"] = original_dtype - return json_obj - - -def _is_arrow_datetime(series: pd.Series) -> bool: - dtype = series.dtype - if hasattr(dtype, "pyarrow_dtype"): - pa_type = dtype.pyarrow_dtype - return pa.types.is_timestamp(pa_type) - return False - - -def 
_to_datetime_and_format(series: pd.Series) -> pd.Series: - return series.apply( - lambda d: d.strftime( - "%Y-%m-%d %H:%M:%S.%f" + (" %Z" if series.dt.tz is not None else "") - ) - if not pd.isnull(d) - else d +def to_json(df: pa.Table, headers: dict) -> dict: + dtypes = {field.name: str(field.type) for field in df.schema} + if df.num_rows == 0: + columns = {field.name: [] for field in df.schema} + return {"columns": columns, "data": [], "dtypes": dtypes} + + formatted_sql = ( + "SELECT " + ", ".join([_formater(field) for field in df.schema]) + " FROM df" ) + logger.debug(f"formmated_sql: {formatted_sql}") + conn = get_duckdb_conn(headers) + formatted_df = conn.execute(formatted_sql).fetch_df() + + result = formatted_df.to_dict(orient="split") + result["dtypes"] = dtypes + return result + + +def get_duckdb_conn(headers: dict) -> duckdb.DuckDBPyConnection: + """Get a DuckDB connection with the provided headers.""" + conn = duckdb.connect() + if X_WREN_TIMEZONE in headers: + timezone = headers[X_WREN_TIMEZONE] + if timezone.startwith("+") or timezone.startswith("-"): + # If the timezone is an offset, convert it to a named timezone + timezone = get_timezone_from_offset(timezone) + conn.execute(f"SET TimeZone = '{timezone}'") + else: + # Default to UTC if no timezone is provided + conn.execute("SET TimeZone = 'UTC'") + return conn -def _to_json_obj(df: pd.DataFrame) -> dict: - def format_value(x): - # Need to handle NaN first, as it can be a float or pd.NA - if pd.isna(x): - return None - elif isinstance(x, float): - return f"{x:.9g}" - elif isinstance(x, decimal.Decimal): - if x == 0: - return "0" - else: - return x - else: - return x - - data = df.map(format_value).to_dict(orient="split", index=False) - - def default(obj): - if pd.isna(obj): - return None - if isinstance(obj, decimal.Decimal): - return str(obj) - if isinstance(obj, (bytes, bytearray)): - return obj.hex() - if isinstance(obj, pd.tseries.offsets.DateOffset): - return _date_offset_to_str(obj) - if 
isinstance(obj, datetime.timedelta): - return str(obj) - # Add handling for any remaining LOB objects - if hasattr(obj, "read"): # Check if object is LOB-like - return str(obj) - raise TypeError - - json_obj = orjson.loads( - orjson.dumps( - data, - option=orjson.OPT_SERIALIZE_NUMPY | orjson.OPT_SERIALIZE_UUID, - default=default, - ) - ) - return json_obj +def get_timezone_from_offset(offset: str) -> str: + if offset.startswith("+"): + offset = offset[1:] # Remove the leading '+' sign -def _date_offset_to_str(offset: pd.tseries.offsets.DateOffset) -> str: - parts = [] - units = [ - "months", - "days", - "microseconds", - "nanoseconds", - ] + sql = f""" + SELECT name, utc_offset + FROM pg_timezone_names() + WHERE utc_offset = '{offset}' + """ - for unit in units: - value = getattr(offset, unit, 0) - if value: - parts.append(f"{value} {unit if value > 1 else unit.rstrip('s')}") - - return " ".join(parts) + first = duckdb.execute(sql).fetchone() + if first is None: + raise ValueError(f"Invalid timezone offset: {offset}") + return first[0] # Return the timezone name def build_context(headers: Header) -> Context: @@ -186,3 +152,35 @@ def pd_to_arrow_schema(df: pd.DataFrame) -> pa.Schema: pa_type = pa.string() fields.append(pa.field(column, pa_type)) return pa.schema(fields) + + +def update_response_headers(response, required_headers: dict): + if X_CACHE_HIT in required_headers: + response.headers[X_CACHE_HIT] = required_headers[X_CACHE_HIT] + if X_CACHE_CREATE_AT in required_headers: + response.headers[X_CACHE_CREATE_AT] = required_headers[X_CACHE_CREATE_AT] + if X_CACHE_OVERRIDE in required_headers: + response.headers[X_CACHE_OVERRIDE] = required_headers[X_CACHE_OVERRIDE] + if X_CACHE_OVERRIDE_AT in required_headers: + response.headers[X_CACHE_OVERRIDE_AT] = required_headers[X_CACHE_OVERRIDE_AT] + + +def _formater(field: pa.Field) -> str: + if pa.types.is_decimal(field.type) or pa.types.is_floating(field.type): + return f""" + case when {field.name} = 0 then '0' + when 
length(CAST({field.name} AS VARCHAR)) > 15 then format('{{:.9g}}', {field.name}) + else RTRIM(RTRIM(format('{{:.8f}}', {field.name}), '0'), '.') + end as {field.name}""" + elif pa.types.is_date(field.type): + return f"strftime({field.name}, '%Y-%m-%d') as {field.name}" + elif pa.types.is_timestamp(field.type): + if field.type.tz is None: + return f"strftime({field.name}, '%Y-%m-%d %H:%M:%S.%f') as {field.name}" + else: + return f"strftime({field.name}, '%Y-%m-%d %H:%M:%S.%f %Z') as {field.name}" + elif pa.types.is_binary(field.type): + return f"to_hex({field.name}) as {field.name}" + elif pa.types.is_interval(field.type): + return f"cast({field.name} as varchar) as {field.name}" + return field.name diff --git a/ibis-server/tools/query_local_run.py b/ibis-server/tools/query_local_run.py index 18d5bd5e5..770540c15 100644 --- a/ibis-server/tools/query_local_run.py +++ b/ibis-server/tools/query_local_run.py @@ -91,12 +91,11 @@ else: raise Exception("Unsupported data source:", data_source) -df = connection.sql(dialect_sql).limit(10).to_pyarrow().to_pandas(types_mapper=pd.ArrowDtype) +df = connection.sql(dialect_sql).limit(10).to_pyarrow() print("### Result ###") print("") print(df) - -json_str = to_json(df) +json_str = to_json(df, dict()) print("### Result JSON ###") print("") print(json_str) \ No newline at end of file From bdcae19179b78dfab67e97e235b8bf690b588983 Mon Sep 17 00:00:00 2001 From: Jax Liu Date: Fri, 13 Jun 2025 10:38:04 +0800 Subject: [PATCH 04/26] fix postgres test --- ibis-server/app/util.py | 1 + .../routers/v2/connector/test_postgres.py | 100 ++-------------- .../v3/connector/postgres/test_query.py | 107 +++++++++++++++--- 3 files changed, 106 insertions(+), 102 deletions(-) diff --git a/ibis-server/app/util.py b/ibis-server/app/util.py index 345f2f3cc..122faad00 100644 --- a/ibis-server/app/util.py +++ b/ibis-server/app/util.py @@ -55,6 +55,7 @@ def to_json(df: pa.Table, headers: dict) -> dict: result = formatted_df.to_dict(orient="split") 
result["dtypes"] = dtypes + result.pop("index", None) # Remove index field from the DuckDB result return result diff --git a/ibis-server/tests/routers/v2/connector/test_postgres.py b/ibis-server/tests/routers/v2/connector/test_postgres.py index 3e9be35d9..f62604736 100644 --- a/ibis-server/tests/routers/v2/connector/test_postgres.py +++ b/ibis-server/tests/routers/v2/connector/test_postgres.py @@ -185,7 +185,7 @@ async def test_query(client, manifest_str, postgres: PostgresContainer): 370, "O", "172799.49", - "1996-01-02 00:00:00.000000", + "1996-01-02", "1_370", "2024-01-01 23:59:59.000000", "2024-01-01 23:59:59.000000 UTC", @@ -195,27 +195,28 @@ async def test_query(client, manifest_str, postgres: PostgresContainer): assert result["dtypes"] == { "orderkey": "int32", "custkey": "int32", - "orderstatus": "object", - "totalprice": "object", - "orderdate": "object", - "order_cust_key": "object", - "timestamp": "object", - "timestamptz": "object", - "test_null_time": "datetime64[ns]", - "bytea_column": "object", + "orderstatus": "string", + "totalprice": "string", + "orderdate": "date32[day]", + "order_cust_key": "string", + "timestamp": "timestamp[us]", + "timestamptz": "timestamp[us, tz=UTC]", + "test_null_time": "timestamp[us]", + "bytea_column": "binary", } async def test_query_with_cache(client, manifest_str, postgres: PostgresContainer): connection_info = _to_connection_info(postgres) + sql = 'SELECT * FROM "Orders" LIMIT 10' # First request - should miss cache response1 = await client.post( url=f"{base_url}/query?cacheEnable=true", # Enable cache json={ "connectionInfo": connection_info, "manifestStr": manifest_str, - "sql": 'SELECT * FROM "Orders" LIMIT 10', + "sql": sql, }, ) @@ -229,7 +230,7 @@ async def test_query_with_cache(client, manifest_str, postgres: PostgresContaine json={ "connectionInfo": connection_info, "manifestStr": manifest_str, - "sql": 'SELECT * FROM "Orders" LIMIT 10', + "sql": sql, }, ) assert response2.status_code == 200 @@ -384,83 +385,6 
@@ async def test_query_with_dot_all(client, manifest_str, postgres: PostgresContai assert result["dtypes"] is not None -async def test_format_floating(client, manifest_str, postgres): - connection_info = _to_connection_info(postgres) - response = await client.post( - url=f"{base_url}/query", - json={ - "connectionInfo": connection_info, - "manifestStr": manifest_str, - "sql": """ -SELECT - 0.0123e-5 AS case_scientific_original, - 1.23e+4 AS case_scientific_positive, - -4.56e-3 AS case_scientific_negative, - 7.89e0 AS case_scientific_zero_exponent, - 0e0 AS case_scientific_zero, - - 123.456 AS case_decimal_positive, - -123.456 AS case_decimal_negative, - 0.0000123 AS case_decimal_small, - 123.0000 AS case_decimal_trailing_zeros, - 0.0 AS case_decimal_zero, - - 0 AS case_integer_zero, - 0e-9 AS case_integer_zero_scientific, - -1 AS case_integer_negative, - 9999999999 AS case_integer_large, - - 1.7976931348623157E+308 AS case_float_max, - 2.2250738585072014E-308 AS case_float_min, - -1.7976931348623157E+308 AS case_float_min_negative, - - 1.23e4 + 4.56 AS case_mixed_addition, - -1.23e-4 - 123.45 AS case_mixed_subtraction, - 0.0123e-5 * 1000 AS case_mixed_multiplication, - 123.45 / 1.23e2 AS case_mixed_division, - - CAST('NaN' AS FLOAT) AS case_special_nan, - CAST('Infinity' AS FLOAT) AS case_special_infinity, - CAST('-Infinity' AS FLOAT) AS case_special_negative_infinity, - NULL AS case_special_null, - - CAST(123.456 AS FLOAT) AS case_cast_float, - CAST(1.23e4 AS DECIMAL(10,5)) AS case_cast_decimal - """, - }, - ) - assert response.status_code == 200 - result = response.json() - - assert result["data"][0][0] == "1.23E-7" - assert result["data"][0][1] == "1.23E+4" - assert result["data"][0][2] == "-0.00456" - assert result["data"][0][3] == "7.89" - assert result["data"][0][4] == "0" - assert result["data"][0][5] == "123.456" - assert result["data"][0][6] == "-123.456" - assert result["data"][0][7] == "0.0000123" - assert result["data"][0][8] == "123" - assert 
result["data"][0][9] == "0" - assert result["data"][0][10] == 0 - assert result["data"][0][11] == "0" - assert result["data"][0][12] == -1 - assert result["data"][0][13] == 9999999999 - assert result["data"][0][14] == "1.7976931348623157E+308" - assert result["data"][0][15] == "2.2250738585072014E-308" - assert result["data"][0][16] == "-1.7976931348623157E+308" - assert result["data"][0][17] == "12304.56" - assert result["data"][0][18] == "-123.450123" - assert result["data"][0][19] == "0.000123" - assert result["data"][0][20] == "1.0036585365853659" - assert result["data"][0][21] == "nan" - assert result["data"][0][22] == "inf" - assert result["data"][0][23] == "-inf" - assert result["data"][0][24] is None - assert result["data"][0][25] == "123.456001" - assert result["data"][0][26] == "12300.00000" - - async def test_limit_pushdown(client, manifest_str, postgres: PostgresContainer): connection_info = _to_connection_info(postgres) response = await client.post( diff --git a/ibis-server/tests/routers/v3/connector/postgres/test_query.py b/ibis-server/tests/routers/v3/connector/postgres/test_query.py index e82d38881..30a554742 100644 --- a/ibis-server/tests/routers/v3/connector/postgres/test_query.py +++ b/ibis-server/tests/routers/v3/connector/postgres/test_query.py @@ -3,7 +3,7 @@ import orjson import pytest -from app.dependencies import X_WREN_VARIABLE_PREFIX +from app.dependencies import X_WREN_FALLBACK_DISABLE, X_WREN_VARIABLE_PREFIX from tests.routers.v3.connector.postgres.conftest import base_url manifest = { @@ -142,7 +142,7 @@ async def test_query(client, manifest_str, connection_info): 370, "O", "172799.49", - "1996-01-02 00:00:00.000000", + "1996-01-02", "1_370", "2024-01-01 23:59:59.000000", "2024-01-01 23:59:59.000000 UTC", @@ -152,14 +152,14 @@ async def test_query(client, manifest_str, connection_info): assert result["dtypes"] == { "o_orderkey": "int32", "o_custkey": "int32", - "o_orderstatus": "object", - "o_totalprice_double": "float64", - 
"o_orderdate": "object", - "order_cust_key": "object", - "timestamp": "object", - "timestamptz": "object", - "dst_utc_minus_5": "object", - "dst_utc_minus_4": "object", + "o_orderstatus": "string", + "o_totalprice_double": "double", + "o_orderdate": "date32[day]", + "order_cust_key": "string", + "timestamp": "timestamp[us]", + "timestamptz": "timestamp[us, tz=UTC]", + "dst_utc_minus_5": "timestamp[us, tz=UTC]", + "dst_utc_minus_4": "timestamp[us, tz=UTC]", } @@ -433,7 +433,7 @@ async def test_query_to_many_calculation(client, manifest_str, connection_info): result = response.json() assert len(result["columns"]) == 1 assert len(result["data"]) == 1 - assert result["dtypes"] == {"sum_totalprice": "float64"} + assert result["dtypes"] == {"sum_totalprice": "double"} response = await client.post( url=f"{base_url}/query", @@ -447,7 +447,7 @@ async def test_query_to_many_calculation(client, manifest_str, connection_info): result = response.json() assert len(result["columns"]) == 1 assert len(result["data"]) == 1 - assert result["dtypes"] == {"sum_totalprice": "float64"} + assert result["dtypes"] == {"sum_totalprice": "double"} response = await client.post( url=f"{base_url}/query", @@ -461,7 +461,7 @@ async def test_query_to_many_calculation(client, manifest_str, connection_info): result = response.json() assert len(result["columns"]) == 2 assert len(result["data"]) == 1 - assert result["dtypes"] == {"c_name": "object", "sum_totalprice": "float64"} + assert result["dtypes"] == {"c_name": "string", "sum_totalprice": "double"} response = await client.post( url=f"{base_url}/query", @@ -475,7 +475,7 @@ async def test_query_to_many_calculation(client, manifest_str, connection_info): result = response.json() assert len(result["columns"]) == 2 assert len(result["data"]) == 1 - assert result["dtypes"] == {"c_custkey": "int32", "sum_totalprice": "float64"} + assert result["dtypes"] == {"c_custkey": "int32", "sum_totalprice": "double"} @pytest.mark.skip(reason="Datafusion does not 
implement filter yet") @@ -586,3 +586,82 @@ async def test_clac_query(client, manifest_str, connection_info): result = response.json() assert len(result["data"]) == 1 assert len(result["data"][0]) == 2 + + +async def test_format_floating(client, manifest_str, connection_info): + response = await client.post( + url=f"{base_url}/query", + json={ + "connectionInfo": connection_info, + "manifestStr": manifest_str, + "sql": """ +SELECT + 0.0123e-5 AS case_scientific_original, + 1.23e+4 AS case_scientific_positive, + -4.56e-3 AS case_scientific_negative, + 7.89e0 AS case_scientific_zero_exponent, + 0e0 AS case_scientific_zero, + + 123.456 AS case_decimal_positive, + -123.456 AS case_decimal_negative, + 0.0000123 AS case_decimal_small, + 123.0000 AS case_decimal_trailing_zeros, + 0.0 AS case_decimal_zero, + + 0 AS case_integer_zero, + 0e-9 AS case_integer_zero_scientific, + -1 AS case_integer_negative, + 9999999999 AS case_integer_large, + + 1.23e4 + 4.56 AS case_mixed_addition, + -1.23e-4 - 123.45 AS case_mixed_subtraction, + 0.0123e-5 * 1000 AS case_mixed_multiplication, + 123.45 / 1.23e2 AS case_mixed_division, + + CAST('NaN' AS FLOAT) AS case_special_nan, + CAST('Infinity' AS FLOAT) AS case_special_infinity, + CAST('-Infinity' AS FLOAT) AS case_special_negative_infinity, + NULL AS case_special_null, + + CAST(123.456 AS FLOAT) AS case_cast_float, + CAST(1.23e4 AS DECIMAL(10,5)) AS case_cast_decimal, + CAST(1.234e+14 AS DECIMAL(20,0)) AS show_float, + CAST(1.234e+15 AS DECIMAL(20,0)) AS show_exponent, + CAST(1.123456789 AS DECIMAL(20,9)) AS round_to_9_decimal_places, + CAST(0.123456789123456789 AS DECIMAL(20,18)) AS round_to_18_decimal_places + """, + }, + headers={ + X_WREN_FALLBACK_DISABLE: "true", # Disable fallback to DuckDB + }, + ) + assert response.status_code == 200 + result = response.json() + assert result["data"][0][0] == "0.00000012" + assert result["data"][0][1] == "12300" + assert result["data"][0][2] == "-0.00456" + assert result["data"][0][3] == "7.89" + 
assert result["data"][0][4] == "0" + assert result["data"][0][5] == "123.456" + assert result["data"][0][6] == "-123.456" + assert result["data"][0][7] == "0.0000123" + assert result["data"][0][8] == "123" + assert result["data"][0][9] == "0" + assert result["data"][0][10] == 0 + assert result["data"][0][11] == "0" + assert result["data"][0][12] == -1 + assert result["data"][0][13] == 9999999999 + assert result["data"][0][14] == "12304.56" + assert result["data"][0][15] == "-123.450123" + assert result["data"][0][16] == "0.000123" + assert result["data"][0][17] == "1.00365854" + assert result["data"][0][18] is None + assert result["data"][0][19] == "inf" + assert result["data"][0][20] == "-inf" + assert result["data"][0][21] is None + assert result["data"][0][22] == "123.45600128" + assert result["data"][0][23] == "12300" + assert result["data"][0][24] == "123400000000000" + assert result["data"][0][25] == "1.234e+15" + assert result["data"][0][26] == "1.12345679" + assert result["data"][0][27] == "0.123456789" From 56027ee8a15e248d29324f262c8449b585cc4390 Mon Sep 17 00:00:00 2001 From: Jax Liu Date: Fri, 13 Jun 2025 10:39:56 +0800 Subject: [PATCH 05/26] fix mysql --- ibis-server/tests/routers/v2/connector/test_mysql.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ibis-server/tests/routers/v2/connector/test_mysql.py b/ibis-server/tests/routers/v2/connector/test_mysql.py index 7f1fb7753..63dd3f37e 100644 --- a/ibis-server/tests/routers/v2/connector/test_mysql.py +++ b/ibis-server/tests/routers/v2/connector/test_mysql.py @@ -160,7 +160,7 @@ async def test_query(client, manifest_str, mysql: MySqlContainer): 370, "O", "172799.49", - "1996-01-02 00:00:00.000000", + "1996-01-02", "1_370", "2024-01-01 23:59:59.000000", "2024-01-01 23:59:59.000000", From 50cda1983db392caf33640d8be675b41ae1f878f Mon Sep 17 00:00:00 2001 From: Jax Liu Date: Mon, 16 Jun 2025 13:26:07 +0800 Subject: [PATCH 06/26] fix bigquery test --- 
.../app/custom_ibis/backends/sql/datatypes.py | 5 ++- .../routers/v2/connector/test_bigquery.py | 44 ++++++++++--------- .../v3/connector/bigquery/test_functions.py | 4 +- .../v3/connector/bigquery/test_query.py | 32 ++++++++------ 4 files changed, 48 insertions(+), 37 deletions(-) diff --git a/ibis-server/app/custom_ibis/backends/sql/datatypes.py b/ibis-server/app/custom_ibis/backends/sql/datatypes.py index a04514674..692b2839a 100644 --- a/ibis-server/app/custom_ibis/backends/sql/datatypes.py +++ b/ibis-server/app/custom_ibis/backends/sql/datatypes.py @@ -2,7 +2,10 @@ class BigQueryType(datatypes.BigQueryType): - default_interval_precision = "s" + # It's a workaround for the issue of ibis BQ connector not supporting interval precision. + # Set `h` to avoid ibis trying to cast arrow interval to duration, which leads to a casting error of pyarrow. + # See: https://github.com/ibis-project/ibis/blob/main/ibis/formats/pyarrow.py#L182 default_interval_precision = "h" datatypes.BigQueryType = BigQueryType diff --git a/ibis-server/tests/routers/v2/connector/test_bigquery.py b/ibis-server/tests/routers/v2/connector/test_bigquery.py index 0cbcb9d92..7df5960c8 100644 --- a/ibis-server/tests/routers/v2/connector/test_bigquery.py +++ b/ibis-server/tests/routers/v2/connector/test_bigquery.py @@ -1,5 +1,6 @@ import base64 import os +import time import orjson import pytest @@ -91,7 +92,7 @@ async def test_query(client, manifest_str): 370, "O", "172799.49", - "1996-01-02 00:00:00.000000", + "1996-01-02", "1_370", "2024-01-01 23:59:59.000000", "2024-01-01 23:59:59.000000 UTC", @@ -101,24 +102,27 @@ async def test_query(client, manifest_str): assert result["dtypes"] == { "orderkey": "int64", "custkey": "int64", - "orderstatus": "object", - "totalprice": "float64", - "orderdate": "object", - "order_cust_key": "object", - "timestamp": "object", - "timestamptz": "object", - "test_null_time": "datetime64[ns]", - "bytea_column": "object", + "orderstatus": "string", + "totalprice": 
"double", + "orderdate": "date32[day]", + "order_cust_key": "string", + "timestamp": "timestamp[us]", + "timestamptz": "timestamp[us, tz=UTC]", + "test_null_time": "timestamp[us]", + "bytea_column": "binary", } async def test_query_with_cache(client, manifest_str): + # add random timestamp to the query to ensure cache is not hit + now = int(time.time()) + sql = f'SELECT *, {now} FROM "Orders" ORDER BY orderkey LIMIT 1' response1 = await client.post( url=f"{base_url}/query?cacheEnable=true", json={ "connectionInfo": connection_info, "manifestStr": manifest_str, - "sql": 'SELECT * FROM "Orders" ORDER BY orderkey LIMIT 1', + "sql": sql, }, ) @@ -131,7 +135,7 @@ async def test_query_with_cache(client, manifest_str): json={ "connectionInfo": connection_info, "manifestStr": manifest_str, - "sql": 'SELECT * FROM "Orders" ORDER BY orderkey LIMIT 1', + "sql": sql, }, ) @@ -283,7 +287,7 @@ async def test_query_empty_json(client, manifest_str): assert response.status_code == 200 result = response.json() assert len(result["data"]) == 0 - assert result["dtypes"] == {"f0_": "object"} + assert result["dtypes"] == {"f0_": "string"} """Test only the json column is null.""" response = await client.post( @@ -299,7 +303,7 @@ async def test_query_empty_json(client, manifest_str): assert len(result["data"]) == 1 assert result["data"][0][0] is None assert result["data"][0][1] == 1 - assert result["dtypes"] == {"f0_": "object", "f1_": "int64"} + assert result["dtypes"] == {"f0_": "string", "f1_": "int64"} async def test_interval(client, manifest_str): @@ -313,8 +317,8 @@ async def test_interval(client, manifest_str): ) assert response.status_code == 200 result = response.json() - assert result["data"][0] == ["112 months 100 days 3600000000 microseconds"] - assert result["dtypes"] == {"col": "object"} + assert result["data"][0] == ["9 years 4 months 100 days 01:00:00"] + assert result["dtypes"] == {"col": "month_day_nano_interval"} async def test_avg_interval(client, manifest_str): @@ 
-328,8 +332,8 @@ async def test_avg_interval(client, manifest_str): ) assert response.status_code == 200 result = response.json() - assert result["data"][0] == ["10484 days 32054400000 microseconds"] - assert result["dtypes"] == {"col": "object"} + assert result["data"][0] == ["10484 days 08:54:14.4"] + assert result["dtypes"] == {"col": "month_day_nano_interval"} async def test_custom_datatypes_no_overrides(client, manifest_str): @@ -345,7 +349,7 @@ async def test_custom_datatypes_no_overrides(client, manifest_str): assert response.status_code == 200 result = response.json() assert len(result["data"]) == 0 - assert result["dtypes"] == {"f0_": "object"} + assert result["dtypes"] == {"f0_": "string"} # Should use back the custom BigQueryType response = await client.post( @@ -358,8 +362,8 @@ async def test_custom_datatypes_no_overrides(client, manifest_str): ) assert response.status_code == 200 result = response.json() - assert result["data"][0] == ["112 months 100 days 3600000000 microseconds"] - assert result["dtypes"] == {"col": "object"} + assert result["data"][0] == ["9 years 4 months 100 days 01:00:00"] + assert result["dtypes"] == {"col": "month_day_nano_interval"} async def test_validate_with_unknown_rule(client, manifest_str): diff --git a/ibis-server/tests/routers/v3/connector/bigquery/test_functions.py b/ibis-server/tests/routers/v3/connector/bigquery/test_functions.py index 90103e680..62ffe51f4 100644 --- a/ibis-server/tests/routers/v3/connector/bigquery/test_functions.py +++ b/ibis-server/tests/routers/v3/connector/bigquery/test_functions.py @@ -97,8 +97,8 @@ async def test_scalar_function(client, manifest_str: str, connection_info): result = response.json() assert result == { "columns": ["col"], - "data": [["2024-01-02 00:00:00.000000"]], - "dtypes": {"col": "object"}, + "data": [["2024-01-02"]], + "dtypes": {"col": "date32[day]"}, } diff --git a/ibis-server/tests/routers/v3/connector/bigquery/test_query.py 
b/ibis-server/tests/routers/v3/connector/bigquery/test_query.py index 31e0c3bb0..725198202 100644 --- a/ibis-server/tests/routers/v3/connector/bigquery/test_query.py +++ b/ibis-server/tests/routers/v3/connector/bigquery/test_query.py @@ -1,4 +1,5 @@ import base64 +import time import orjson import pytest @@ -82,7 +83,7 @@ async def test_query(client, manifest_str, connection_info): 1202, "F", "356711.63", - "1992-06-06 00:00:00.000000", + "1992-06-06", "36485_1202", "2024-01-01 23:59:59.000000", "2024-01-01 23:59:59.000000 UTC", @@ -92,24 +93,27 @@ async def test_query(client, manifest_str, connection_info): assert result["dtypes"] == { "o_orderkey": "int64", "o_custkey": "int64", - "o_orderstatus": "object", - "o_totalprice": "float64", - "o_orderdate": "object", - "order_cust_key": "object", - "timestamp": "object", - "timestamptz": "object", - "dst_utc_minus_5": "object", - "dst_utc_minus_4": "object", + "o_orderstatus": "string", + "o_totalprice": "double", + "o_orderdate": "date32[day]", + "order_cust_key": "string", + "timestamp": "timestamp[us]", + "timestamptz": "timestamp[us, tz=UTC]", + "dst_utc_minus_5": "timestamp[us, tz=UTC]", + "dst_utc_minus_4": "timestamp[us, tz=UTC]", } async def test_query_with_cache(client, manifest_str, connection_info): + # add random timestamp to the query to ensure cache is not hit + now = int(time.time()) + sql = f"SELECT *, {now} FROM orders ORDER BY o_orderkey LIMIT 1" response1 = await client.post( url=f"{base_url}/query?cacheEnable=true", json={ "connectionInfo": connection_info, "manifestStr": manifest_str, - "sql": "SELECT * FROM wren.public.orders LIMIT 1", + "sql": sql, }, ) assert response1.status_code == 200 @@ -121,7 +125,7 @@ async def test_query_with_cache(client, manifest_str, connection_info): json={ "connectionInfo": connection_info, "manifestStr": manifest_str, - "sql": "SELECT * FROM wren.public.orders LIMIT 1", + "sql": sql, }, ) @@ -294,9 +298,9 @@ async def test_timestamp_func(client, manifest_str, 
connection_info): "1970-01-12 13:46:40.000000 UTC", ] assert result["dtypes"] == { - "millis": "object", - "micros": "object", - "seconds": "object", + "millis": "timestamp[us, tz=UTC]", + "micros": "timestamp[us, tz=UTC]", + "seconds": "timestamp[us, tz=UTC]", } response = await client.post( From be409ef3fd820ffd9c050d862cf21f62dc4b49e8 Mon Sep 17 00:00:00 2001 From: Jax Liu Date: Mon, 16 Jun 2025 13:31:59 +0800 Subject: [PATCH 07/26] fix clickhouse --- .../routers/v2/connector/test_clickhouse.py | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/ibis-server/tests/routers/v2/connector/test_clickhouse.py b/ibis-server/tests/routers/v2/connector/test_clickhouse.py index 2abae3d45..aaac2b227 100644 --- a/ibis-server/tests/routers/v2/connector/test_clickhouse.py +++ b/ibis-server/tests/routers/v2/connector/test_clickhouse.py @@ -180,7 +180,7 @@ async def test_query(client, manifest_str, clickhouse: ClickHouseContainer): 370, "O", "172799.49", - "1996-01-02 00:00:00.000000", + "1996-01-02", "1_370", "2024-01-01 23:59:59.000000", "2024-01-01 23:59:59.000000 UTC", @@ -190,14 +190,14 @@ async def test_query(client, manifest_str, clickhouse: ClickHouseContainer): assert result["dtypes"] == { "orderkey": "int32", "custkey": "int32", - "orderstatus": "object", - "totalprice": "object", - "orderdate": "object", - "order_cust_key": "object", - "timestamp": "object", - "timestamptz": "object", - "test_null_time": "object", - "bytea_column": "object", + "orderstatus": "string", + "totalprice": "decimal128(15, 2)", + "orderdate": "date32[day]", + "order_cust_key": "string", + "timestamp": "timestamp[ns]", + "timestamptz": "timestamp[ns, tz=UTC]", + "test_null_time": "string", + "bytea_column": "string", } @@ -266,7 +266,7 @@ async def test_query_join(client, manifest_str, clickhouse: ClickHouseContainer) assert len(result["data"]) == 1 assert result["data"][0] == ["Customer#000000370"] assert result["dtypes"] == { - "customer_name": "object", 
+ "customer_name": "string", } @@ -288,7 +288,7 @@ async def test_query_to_one_relationship( assert len(result["data"]) == 1 assert result["data"][0] == ["Customer#000000370"] assert result["dtypes"] == { - "customer_name": "object", + "customer_name": "string", } @@ -310,7 +310,7 @@ async def test_query_to_many_relationship( assert len(result["data"]) == 1 assert result["data"][0] == ["2860895.79"] assert result["dtypes"] == { - "totalprice": "object", + "totalprice": "decimal128(38, 2)", } From 5b52bdca1b1e0c98d168cbab247b256b14e39104 Mon Sep 17 00:00:00 2001 From: Jax Liu Date: Mon, 16 Jun 2025 14:43:24 +0800 Subject: [PATCH 08/26] add quote for identifier --- ibis-server/app/util.py | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/ibis-server/app/util.py b/ibis-server/app/util.py index 122faad00..1417ad80f 100644 --- a/ibis-server/app/util.py +++ b/ibis-server/app/util.py @@ -167,21 +167,29 @@ def update_response_headers(response, required_headers: dict): def _formater(field: pa.Field) -> str: + column_name = _quote_identifier(field.name) if pa.types.is_decimal(field.type) or pa.types.is_floating(field.type): return f""" - case when {field.name} = 0 then '0' - when length(CAST({field.name} AS VARCHAR)) > 15 then format('{{:.9g}}', {field.name}) - else RTRIM(RTRIM(format('{{:.8f}}', {field.name}), '0'), '.') - end as {field.name}""" + case when {column_name} = 0 then '0' + when length(CAST({column_name} AS VARCHAR)) > 15 then format('{{:.9g}}', {column_name}) + else RTRIM(RTRIM(format('{{:.8f}}', {column_name}), '0'), '.') + end as {column_name}""" elif pa.types.is_date(field.type): - return f"strftime({field.name}, '%Y-%m-%d') as {field.name}" + return f"strftime({column_name}, '%Y-%m-%d') as {column_name}" elif pa.types.is_timestamp(field.type): if field.type.tz is None: - return f"strftime({field.name}, '%Y-%m-%d %H:%M:%S.%f') as {field.name}" + return f"strftime({column_name}, '%Y-%m-%d %H:%M:%S.%f') as 
{column_name}" else: - return f"strftime({field.name}, '%Y-%m-%d %H:%M:%S.%f %Z') as {field.name}" + return ( + f"strftime({column_name}, '%Y-%m-%d %H:%M:%S.%f %Z') as {column_name}" + ) elif pa.types.is_binary(field.type): - return f"to_hex({field.name}) as {field.name}" + return f"to_hex({column_name}) as {column_name}" elif pa.types.is_interval(field.type): - return f"cast({field.name} as varchar) as {field.name}" - return field.name + return f"cast({column_name} as varchar) as {column_name}" + return column_name + + +def _quote_identifier(identifier: str) -> str: + identifier = identifier.replace('"', '""') # Escape double quotes + return f'"{identifier}"' if identifier else identifier From 2c187a210dc58014f79522d4b2bfba84d28e1e33 Mon Sep 17 00:00:00 2001 From: Jax Liu Date: Mon, 16 Jun 2025 14:43:44 +0800 Subject: [PATCH 09/26] fix mysql --- .../tests/routers/v2/connector/test_mysql.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/ibis-server/tests/routers/v2/connector/test_mysql.py b/ibis-server/tests/routers/v2/connector/test_mysql.py index 63dd3f37e..3dfe33603 100644 --- a/ibis-server/tests/routers/v2/connector/test_mysql.py +++ b/ibis-server/tests/routers/v2/connector/test_mysql.py @@ -170,14 +170,14 @@ async def test_query(client, manifest_str, mysql: MySqlContainer): assert result["dtypes"] == { "orderkey": "int32", "custkey": "int32", - "orderstatus": "object", - "totalprice": "object", - "orderdate": "object", - "order_cust_key": "object", - "timestamp": "object", - "timestamptz": "object", - "test_null_time": "datetime64[ns]", - "bytea_column": "object", + "orderstatus": "string", + "totalprice": "string", + "orderdate": "date32[day]", + "order_cust_key": "string", + "timestamp": "timestamp[us]", + "timestamptz": "timestamp[us]", + "test_null_time": "timestamp[us]", + "bytea_column": "binary", } From ac9e9372d80e9d710027ceaebc91599a447a394c Mon Sep 17 00:00:00 2001 From: Jax Liu Date: Mon, 16 Jun 2025 15:03:39 
+0800 Subject: [PATCH 10/26] fix mssql --- .../tests/routers/v2/connector/test_mssql.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/ibis-server/tests/routers/v2/connector/test_mssql.py b/ibis-server/tests/routers/v2/connector/test_mssql.py index 414bdd842..80ac46e5d 100644 --- a/ibis-server/tests/routers/v2/connector/test_mssql.py +++ b/ibis-server/tests/routers/v2/connector/test_mssql.py @@ -126,7 +126,7 @@ async def test_query(client, manifest_str, mssql: SqlServerContainer): 370, "O", "172799.49", - "1996-01-02 00:00:00.000000", + "1996-01-02", "1_370", "2024-01-01 23:59:59.000000", "2024-01-01 23:59:59.000000 UTC", @@ -136,14 +136,14 @@ async def test_query(client, manifest_str, mssql: SqlServerContainer): assert result["dtypes"] == { "orderkey": "int32", "custkey": "int32", - "orderstatus": "object", - "totalprice": "object", - "orderdate": "object", - "order_cust_key": "object", - "timestamp": "object", - "timestamptz": "object", - "test_null_time": "datetime64[ns]", - "bytea_column": "object", + "orderstatus": "string", + "totalprice": "string", + "orderdate": "date32[day]", + "order_cust_key": "string", + "timestamp": "timestamp[ns]", + "timestamptz": "timestamp[ns, tz=UTC]", + "test_null_time": "timestamp[ns]", + "bytea_column": "binary", } From d77fd70f666bf6f3c7c57518c43fc4573c754b91 Mon Sep 17 00:00:00 2001 From: Jax Liu Date: Mon, 16 Jun 2025 15:11:51 +0800 Subject: [PATCH 11/26] fix trino --- .../tests/routers/v2/connector/test_trino.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/ibis-server/tests/routers/v2/connector/test_trino.py b/ibis-server/tests/routers/v2/connector/test_trino.py index 2ff1a1f5c..aaa24b3b2 100644 --- a/ibis-server/tests/routers/v2/connector/test_trino.py +++ b/ibis-server/tests/routers/v2/connector/test_trino.py @@ -112,7 +112,7 @@ async def test_query(client, manifest_str, trino: TrinoContainer): 370, "O", "172799.49", - "1996-01-02 
00:00:00.000000", + "1996-01-02", "1_370", "2024-01-01 23:59:59.000000", "2024-01-01 23:59:59.000000", @@ -122,14 +122,14 @@ async def test_query(client, manifest_str, trino: TrinoContainer): assert result["dtypes"] == { "orderkey": "int64", "custkey": "int64", - "orderstatus": "object", - "totalprice": "float64", - "orderdate": "object", - "order_cust_key": "object", - "timestamp": "object", - "timestamptz": "object", - "test_null_time": "datetime64[ns]", - "bytea_column": "object", + "orderstatus": "string", + "totalprice": "double", + "orderdate": "date32[day]", + "order_cust_key": "string", + "timestamp": "timestamp[ms]", + "timestamptz": "timestamp[ms]", + "test_null_time": "timestamp[ms]", + "bytea_column": "binary", } From 56cf02b3d3ce420fb3f8111b9f8823d630a31933 Mon Sep 17 00:00:00 2001 From: Jax Liu Date: Mon, 16 Jun 2025 15:12:26 +0800 Subject: [PATCH 12/26] fix canner --- .../tests/routers/v2/connector/test_canner.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/ibis-server/tests/routers/v2/connector/test_canner.py b/ibis-server/tests/routers/v2/connector/test_canner.py index f43d88320..be2f6bea3 100644 --- a/ibis-server/tests/routers/v2/connector/test_canner.py +++ b/ibis-server/tests/routers/v2/connector/test_canner.py @@ -95,7 +95,7 @@ async def test_query(client, manifest_str): 370, "O", "172799.49", - "1996-01-02 00:00:00.000000", + "1996-01-02", "1_370", "2024-01-01 23:59:59.000000", "2024-01-01 23:59:59.000000 UTC", @@ -104,13 +104,13 @@ async def test_query(client, manifest_str): assert result["dtypes"] == { "orderkey": "int64", "custkey": "int64", - "orderstatus": "object", - "totalprice": "float64", - "orderdate": "object", - "order_cust_key": "object", - "timestamp": "object", - "timestamptz": "object", - "test_null_time": "datetime64[ns]", + "orderstatus": "string", + "totalprice": "double", + "orderdate": "date32[day]", + "order_cust_key": "string", + "timestamp": "timestamp[ms]", + "timestamptz": 
"timestamp[ms]", + "test_null_time": "timestamp[ms]", } From 48ee41852e49ecdb1740de9a55232833e8e8ac16 Mon Sep 17 00:00:00 2001 From: Jax Liu Date: Mon, 16 Jun 2025 16:29:10 +0800 Subject: [PATCH 13/26] fix file connector --- ibis-server/app/model/metadata/object_storage.py | 2 +- .../tests/routers/v2/connector/test_gcs_file.py | 12 ++++++------ .../tests/routers/v2/connector/test_local_file.py | 12 ++++++------ .../tests/routers/v2/connector/test_minio_file.py | 12 ++++++------ .../tests/routers/v2/connector/test_s3_file.py | 12 ++++++------ .../routers/v3/connector/local_file/test_query.py | 10 +++++----- 6 files changed, 30 insertions(+), 30 deletions(-) diff --git a/ibis-server/app/model/metadata/object_storage.py b/ibis-server/app/model/metadata/object_storage.py index d28baa65d..8ed8f5759 100644 --- a/ibis-server/app/model/metadata/object_storage.py +++ b/ibis-server/app/model/metadata/object_storage.py @@ -253,7 +253,7 @@ def get_version(self): def _get_connection(self): conn = duckdb.connect() init_duckdb_gcs(conn, self.connection_info) - logger.debug("Initialized duckdb minio") + logger.debug("Initialized duckdb gcs") return conn def _get_dal_operator(self): diff --git a/ibis-server/tests/routers/v2/connector/test_gcs_file.py b/ibis-server/tests/routers/v2/connector/test_gcs_file.py index 6ca6695cb..cddcffef8 100644 --- a/ibis-server/tests/routers/v2/connector/test_gcs_file.py +++ b/ibis-server/tests/routers/v2/connector/test_gcs_file.py @@ -116,16 +116,16 @@ async def test_query(client, manifest_str, connection_info): 370, "O", "172799.49", - "1996-01-02 00:00:00.000000", + "1996-01-02", "1_370", ] assert result["dtypes"] == { "orderkey": "int32", "custkey": "int32", - "orderstatus": "object", - "totalprice": "float64", - "orderdate": "object", - "order_cust_key": "object", + "orderstatus": "string", + "totalprice": "decimal128(15, 2)", + "orderdate": "date32[day]", + "order_cust_key": "string", } @@ -163,7 +163,7 @@ async def 
test_query_calculated_field(client, manifest_str, connection_info): ] assert result["dtypes"] == { "custkey": "int32", - "sum_totalprice": "float64", + "sum_totalprice": "decimal128(38, 2)", } diff --git a/ibis-server/tests/routers/v2/connector/test_local_file.py b/ibis-server/tests/routers/v2/connector/test_local_file.py index 33f98ac4e..73c854335 100644 --- a/ibis-server/tests/routers/v2/connector/test_local_file.py +++ b/ibis-server/tests/routers/v2/connector/test_local_file.py @@ -107,16 +107,16 @@ async def test_query(client, manifest_str, connection_info): 370, "O", "172799.49", - "1996-01-02 00:00:00.000000", + "1996-01-02", "1_370", ] assert result["dtypes"] == { "orderkey": "int32", "custkey": "int32", - "orderstatus": "object", - "totalprice": "float64", - "orderdate": "object", - "order_cust_key": "object", + "orderstatus": "string", + "totalprice": "decimal128(15, 2)", + "orderdate": "date32[day]", + "order_cust_key": "string", } @@ -154,7 +154,7 @@ async def test_query_calculated_field(client, manifest_str, connection_info): ] assert result["dtypes"] == { "custkey": "int32", - "sum_totalprice": "float64", + "sum_totalprice": "decimal128(38, 2)", } diff --git a/ibis-server/tests/routers/v2/connector/test_minio_file.py b/ibis-server/tests/routers/v2/connector/test_minio_file.py index eafce84e8..b5d79b437 100644 --- a/ibis-server/tests/routers/v2/connector/test_minio_file.py +++ b/ibis-server/tests/routers/v2/connector/test_minio_file.py @@ -172,16 +172,16 @@ async def test_query(client, manifest_str, minio): 370, "O", "172799.49", - "1996-01-02 00:00:00.000000", + "1996-01-02", "1_370", ] assert result["dtypes"] == { "orderkey": "int32", "custkey": "int32", - "orderstatus": "object", - "totalprice": "float64", - "orderdate": "object", - "order_cust_key": "object", + "orderstatus": "string", + "totalprice": "decimal128(15, 2)", + "orderdate": "date32[day]", + "order_cust_key": "string", } @@ -221,7 +221,7 @@ async def test_query_calculated_field(client, 
manifest_str, minio): ] assert result["dtypes"] == { "custkey": "int32", - "sum_totalprice": "float64", + "sum_totalprice": "decimal128(38, 2)", } diff --git a/ibis-server/tests/routers/v2/connector/test_s3_file.py b/ibis-server/tests/routers/v2/connector/test_s3_file.py index 0e5ed035c..4e2603ae6 100644 --- a/ibis-server/tests/routers/v2/connector/test_s3_file.py +++ b/ibis-server/tests/routers/v2/connector/test_s3_file.py @@ -116,16 +116,16 @@ async def test_query(client, manifest_str, connection_info): 370, "O", "172799.49", - "1996-01-02 00:00:00.000000", + "1996-01-02", "1_370", ] assert result["dtypes"] == { "orderkey": "int32", "custkey": "int32", - "orderstatus": "object", - "totalprice": "float64", - "orderdate": "object", - "order_cust_key": "object", + "orderstatus": "string", + "totalprice": "decimal128(15, 2)", + "orderdate": "date32[day]", + "order_cust_key": "string", } @@ -163,7 +163,7 @@ async def test_query_calculated_field(client, manifest_str, connection_info): ] assert result["dtypes"] == { "custkey": "int32", - "sum_totalprice": "float64", + "sum_totalprice": "decimal128(38, 2)", } diff --git a/ibis-server/tests/routers/v3/connector/local_file/test_query.py b/ibis-server/tests/routers/v3/connector/local_file/test_query.py index 19c924086..8d918b3bf 100644 --- a/ibis-server/tests/routers/v3/connector/local_file/test_query.py +++ b/ibis-server/tests/routers/v3/connector/local_file/test_query.py @@ -94,14 +94,14 @@ async def test_query(client, manifest_str): 370, "O", "172799.49", - "1996-01-02 00:00:00.000000", + "1996-01-02", ] assert result["dtypes"] == { "orderkey": "int32", "custkey": "int32", - "orderstatus": "object", - "totalprice": "float64", - "orderdate": "object", + "orderstatus": "string", + "totalprice": "decimal128(15, 2)", + "orderdate": "date32[day]", } @@ -145,7 +145,7 @@ async def test_query_calculated_field(client, manifest_str): ] assert result["dtypes"] == { "custkey": "int32", - "sum_totalprice": "float64", + 
"sum_totalprice": "double", } From 4c3664321ac971699c059b39595463984c2f9242 Mon Sep 17 00:00:00 2001 From: Jax Liu Date: Mon, 16 Jun 2025 16:29:33 +0800 Subject: [PATCH 14/26] use pyarrow for file connector --- ibis-server/app/model/connector.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ibis-server/app/model/connector.py b/ibis-server/app/model/connector.py index 5ec00b2d1..72596bd1f 100644 --- a/ibis-server/app/model/connector.py +++ b/ibis-server/app/model/connector.py @@ -201,9 +201,9 @@ def __init__(self, connection_info: ConnectionInfo): init_duckdb_gcs(self.connection, connection_info) @tracer.start_as_current_span("duckdb_query", kind=trace.SpanKind.INTERNAL) - def query(self, sql: str, limit: int) -> pd.DataFrame: + def query(self, sql: str, limit: int) -> pa.Table: try: - return self.connection.execute(sql).fetch_df().head(limit) + return self.connection.execute(sql).fetch_arrow_table().slice(length=limit) except IOException as e: raise UnprocessableEntityError(f"Failed to execute query: {e!s}") except HTTPException as e: From 12df74505c6b27e4943e8fa600558832bab31cee Mon Sep 17 00:00:00 2001 From: Jax Liu Date: Mon, 16 Jun 2025 16:29:54 +0800 Subject: [PATCH 15/26] fix snowflake --- .../routers/v2/connector/test_snowflake.py | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/ibis-server/tests/routers/v2/connector/test_snowflake.py b/ibis-server/tests/routers/v2/connector/test_snowflake.py index ee374d7bb..82ea40493 100644 --- a/ibis-server/tests/routers/v2/connector/test_snowflake.py +++ b/ibis-server/tests/routers/v2/connector/test_snowflake.py @@ -90,22 +90,22 @@ async def test_query(client, manifest_str): 36901, "O", "173665.47", - "1996-01-02 00:00:00.000000", + "1996-01-02", "1_36901", "2024-01-01 23:59:59.000000", "2024-01-01 23:59:59.000000 UTC", None, ] assert result["dtypes"] == { - "orderkey": "int64", - "custkey": "int64", - "orderstatus": "object", - "totalprice": "object", - 
"orderdate": "object", - "order_cust_key": "object", - "timestamp": "object", - "timestamptz": "object", - "test_null_time": "datetime64[ns]", + "orderkey": "int32", + "custkey": "int32", + "orderstatus": "string", + "totalprice": "string", + "orderdate": "date32[day]", + "order_cust_key": "string", + "timestamp": "timestamp[us]", + "timestamptz": "timestamp[us, tz=UTC]", + "test_null_time": "timestamp[us]", } From d92924ac16c8e5a04a51b9a56e100803784a0ca9 Mon Sep 17 00:00:00 2001 From: Jax Liu Date: Tue, 17 Jun 2025 14:46:32 +0800 Subject: [PATCH 16/26] fix oracle test --- .../tests/routers/v2/connector/test_oracle.py | 68 ++++++++++++------- .../routers/v3/connector/oracle/conftest.py | 32 ++++++++- .../routers/v3/connector/oracle/test_query.py | 24 ++++--- 3 files changed, 89 insertions(+), 35 deletions(-) diff --git a/ibis-server/tests/routers/v2/connector/test_oracle.py b/ibis-server/tests/routers/v2/connector/test_oracle.py index 6aa20be2b..02ffce8a9 100644 --- a/ibis-server/tests/routers/v2/connector/test_oracle.py +++ b/ibis-server/tests/routers/v2/connector/test_oracle.py @@ -66,11 +66,12 @@ "expression": "CAST(NULL AS TIMESTAMP)", "type": "timestamp", }, - { - "name": "blob_column", - "expression": "UTL_RAW.CAST_TO_RAW('abc')", - "type": "blob", - }, + # TODO: ibis to pyarrow conversion does not support CLOBs yet + # { + # "name": "blob_column", + # "expression": "UTL_RAW.CAST_TO_RAW('abc')", + # "type": "blob", + # }, ], "primaryKey": "orderkey", } @@ -122,20 +123,43 @@ def oracle(request) -> OracleDbContainer: "gvenzl/oracle-free:23.6-slim-faststart", oracle_password=f"{oracle_password}" ).start() engine = sqlalchemy.create_engine(oracle.get_connection_url()) + orders_schema = { + "o_orderkey": sqlalchemy.Integer(), + "o_custkey": sqlalchemy.Integer(), + "o_orderstatus": sqlalchemy.String(255), + "o_totalprice": sqlalchemy.DECIMAL(precision=38, scale=2), + "o_orderdate": sqlalchemy.Date(), + "o_orderpriority": sqlalchemy.String(255), + "o_clerk": 
sqlalchemy.String(255), + "o_shippriority": sqlalchemy.Integer(), + "o_comment": sqlalchemy.String(255), + } + customer_schema = { + "c_custkey": sqlalchemy.Integer(), + "c_name": sqlalchemy.String(255), + "c_address": sqlalchemy.String(255), + "c_nationkey": sqlalchemy.Integer(), + "c_phone": sqlalchemy.String(255), + "c_acctbal": sqlalchemy.DECIMAL(precision=38, scale=2), + "c_mktsegment": sqlalchemy.String(255), + "c_comment": sqlalchemy.String(255), + } with engine.begin() as conn: + # assign dtype to avoid to create CLOB column for text columns pd.read_parquet(file_path("resource/tpch/data/orders.parquet")).to_sql( - "orders", engine, index=False + "orders", engine, index=False, dtype=orders_schema ) pd.read_parquet(file_path("resource/tpch/data/customer.parquet")).to_sql( - "customer", engine, index=False + "customer", engine, index=False, dtype=customer_schema ) + # TODO: ibis to pyarrow conversion does not support CLOBs yet # Create a table with a large CLOB column - large_text = "x" * (1024 * 1024 * 2) # 2MB - conn.execute(text("CREATE TABLE test_lob (id NUMBER, content CLOB)")) - conn.execute( - text("INSERT INTO test_lob VALUES (1, :content)"), {"content": large_text} - ) + # large_text = "x" * (1024 * 1024 * 2) # 2MB + # conn.execute(text("CREATE TABLE test_lob (id NUMBER, content CLOB)")) + # conn.execute( + # text("INSERT INTO test_lob VALUES (1, :content)"), {"content": large_text} + # ) # Add table and column comments conn.execute(text("COMMENT ON TABLE orders IS 'This is a table comment'")) @@ -163,24 +187,22 @@ async def test_query(client, manifest_str, oracle: OracleDbContainer): 370, "O", "172799.49", - "1996-01-02 00:00:00.000000", + "1996-01-02", "1_370", "2024-01-01 23:59:59.000000", "2024-01-01 23:59:59.000000 UTC", None, - "616263", ] assert result["dtypes"] == { "orderkey": "int64", "custkey": "int64", - "orderstatus": "object", - "totalprice": "object", - "orderdate": "object", - "order_cust_key": "object", - "timestamp": "object", - 
"timestamptz": "object", - "test_null_time": "datetime64[ns]", - "blob_column": "object", + "orderstatus": "string", + "totalprice": "decimal128(38, 2)", + "orderdate": "date32[day]", + "order_cust_key": "string", + "timestamp": "timestamp[ns]", + "timestamptz": "timestamp[ns, tz=UTC]", + "test_null_time": "timestamp[us]", } @@ -405,7 +427,7 @@ async def test_metadata_list_tables(client, oracle: OracleDbContainer): assert result["columns"][8] == { "name": "O_COMMENT", "nestedColumns": None, - "type": "TEXT", + "type": "VARCHAR", "notNull": False, "description": "This is a comment", "properties": None, diff --git a/ibis-server/tests/routers/v3/connector/oracle/conftest.py b/ibis-server/tests/routers/v3/connector/oracle/conftest.py index 44a38a326..0a942955d 100644 --- a/ibis-server/tests/routers/v3/connector/oracle/conftest.py +++ b/ibis-server/tests/routers/v3/connector/oracle/conftest.py @@ -29,13 +29,41 @@ def oracle(request) -> OracleDbContainer: oracle = OracleDbContainer( "gvenzl/oracle-free:23.6-slim-faststart", oracle_password=f"{oracle_password}" ).start() + orders_schema = { + "o_orderkey": sqlalchemy.Integer(), + "o_custkey": sqlalchemy.Integer(), + "o_orderstatus": sqlalchemy.String(255), + "o_totalprice": sqlalchemy.DECIMAL(precision=38, scale=2), + "o_orderdate": sqlalchemy.Date(), + "o_orderpriority": sqlalchemy.String(255), + "o_clerk": sqlalchemy.String(255), + "o_shippriority": sqlalchemy.Integer(), + "o_comment": sqlalchemy.String(255), + } + customer_schema = { + "c_custkey": sqlalchemy.Integer(), + "c_name": sqlalchemy.String(255), + "c_address": sqlalchemy.String(255), + "c_nationkey": sqlalchemy.Integer(), + "c_phone": sqlalchemy.String(255), + "c_acctbal": sqlalchemy.DECIMAL(precision=38, scale=2), + "c_mktsegment": sqlalchemy.String(255), + "c_comment": sqlalchemy.String(255), + } engine = sqlalchemy.create_engine(oracle.get_connection_url()) with engine.begin() as conn: + # assign dtype to avoid to create CLOB column for text columns 
pd.read_parquet(file_path("resource/tpch/data/orders.parquet")).to_sql( - "orders", engine, index=False + "orders", + engine, + index=False, + dtype=orders_schema, ) pd.read_parquet(file_path("resource/tpch/data/customer.parquet")).to_sql( - "customer", engine, index=False + "customer", + engine, + index=False, + dtype=customer_schema, ) # Create a table with a large CLOB column diff --git a/ibis-server/tests/routers/v3/connector/oracle/test_query.py b/ibis-server/tests/routers/v3/connector/oracle/test_query.py index 8c0fb358d..dfef792e7 100644 --- a/ibis-server/tests/routers/v3/connector/oracle/test_query.py +++ b/ibis-server/tests/routers/v3/connector/oracle/test_query.py @@ -94,7 +94,7 @@ async def test_query(client, manifest_str, connection_info): 370, "O", "172799.49", - "1996-01-02 00:00:00.000000", + "1996-01-02", "1_370", "2024-01-01 23:59:59.000000", "2024-01-01 23:59:59.000000 UTC", @@ -103,13 +103,13 @@ async def test_query(client, manifest_str, connection_info): assert result["dtypes"] == { "orderkey": "int64", "custkey": "int64", - "orderstatus": "object", - "totalprice": "object", - "orderdate": "object", - "order_cust_key": "object", - "timestamp": "object", - "timestamptz": "object", - "test_null_time": "datetime64[ns]", + "orderstatus": "string", + "totalprice": "decimal128(38, 2)", + "orderdate": "date32[day]", + "order_cust_key": "string", + "timestamp": "timestamp[ns]", + "timestamptz": "timestamp[ns, tz=UTC]", + "test_null_time": "timestamp[us]", } @@ -175,7 +175,11 @@ async def test_query_number_scale(client, connection_info): "columns": [ {"name": "id", "expression": '"ID"', "type": "number"}, {"name": "id_p", "expression": '"ID_P"', "type": "number"}, - {"name": "id_p_s", "expression": '"ID_P_S"', "type": "number"}, + { + "name": "id_p_s", + "expression": '"ID_P_S"', + "type": "decimal128(10, 2)", + }, ], "primaryKey": "id", } @@ -203,5 +207,5 @@ async def test_query_number_scale(client, connection_info): assert result["dtypes"] == { "id": 
"int64", "id_p": "int64", - "id_p_s": "object", + "id_p_s": "decimal128(10, 2)", } From a39c9316c241d461fc8138b2020c406ee9b97c68 Mon Sep 17 00:00:00 2001 From: Jax Liu Date: Tue, 17 Jun 2025 14:47:18 +0800 Subject: [PATCH 17/26] add oracle for local script --- ibis-server/tools/query_local_run.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/ibis-server/tools/query_local_run.py b/ibis-server/tools/query_local_run.py index 770540c15..e0163280d 100644 --- a/ibis-server/tools/query_local_run.py +++ b/ibis-server/tools/query_local_run.py @@ -14,7 +14,7 @@ import base64 import json import os -from app.model import MySqlConnectionInfo, PostgresConnectionInfo +from app.model import MySqlConnectionInfo, OracleConnectionInfo, PostgresConnectionInfo from app.util import to_json import sqlglot import sys @@ -88,6 +88,9 @@ elif data_source == "postgres": connection_info = PostgresConnectionInfo.model_validate_json(json.dumps(connection_info)) connection = DataSourceExtension.get_postgres_connection(connection_info) +elif data_source == "oracle": + connection_info = OracleConnectionInfo.model_validate_json(json.dumps(connection_info)) + connection = DataSourceExtension.get_oracle_connection(connection_info) else: raise Exception("Unsupported data source:", data_source) From c06b0df406f5950daf3a31e186d8a45ad21502d8 Mon Sep 17 00:00:00 2001 From: Jax Liu Date: Tue, 17 Jun 2025 15:27:58 +0800 Subject: [PATCH 18/26] fix validator --- ibis-server/app/model/validator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ibis-server/app/model/validator.py b/ibis-server/app/model/validator.py index 30ecbb228..d792a480c 100644 --- a/ibis-server/app/model/validator.py +++ b/ibis-server/app/model/validator.py @@ -142,7 +142,7 @@ def format_result(result): ) try: rewritten_sql = await self.rewriter.rewrite(sql) - result = self.connector.query(rewritten_sql, limit=1) + result = self.connector.query(rewritten_sql, limit=1).to_pandas() if not 
result.get("result").get(0): raise ValidationError( f"Relationship {relationship_name} is not valid: {format_result(result)}" From c261cc0e40083670f9dca72ff0b6e1f500a2003c Mon Sep 17 00:00:00 2001 From: Jax Liu Date: Tue, 17 Jun 2025 15:57:08 +0800 Subject: [PATCH 19/26] fix athena test --- .../tests/routers/v2/connector/test_athena.py | 36 +++++++++---------- .../routers/v3/connector/athena/test_query.py | 16 ++++----- 2 files changed, 26 insertions(+), 26 deletions(-) diff --git a/ibis-server/tests/routers/v2/connector/test_athena.py b/ibis-server/tests/routers/v2/connector/test_athena.py index f0267d7be..fe1b54b07 100644 --- a/ibis-server/tests/routers/v2/connector/test_athena.py +++ b/ibis-server/tests/routers/v2/connector/test_athena.py @@ -104,7 +104,7 @@ async def test_query(client, manifest_str): 36901, "O", "173665.47", - "1996-01-02 00:00:00.000000", + "1996-01-02", "1_36901", "2024-01-01 23:59:59.000000", "2024-01-01 23:59:59.000000", @@ -114,14 +114,14 @@ async def test_query(client, manifest_str): assert result["dtypes"] == { "orderkey": "int64", "custkey": "int64", - "orderstatus": "object", - "totalprice": "object", ### fixme this should be float64 - "orderdate": "object", - "order_cust_key": "object", - "timestamp": "object", - "timestamptz": "object", - "test_null_time": "datetime64[ns]", - "bytea_column": "object", + "orderstatus": "string", + "totalprice": "decimal128(15, 2)", + "orderdate": "date32[day]", + "order_cust_key": "string", + "timestamp": "timestamp[us]", + "timestamptz": "timestamp[us]", + "test_null_time": "timestamp[us]", + "bytea_column": "binary", } @@ -144,7 +144,7 @@ async def test_query_glue_database(client, manifest_str): 36901, "O", "173665.47", - "1996-01-02 00:00:00.000000", + "1996-01-02", "1_36901", "2024-01-01 23:59:59.000000", "2024-01-01 23:59:59.000000", @@ -154,14 +154,14 @@ async def test_query_glue_database(client, manifest_str): assert result["dtypes"] == { "orderkey": "int64", "custkey": "int64", - 
"orderstatus": "object", - "totalprice": "object", ### fixme this should be float64 - "orderdate": "object", - "order_cust_key": "object", - "timestamp": "object", - "timestamptz": "object", - "test_null_time": "datetime64[ns]", - "bytea_column": "object", + "orderstatus": "string", + "totalprice": "decimal128(15, 2)", + "orderdate": "date32[day]", + "order_cust_key": "string", + "timestamp": "timestamp[us]", + "timestamptz": "timestamp[us]", + "test_null_time": "timestamp[us]", + "bytea_column": "binary", } diff --git a/ibis-server/tests/routers/v3/connector/athena/test_query.py b/ibis-server/tests/routers/v3/connector/athena/test_query.py index c8b980248..74a48a3a8 100644 --- a/ibis-server/tests/routers/v3/connector/athena/test_query.py +++ b/ibis-server/tests/routers/v3/connector/athena/test_query.py @@ -79,7 +79,7 @@ async def test_query(client, manifest_str, connection_info): 36901, "O", "173665.47", - "1996-01-02 00:00:00.000000", + "1996-01-02", "1_36901", "2024-01-01 23:59:59.000000", "2024-01-01 23:59:59.000000", @@ -89,13 +89,13 @@ async def test_query(client, manifest_str, connection_info): assert result["dtypes"] == { "orderkey": "int64", "custkey": "int64", - "orderstatus": "object", - "totalprice": "object", ### fixme this should be float64 - "orderdate": "object", - "order_cust_key": "object", - "timestamp": "object", - "timestamptz": "object", - "test_null_time": "datetime64[ns]", + "orderstatus": "string", + "totalprice": "decimal128(15, 2)", + "orderdate": "date32[day]", + "order_cust_key": "string", + "timestamp": "timestamp[us]", + "timestamptz": "timestamp[us]", + "test_null_time": "timestamp[us]", } From b97a359592b610e781dbbf79a138146af890230d Mon Sep 17 00:00:00 2001 From: Jax Liu Date: Wed, 18 Jun 2025 10:13:23 +0800 Subject: [PATCH 20/26] address comments --- ibis-server/app/model/metadata/redshift.py | 4 ++-- ibis-server/app/util.py | 11 ++++------- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git 
a/ibis-server/app/model/metadata/redshift.py b/ibis-server/app/model/metadata/redshift.py index 060467f63..495f8ed5d 100644 --- a/ibis-server/app/model/metadata/redshift.py +++ b/ibis-server/app/model/metadata/redshift.py @@ -51,7 +51,7 @@ def get_table_list(self) -> list[Table]: # we reuse the connector.query method to execute the SQL # so we have to give a limit to avoid too many rows # the default limit for tables metadata is 500, i think it's a sensible limit - response = self.connector.query(sql, limit=500).to_dict(orient="records") + response = self.connector.query(sql, limit=500).to_pylist() unique_tables = {} for row in response: @@ -104,7 +104,7 @@ def get_constraints(self) -> list[Constraint]: # we reuse the connector.query method to execute the SQL # so we have to give a limit to avoid too many rows # the default limit for constraints metadata is 500, i think it's a sensible limit - response = self.connector.query(sql, limit=500).to_dict(orient="records") + response = self.connector.query(sql, limit=500).to_pylist() constraints = [] for row in response: constraints.append( diff --git a/ibis-server/app/util.py b/ibis-server/app/util.py index 1417ad80f..c74571089 100644 --- a/ibis-server/app/util.py +++ b/ibis-server/app/util.py @@ -79,13 +79,10 @@ def get_timezone_from_offset(offset: str) -> str: if offset.startswith("+"): offset = offset[1:] # Remove the leading '+' sign - sql = f""" - SELECT name, utc_offset - FROM pg_timezone_names() - WHERE utc_offset = '{offset}' - """ - - first = duckdb.execute(sql).fetchone() + first = duckdb.execute( + "SELECT name, utc_offset FROM pg_timezone_names() WHERE utc_offset = ?", + [offset], + ).fetchone() if first is None: raise ValueError(f"Invalid timezone offset: {offset}") return first[0] # Return the timezone name From fb1df6dea446c0a0d4400430bcf3fdc205043727 Mon Sep 17 00:00:00 2001 From: Jax Liu Date: Wed, 18 Jun 2025 14:12:02 +0800 Subject: [PATCH 21/26] change redsfift result to pyarrow --- 
ibis-server/app/model/connector.py | 3 ++- .../tests/routers/v2/connector/test_redshift.py | 16 ++++++++-------- .../routers/v3/connector/redshift/test_query.py | 14 +++++++------- 3 files changed, 17 insertions(+), 16 deletions(-) diff --git a/ibis-server/app/model/connector.py b/ibis-server/app/model/connector.py index 72596bd1f..e4f68a0b8 100644 --- a/ibis-server/app/model/connector.py +++ b/ibis-server/app/model/connector.py @@ -250,7 +250,8 @@ def query(self, sql: str, limit: int) -> pd.DataFrame: cursor.execute(sql) cols = [desc[0] for desc in cursor.description] rows = cursor.fetchall() - return pd.DataFrame(rows, columns=cols).head(limit) + df = pd.DataFrame(rows, columns=cols).head(limit) + return pa.Table.from_pandas(df) @tracer.start_as_current_span("connector_dry_run", kind=trace.SpanKind.CLIENT) def dry_run(self, sql: str) -> None: diff --git a/ibis-server/tests/routers/v2/connector/test_redshift.py b/ibis-server/tests/routers/v2/connector/test_redshift.py index 3b02628cf..3ba96011a 100644 --- a/ibis-server/tests/routers/v2/connector/test_redshift.py +++ b/ibis-server/tests/routers/v2/connector/test_redshift.py @@ -117,14 +117,14 @@ async def test_query(client, manifest_str): assert result["dtypes"] == { "orderkey": "int64", "custkey": "int64", - "orderstatus": "object", - "totalprice": "object", - "orderdate": "object", - "order_cust_key": "object", - "timestamp": "object", - "timestamptz": "object", - "test_null_time": "object", - "bytea_column": "object", + "orderstatus": "string", + "totalprice": "decimal128(5, 2)", + "orderdate": "date32[day]", + "order_cust_key": "string", + "timestamp": "timestamp[ns]", + "timestamptz": "timestamp[ns, tz=UTC]", + "test_null_time": "null", + "bytea_column": "string", } diff --git a/ibis-server/tests/routers/v3/connector/redshift/test_query.py b/ibis-server/tests/routers/v3/connector/redshift/test_query.py index 38e7ec6d3..791c3c85e 100644 --- a/ibis-server/tests/routers/v3/connector/redshift/test_query.py +++ 
b/ibis-server/tests/routers/v3/connector/redshift/test_query.py @@ -81,13 +81,13 @@ async def test_query(client, manifest_str, connection_info): assert result["dtypes"] == { "orderkey": "int64", "custkey": "int64", - "orderstatus": "object", - "totalprice": "object", - "orderdate": "object", - "order_cust_key": "object", - "timestamp": "object", - "timestamptz": "object", - "test_null_time": "object", + "orderstatus": "string", + "totalprice": "decimal128(5, 2)", + "orderdate": "date32[day]", + "order_cust_key": "string", + "timestamp": "timestamp[ns]", + "timestamptz": "timestamp[ns]", + "test_null_time": "null", } From 8e789a78571b4420c40ac863c09df8f4a8c42651 Mon Sep 17 00:00:00 2001 From: Jax Liu Date: Wed, 18 Jun 2025 14:55:45 +0800 Subject: [PATCH 22/26] address comment --- ibis-server/app/util.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/ibis-server/app/util.py b/ibis-server/app/util.py index c74571089..551be7302 100644 --- a/ibis-server/app/util.py +++ b/ibis-server/app/util.py @@ -43,8 +43,7 @@ def base64_to_dict(base64_str: str) -> dict: def to_json(df: pa.Table, headers: dict) -> dict: dtypes = {field.name: str(field.type) for field in df.schema} if df.num_rows == 0: - columns = {field.name: [] for field in df.schema} - return {"columns": columns, "data": [], "dtypes": dtypes} + return {"columns": [field.name for field in df.schema], "data": [], "dtypes": dtypes} formatted_sql = ( "SELECT " + ", ".join([_formater(field) for field in df.schema]) + " FROM df" @@ -67,7 +66,7 @@ def get_duckdb_conn(headers: dict) -> duckdb.DuckDBPyConnection: if timezone.startwith("+") or timezone.startswith("-"): # If the timezone is an offset, convert it to a named timezone timezone = get_timezone_from_offset(timezone) - conn.execute(f"SET TimeZone = '{timezone}'") + conn.execute("SET TimeZone = ?", [timezone]) else: # Default to UTC if no timezone is provided conn.execute("SET TimeZone = 'UTC'") From 8c639ec1aca37b5734fa51c116a18ee02f8c15bc Mon 
Sep 17 00:00:00 2001 From: Jax Liu Date: Wed, 18 Jun 2025 16:29:10 +0800 Subject: [PATCH 23/26] fix fmt --- ibis-server/app/util.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/ibis-server/app/util.py b/ibis-server/app/util.py index 551be7302..5917c5101 100644 --- a/ibis-server/app/util.py +++ b/ibis-server/app/util.py @@ -43,7 +43,11 @@ def base64_to_dict(base64_str: str) -> dict: def to_json(df: pa.Table, headers: dict) -> dict: dtypes = {field.name: str(field.type) for field in df.schema} if df.num_rows == 0: - return {"columns": [field.name for field in df.schema], "data": [], "dtypes": dtypes} + return { + "columns": [field.name for field in df.schema], + "data": [], + "dtypes": dtypes, + } formatted_sql = ( "SELECT " + ", ".join([_formater(field) for field in df.schema]) + " FROM df" From 6c737ccba4d446c36a6107f230096379b601c83b Mon Sep 17 00:00:00 2001 From: Jax Liu Date: Fri, 20 Jun 2025 10:32:22 +0800 Subject: [PATCH 24/26] fix return type --- ibis-server/app/model/connector.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ibis-server/app/model/connector.py b/ibis-server/app/model/connector.py index e4f68a0b8..ffaa91fe9 100644 --- a/ibis-server/app/model/connector.py +++ b/ibis-server/app/model/connector.py @@ -245,7 +245,7 @@ def __init__(self, connection_info: RedshiftConnectionUnion): raise ValueError("Invalid Redshift connection_info type") @tracer.start_as_current_span("connector_query", kind=trace.SpanKind.CLIENT) - def query(self, sql: str, limit: int) -> pd.DataFrame: + def query(self, sql: str, limit: int) -> pa.Table: with closing(self.connection.cursor()) as cursor: cursor.execute(sql) cols = [desc[0] for desc in cursor.description] From 15723c61634ea905636de59a37d674044b3720a6 Mon Sep 17 00:00:00 2001 From: Jax Liu Date: Fri, 20 Jun 2025 16:30:15 +0800 Subject: [PATCH 25/26] fix clob for oracle --- ibis-server/poetry.lock | 45 ++++++++-------- ibis-server/pyproject.toml | 2 +- 
.../tests/routers/v2/connector/test_oracle.py | 54 ++++++++++--------- .../routers/v3/connector/oracle/conftest.py | 18 +++---- 4 files changed, 62 insertions(+), 57 deletions(-) diff --git a/ibis-server/poetry.lock b/ibis-server/poetry.lock index 9a09f5b31..c9f5ee91f 100644 --- a/ibis-server/poetry.lock +++ b/ibis-server/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.1.3 and should not be changed by hand. [[package]] name = "aiobotocore" @@ -1933,35 +1933,35 @@ files = [ [package.dependencies] atpublic = ">=2.3" -clickhouse-connect = {version = ">=0.5.23", extras = ["arrow", "numpy", "pandas"], optional = true, markers = "extra == \"clickhouse\""} -db-dtypes = {version = ">=0.3", optional = true, markers = "extra == \"bigquery\""} -fsspec = {version = "*", extras = ["s3"], optional = true, markers = "extra == \"athena\""} -google-cloud-bigquery = {version = ">=3", optional = true, markers = "extra == \"bigquery\""} -google-cloud-bigquery-storage = {version = ">=2", optional = true, markers = "extra == \"bigquery\""} -mysqlclient = {version = ">=2.2.4", optional = true, markers = "extra == \"mysql\""} -numpy = {version = ">=1.23.2,<3", optional = true, markers = "extra == \"athena\" or extra == \"bigquery\" or extra == \"clickhouse\" or extra == \"mssql\" or extra == \"mysql\" or extra == \"oracle\" or extra == \"postgres\" or extra == \"snowflake\" or extra == \"trino\""} -oracledb = {version = ">=1.3.1", optional = true, markers = "extra == \"oracle\""} -packaging = {version = ">=21.3", optional = true, markers = "extra == \"athena\" or extra == \"oracle\""} -pandas = {version = ">=1.5.3,<3", optional = true, markers = "extra == \"athena\" or extra == \"bigquery\" or extra == \"clickhouse\" or extra == \"mssql\" or extra == \"mysql\" or extra == \"oracle\" or extra == \"postgres\" or extra == \"snowflake\" or extra == \"trino\""} 
-pandas-gbq = {version = ">=0.26.1", optional = true, markers = "extra == \"bigquery\""} +clickhouse-connect = {version = ">=0.5.23", extras = ["arrow", "numpy", "pandas"], optional = true} +db-dtypes = {version = ">=0.3", optional = true} +fsspec = {version = "*", extras = ["s3"], optional = true} +google-cloud-bigquery = {version = ">=3", optional = true} +google-cloud-bigquery-storage = {version = ">=2", optional = true} +mysqlclient = {version = ">=2.2.4", optional = true} +numpy = {version = ">=1.23.2,<3", optional = true} +oracledb = {version = ">=1.3.1", optional = true} +packaging = {version = ">=21.3", optional = true} +pandas = {version = ">=1.5.3,<3", optional = true} +pandas-gbq = {version = ">=0.26.1", optional = true} parsy = ">=2" -psycopg = {version = ">=3.2.0", extras = ["binary"], optional = true, markers = "extra == \"postgres\""} -pyarrow = {version = ">=10.0.1", optional = true, markers = "extra == \"athena\" or extra == \"bigquery\" or extra == \"clickhouse\" or extra == \"mssql\" or extra == \"mysql\" or extra == \"oracle\" or extra == \"postgres\" or extra == \"snowflake\" or extra == \"trino\""} -pyarrow-hotfix = {version = ">=0.4", optional = true, markers = "extra == \"athena\" or extra == \"bigquery\" or extra == \"clickhouse\" or extra == \"mssql\" or extra == \"mysql\" or extra == \"oracle\" or extra == \"postgres\" or extra == \"snowflake\" or extra == \"trino\""} -pyathena = {version = ">=3.11.0", extras = ["arrow", "pandas"], optional = true, markers = "extra == \"athena\""} -pydata-google-auth = {version = ">=1.4.0", optional = true, markers = "extra == \"bigquery\""} -pyodbc = {version = ">=4.0.39", optional = true, markers = "extra == \"mssql\""} +psycopg = {version = ">=3.2.0", extras = ["binary"], optional = true} +pyarrow = {version = ">=10.0.1", optional = true} +pyarrow-hotfix = {version = ">=0.4", optional = true} +PyAthena = {version = ">=3.11.0", extras = ["arrow", "pandas"], optional = true} +pydata-google-auth = 
{version = ">=1.4.0", optional = true} +pyodbc = {version = ">=4.0.39", optional = true} python-dateutil = ">=2.8.2" -rich = {version = ">=12.4.4", optional = true, markers = "extra == \"athena\" or extra == \"bigquery\" or extra == \"clickhouse\" or extra == \"mssql\" or extra == \"mysql\" or extra == \"oracle\" or extra == \"postgres\" or extra == \"snowflake\" or extra == \"trino\""} -snowflake-connector-python = {version = ">=3.0.2,<3.3.0b1 || >3.3.0b1", optional = true, markers = "extra == \"snowflake\""} +rich = {version = ">=12.4.4", optional = true} +snowflake-connector-python = {version = ">=3.0.2,<3.3.0b1 || >3.3.0b1", optional = true} sqlglot = ">=23.4" toolz = ">=0.11" -trino = {version = ">=0.321", optional = true, markers = "extra == \"trino\""} +trino = {version = ">=0.321", optional = true} typing-extensions = ">=4.3.0" tzdata = ">=2022.7" [package.extras] -athena = ["fsspec[s3]", "numpy (>=1.23.2,<3)", "packaging (>=21.3)", "pandas (>=1.5.3,<3)", "pyarrow (>=10.0.1)", "pyarrow-hotfix (>=0.4)", "pyathena[arrow,pandas] (>=3.11.0)", "rich (>=12.4.4)"] +athena = ["PyAthena[arrow,pandas] (>=3.11.0)", "fsspec[s3]", "numpy (>=1.23.2,<3)", "packaging (>=21.3)", "pandas (>=1.5.3,<3)", "pyarrow (>=10.0.1)", "pyarrow-hotfix (>=0.4)", "rich (>=12.4.4)"] bigquery = ["db-dtypes (>=0.3)", "google-cloud-bigquery (>=3)", "google-cloud-bigquery-storage (>=2)", "numpy (>=1.23.2,<3)", "pandas (>=1.5.3,<3)", "pandas-gbq (>=0.26.1)", "pyarrow (>=10.0.1)", "pyarrow-hotfix (>=0.4)", "pydata-google-auth (>=1.4.0)", "rich (>=12.4.4)"] clickhouse = ["clickhouse-connect[arrow,numpy,pandas] (>=0.5.23)", "numpy (>=1.23.2,<3)", "pandas (>=1.5.3,<3)", "pyarrow (>=10.0.1)", "pyarrow-hotfix (>=0.4)", "rich (>=12.4.4)"] databricks = ["databricks-sql-connector-core (>=4)", "numpy (>=1.23.2,<3)", "pandas (>=1.5.3,<3)", "pyarrow (>=10.0.1)", "pyarrow-hotfix (>=0.4)", "rich (>=12.4.4)"] @@ -3311,7 +3311,6 @@ files = [ {file = "psycopg2-2.9.10-cp311-cp311-win_amd64.whl", hash = 
"sha256:0435034157049f6846e95103bd8f5a668788dd913a7c30162ca9503fdf542cb4"}, {file = "psycopg2-2.9.10-cp312-cp312-win32.whl", hash = "sha256:65a63d7ab0e067e2cdb3cf266de39663203d38d6a8ed97f5ca0cb315c73fe067"}, {file = "psycopg2-2.9.10-cp312-cp312-win_amd64.whl", hash = "sha256:4a579d6243da40a7b3182e0430493dbd55950c493d8c68f4eec0b302f6bbf20e"}, - {file = "psycopg2-2.9.10-cp313-cp313-win_amd64.whl", hash = "sha256:91fd603a2155da8d0cfcdbf8ab24a2d54bca72795b90d2a3ed2b6da8d979dee2"}, {file = "psycopg2-2.9.10-cp39-cp39-win32.whl", hash = "sha256:9d5b3b94b79a844a986d029eee38998232451119ad653aea42bb9220a8c5066b"}, {file = "psycopg2-2.9.10-cp39-cp39-win_amd64.whl", hash = "sha256:88138c8dedcbfa96408023ea2b0c369eda40fe5d75002c0964c78f46f11fa442"}, {file = "psycopg2-2.9.10.tar.gz", hash = "sha256:12ec0b40b0273f95296233e8750441339298e6a572f7039da5b260e3c8b60e11"}, diff --git a/ibis-server/pyproject.toml b/ibis-server/pyproject.toml index 75e87d51d..0a2a82f46 100644 --- a/ibis-server/pyproject.toml +++ b/ibis-server/pyproject.toml @@ -10,7 +10,7 @@ packages = [{ include = "app" }] python = ">=3.11,<3.12" fastapi = { version = "0.115.12", extras = ["standard"] } pydantic = "2.10.6" -ibis-framework = { version = "10.6.0", extras = [ +ibis-framework = { git = "https://github.com/Canner/ibis.git", branch = "canner/10.6.1", extras = [ "athena", "bigquery", "clickhouse", diff --git a/ibis-server/tests/routers/v2/connector/test_oracle.py b/ibis-server/tests/routers/v2/connector/test_oracle.py index 02ffce8a9..66feb09da 100644 --- a/ibis-server/tests/routers/v2/connector/test_oracle.py +++ b/ibis-server/tests/routers/v2/connector/test_oracle.py @@ -66,12 +66,11 @@ "expression": "CAST(NULL AS TIMESTAMP)", "type": "timestamp", }, - # TODO: ibis to pyarrow conversion does not support CLOBs yet - # { - # "name": "blob_column", - # "expression": "UTL_RAW.CAST_TO_RAW('abc')", - # "type": "blob", - # }, + { + "name": "blob_column", + "expression": "UTL_RAW.CAST_TO_RAW('abc')", + "type": "blob", 
+ }, ], "primaryKey": "orderkey", } @@ -126,40 +125,45 @@ def oracle(request) -> OracleDbContainer: orders_schema = { "o_orderkey": sqlalchemy.Integer(), "o_custkey": sqlalchemy.Integer(), - "o_orderstatus": sqlalchemy.String(255), + "o_orderstatus": sqlalchemy.Text(), "o_totalprice": sqlalchemy.DECIMAL(precision=38, scale=2), "o_orderdate": sqlalchemy.Date(), - "o_orderpriority": sqlalchemy.String(255), - "o_clerk": sqlalchemy.String(255), + "o_orderpriority": sqlalchemy.Text(), + "o_clerk": sqlalchemy.Text(), "o_shippriority": sqlalchemy.Integer(), - "o_comment": sqlalchemy.String(255), + "o_comment": sqlalchemy.Text(), } customer_schema = { "c_custkey": sqlalchemy.Integer(), - "c_name": sqlalchemy.String(255), - "c_address": sqlalchemy.String(255), + "c_name": sqlalchemy.Text(), + "c_address": sqlalchemy.Text(), "c_nationkey": sqlalchemy.Integer(), - "c_phone": sqlalchemy.String(255), + "c_phone": sqlalchemy.Text(), "c_acctbal": sqlalchemy.DECIMAL(precision=38, scale=2), - "c_mktsegment": sqlalchemy.String(255), - "c_comment": sqlalchemy.String(255), + "c_mktsegment": sqlalchemy.Text(), + "c_comment": sqlalchemy.Text(), } with engine.begin() as conn: # assign dtype to avoid to create CLOB column for text columns pd.read_parquet(file_path("resource/tpch/data/orders.parquet")).to_sql( - "orders", engine, index=False, dtype=orders_schema + "orders", + engine, + index=False, + dtype=orders_schema, ) pd.read_parquet(file_path("resource/tpch/data/customer.parquet")).to_sql( - "customer", engine, index=False, dtype=customer_schema + "customer", + engine, + index=False, + dtype=customer_schema, ) - # TODO: ibis to pyarrow conversion does not support CLOBs yet # Create a table with a large CLOB column - # large_text = "x" * (1024 * 1024 * 2) # 2MB - # conn.execute(text("CREATE TABLE test_lob (id NUMBER, content CLOB)")) - # conn.execute( - # text("INSERT INTO test_lob VALUES (1, :content)"), {"content": large_text} - # ) + large_text = "x" * (1024 * 1024 * 2) # 2MB + 
conn.execute(text("CREATE TABLE test_lob (id NUMBER, content CLOB)")) + conn.execute( + text("INSERT INTO test_lob VALUES (1, :content)"), {"content": large_text} + ) # Add table and column comments conn.execute(text("COMMENT ON TABLE orders IS 'This is a table comment'")) @@ -192,6 +196,7 @@ async def test_query(client, manifest_str, oracle: OracleDbContainer): "2024-01-01 23:59:59.000000", "2024-01-01 23:59:59.000000 UTC", None, + "616263", ] assert result["dtypes"] == { "orderkey": "int64", @@ -203,6 +208,7 @@ async def test_query(client, manifest_str, oracle: OracleDbContainer): "timestamp": "timestamp[ns]", "timestamptz": "timestamp[ns, tz=UTC]", "test_null_time": "timestamp[us]", + "blob_column": "binary", } @@ -427,7 +433,7 @@ async def test_metadata_list_tables(client, oracle: OracleDbContainer): assert result["columns"][8] == { "name": "O_COMMENT", "nestedColumns": None, - "type": "VARCHAR", + "type": "TEXT", "notNull": False, "description": "This is a comment", "properties": None, diff --git a/ibis-server/tests/routers/v3/connector/oracle/conftest.py b/ibis-server/tests/routers/v3/connector/oracle/conftest.py index 0a942955d..f044ab42d 100644 --- a/ibis-server/tests/routers/v3/connector/oracle/conftest.py +++ b/ibis-server/tests/routers/v3/connector/oracle/conftest.py @@ -32,23 +32,23 @@ def oracle(request) -> OracleDbContainer: orders_schema = { "o_orderkey": sqlalchemy.Integer(), "o_custkey": sqlalchemy.Integer(), - "o_orderstatus": sqlalchemy.String(255), + "o_orderstatus": sqlalchemy.Text(), "o_totalprice": sqlalchemy.DECIMAL(precision=38, scale=2), "o_orderdate": sqlalchemy.Date(), - "o_orderpriority": sqlalchemy.String(255), - "o_clerk": sqlalchemy.String(255), + "o_orderpriority": sqlalchemy.Text(), + "o_clerk": sqlalchemy.Text(), "o_shippriority": sqlalchemy.Integer(), - "o_comment": sqlalchemy.String(255), + "o_comment": sqlalchemy.Text(), } customer_schema = { "c_custkey": sqlalchemy.Integer(), - "c_name": sqlalchemy.String(255), - "c_address": 
sqlalchemy.String(255), + "c_name": sqlalchemy.Text(), + "c_address": sqlalchemy.Text(), "c_nationkey": sqlalchemy.Integer(), - "c_phone": sqlalchemy.String(255), + "c_phone": sqlalchemy.Text(), "c_acctbal": sqlalchemy.DECIMAL(precision=38, scale=2), - "c_mktsegment": sqlalchemy.String(255), - "c_comment": sqlalchemy.String(255), + "c_mktsegment": sqlalchemy.Text(), + "c_comment": sqlalchemy.Text(), } engine = sqlalchemy.create_engine(oracle.get_connection_url()) with engine.begin() as conn: From 76c5114ac63fe5f743c1eb7edb9fb87c00f5da0b Mon Sep 17 00:00:00 2001 From: Jax Liu Date: Fri, 20 Jun 2025 16:32:11 +0800 Subject: [PATCH 26/26] update lock --- ibis-server/poetry.lock | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/ibis-server/poetry.lock b/ibis-server/poetry.lock index c9f5ee91f..bdb2a1338 100644 --- a/ibis-server/poetry.lock +++ b/ibis-server/poetry.lock @@ -1926,10 +1926,8 @@ description = "The portable Python dataframe library" optional = false python-versions = ">=3.9" groups = ["main"] -files = [ - {file = "ibis_framework-10.6.0-py3-none-any.whl", hash = "sha256:f33b73075710425548f474c60489ac33adee499fa71736a4583f77b21b6f7ab8"}, - {file = "ibis_framework-10.6.0.tar.gz", hash = "sha256:58dae8c926f4cef611e90e289ce8ff3ff00a06c1be32ecae4d9bede68f806f8a"}, -] +files = [] +develop = false [package.dependencies] atpublic = ">=2.3" @@ -1987,6 +1985,12 @@ sqlite = ["numpy (>=1.23.2,<3)", "pandas (>=1.5.3,<3)", "pyarrow (>=10.0.1)", "p trino = ["numpy (>=1.23.2,<3)", "pandas (>=1.5.3,<3)", "pyarrow (>=10.0.1)", "pyarrow-hotfix (>=0.4)", "rich (>=12.4.4)", "trino (>=0.321)"] visualization = ["graphviz (>=0.16)"] +[package.source] +type = "git" +url = "https://github.com/Canner/ibis.git" +reference = "canner/10.6.1" +resolved_reference = "0a73105444f206ab92d35b3b8523faf0f0c60051" + [[package]] name = "identify" version = "2.6.12" @@ -5568,4 +5572,4 @@ cffi = ["cffi (>=1.11)"] [metadata] lock-version = "2.1" python-versions = 
">=3.11,<3.12" -content-hash = "4db2887ccc9749740cb32856011fbba5d8f38a2f96c889f632585fc457b9ca4e" +content-hash = "510f68190b37a75fdda435c780f810f35f981dc7c906634e969a818d28c4f3fa"