This repository was archived by the owner on May 7, 2026. It is now read-only.
Merged
Changes from 6 commits
2 changes: 1 addition & 1 deletion .github/workflows/ibis-ci.yml
@@ -66,7 +66,7 @@ jobs:
sudo ACCEPT_EULA=Y apt-get -y install unixodbc-dev msodbcsql18
- name: Install dependencies
run: |
just install
just install --with dev
- name: Run tests
env:
WREN_ENGINE_ENDPOINT: http://localhost:8080
26 changes: 22 additions & 4 deletions ibis-server/Dockerfile
@@ -58,19 +58,37 @@ RUN apt-get -y install libpq-dev \

ENV VIRTUAL_ENV=/app/.venv \
PATH="/app/.venv/bin:$PATH" \
PYTHONPATH="/app:$PYTHONPATH" \
REMOTE_FUNCTION_LIST_PATH=/resources/function_list

# Set working directory before copying files
WORKDIR /app

COPY --from=builder ${VIRTUAL_ENV} ${VIRTUAL_ENV}
COPY app app
COPY resources resources
COPY app ./app/
COPY resources ./resources/
COPY notebooks ./notebooks/
COPY wren/ ./wren/

# Copy pyproject.toml and poetry.lock for poetry to work
COPY pyproject.toml poetry.lock ./

# Install poetry in runtime stage
RUN pip install poetry==1.8.3

# Install jupyter dependencies
RUN poetry install --with jupyter

# Install Opentelemetry zero-instrumentation python
RUN pip install opentelemetry-distro opentelemetry-exporter-otlp \
&& opentelemetry-bootstrap -a install

# Generate jupyter lab config
RUN jupyter lab --generate-config

COPY entrypoint.sh ./
RUN chmod +x ./entrypoint.sh

EXPOSE 8000
EXPOSE 8000 8888

CMD ["./entrypoint.sh"]
ENTRYPOINT ["./entrypoint.sh"]
37 changes: 35 additions & 2 deletions ibis-server/README.md
@@ -3,7 +3,7 @@ This module is the API server of Wren Engine. It's built on top of [FastAPI](htt

## Quick Start

### Running on Docker
### Running Ibis Server on Docker
You can follow the steps below to run the Java engine and the Ibis server.
> Wren Engine is migrating to [wren-core](../wren-core/). However, we still recommend starting [the Java engine](../wren-core-legacy/) to enable the query fallback mechanism.

@@ -41,7 +41,7 @@ docker compose up
Set up the [OpenTelemetry Environment Variables](docs/development#environment-variable) to enable trace logging.
See [Tracing with Jaeger](#tracing-with-jaeger) to start up a Jaeger Server.

### Running on Local
### Running Ibis Server on Local
Requirements:
- Python 3.11
- [casey/just](https://github.com/casey/just)
@@ -74,6 +74,39 @@ Run the server
just run
```

### Start with Python Interactive Mode
Install the dependencies
```bash
just install
```
Launch a CLI with an active Wren session using the following command:
```
python -m wren local_file <mdl_path> <connection_info_path>
```
This will create an interactive CLI environment with a `wren.session.Context` instance for querying your database.
```
Session created: Context(id=1352f5de-a8a7-4342-b2cf-015dbb2bba4f, data_source=local_file)
You can now interact with the Wren session using the 'wren' variable:
> task = wren.sql('SELECT * FROM your_table').execute()
> print(task.results)
> print(task.formatted_result())
Python 3.11.11 (main, Dec 3 2024, 17:20:40) [Clang 16.0.0 (clang-1600.0.26.4)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
(InteractiveConsole)
>>>
```
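
The `(InteractiveConsole)` banner in the transcript above is the default banner of Python's standard `code` module, so one plausible way to build such a CLI is to hand a session object to `code.InteractiveConsole` under the name `wren`. The sketch below is an assumption about the approach, not the actual `wren` package code; `FakeSession`, `FakeQuery`, and `build_console` are hypothetical names used only for illustration.

```python
import code


class FakeQuery:
    """Hypothetical stand-in for the object returned by wren.sql(...)."""

    def __init__(self, sql: str):
        self.sql_text = sql
        self.results = None

    def execute(self):
        # A real session would run the query; this sketch only echoes the SQL.
        self.results = [{"echo": self.sql_text}]
        return self


class FakeSession:
    """Hypothetical stand-in for a wren.session.Context instance."""

    def sql(self, sql: str) -> FakeQuery:
        return FakeQuery(sql)


def build_console(session) -> code.InteractiveConsole:
    # Expose the session to the REPL under the variable name 'wren'.
    return code.InteractiveConsole(locals={"wren": session})
```

Calling `build_console(FakeSession()).interact()` would start the REPL; statements can also be fed programmatically with `push()`, which is convenient for testing.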

### Start with Jupyter Notebook
Launch a Jupyter notebook server with Wren engine dependencies using Docker:
```
docker run --rm -p 8888:8888 ghcr.io/canner/wren-engine-ibis:latest jupyter
```
Explore the demo notebooks to learn how to use the Wren session context:
```
http://localhost:8888/lab/doc/tree/notebooks/demo.ipynb
```


### Enable Tracing
We use OpenTelemetry as our tracing framework. Refer to OpenTelemetry zero-code instrumentation to install the required dependencies.
Then, use the following just command to start the Ibis server, which exports tracing logs to the console:
8 changes: 2 additions & 6 deletions ibis-server/app/config.py
@@ -20,15 +20,9 @@ def __init__(self):
load_dotenv(override=True)
self.wren_engine_endpoint = os.getenv("WREN_ENGINE_ENDPOINT")
self.remote_function_list_path = os.getenv("REMOTE_FUNCTION_LIST_PATH")
self.validate_wren_engine_endpoint(self.wren_engine_endpoint)
self.diagnose = False
self.init_logger()

@staticmethod
def validate_wren_engine_endpoint(endpoint):
if endpoint is None:
raise ValueError("WREN_ENGINE_ENDPOINT is not set")

@staticmethod
def init_logger():
logger.remove()
@@ -39,6 +33,7 @@ def init_logger():
diagnose=False,
enqueue=True,
)
logger.configure(extra={"correlation_id": "no-correlation"})

@staticmethod
def logger_diagnose():
@@ -50,6 +45,7 @@ def logger_diagnose():
diagnose=True,
enqueue=True,
)
logger.configure(extra={"correlation_id": "no-correlation"})

def update(self, diagnose: bool):
self.diagnose = diagnose
34 changes: 24 additions & 10 deletions ibis-server/app/mdl/java_engine.py
@@ -1,6 +1,7 @@
import anyio
import httpcore
import httpx
from loguru import logger
from orjson import orjson

from app.config import get_config
@@ -9,16 +10,27 @@


class JavaEngineConnector:
def __init__(self):
self.client = httpx.AsyncClient(
base_url=wren_engine_endpoint,
headers={
"Content-Type": "application/json",
"Accept": "application/json",
},
)
def __init__(self, end_point: str | None = None):
if end_point is None and wren_engine_endpoint is None:
logger.warning(
"WREN_ENGINE_ENDPOINT is not set. The v2 MDL endpoint and the fallback will not be available."
)
self.client = None
else:
self.client = httpx.AsyncClient(
base_url=end_point or wren_engine_endpoint,
headers={
"Content-Type": "application/json",
"Accept": "application/json",
},
)

async def dry_plan(self, manifest_str: str, sql: str):
if self.client is None:
raise ValueError(
"WREN_ENGINE_ENDPOINT is not set. Cannot call dry_plan without a valid endpoint."
)

r = await self.client.request(
method="GET",
url="/v2/mdl/dry-plan",
@@ -41,10 +53,12 @@ async def _warmup(self, timeout=30):
await anyio.sleep(1)

async def close(self):
await self.client.aclose()
if self.client is not None:
await self.client.aclose()

async def __aenter__(self):
await self._warmup()
if self.client is not None:
await self._warmup()
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
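
The change to `JavaEngineConnector` makes the HTTP client optional: it is created only when an endpoint is configured, and every method that needs it checks for `None` first. The following is a minimal, stdlib-only sketch of that pattern. `FakeAsyncClient` and `OptionalEngineConnector` are hypothetical stand-ins, not the PR's classes, and the `default` parameter is an assumption that stands in for the module-level `wren_engine_endpoint`.

```python
from __future__ import annotations

import asyncio
import logging

logger = logging.getLogger("optional_client_sketch")


class FakeAsyncClient:
    """Hypothetical stand-in for httpx.AsyncClient; it only records state."""

    def __init__(self, base_url: str):
        self.base_url = base_url
        self.closed = False

    async def aclose(self) -> None:
        self.closed = True


class OptionalEngineConnector:
    """Connector whose client exists only when an endpoint is configured."""

    def __init__(self, end_point: str | None = None, default: str | None = None):
        url = end_point or default
        if url is None:
            logger.warning("No endpoint set; fallback calls are unavailable.")
            self.client = None
        else:
            self.client = FakeAsyncClient(base_url=url)

    async def dry_plan(self, sql: str) -> str:
        # Fail with a clear message instead of an AttributeError on None.
        if self.client is None:
            raise ValueError("Cannot call dry_plan without a valid endpoint.")
        return f"planned via {self.client.base_url}: {sql}"

    async def close(self) -> None:
        if self.client is not None:
            await self.client.aclose()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()


async def demo() -> tuple[str, str]:
    # Without an endpoint the connector still works as a context manager,
    # but calls that need the client raise ValueError.
    async with OptionalEngineConnector() as disabled:
        try:
            await disabled.dry_plan("SELECT 1")
            first = "no error"
        except ValueError as e:
            first = str(e)
    async with OptionalEngineConnector("http://localhost:8080") as enabled:
        second = await enabled.dry_plan("SELECT 1")
    return first, second
```

Keeping `client` as an explicit `None` lets callers, such as the v3 routers in this PR, test `connector.client is None` before attempting a fallback.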
13 changes: 13 additions & 0 deletions ibis-server/app/mdl/rewriter.py
@@ -133,6 +133,19 @@ async def rewrite(
        except Exception as e:
            raise RewriteError(str(e))

    @tracer.start_as_current_span("embedded_rewrite", kind=trace.SpanKind.INTERNAL)
    def rewrite_sync(
        self, manifest_str: str, sql: str, properties: dict | None = None
    ) -> str:
        try:
            processed_properties = self.get_session_properties(properties)
            session_context = get_session_context(
                manifest_str, self.function_path, processed_properties
            )
            return session_context.transform_sql(sql)
        except Exception as e:
            raise RewriteError(str(e))

    def get_session_properties(self, properties: dict) -> frozenset | None:
        if properties is None:
            return None
16 changes: 10 additions & 6 deletions ibis-server/app/model/connector.py
@@ -62,7 +62,7 @@ def __init__(self, data_source: DataSource, connection_info: ConnectionInfo):
else:
self._connector = SimpleConnector(data_source, connection_info)

def query(self, sql: str, limit: int) -> pa.Table:
def query(self, sql: str, limit: int | None = None) -> pa.Table:
return self._connector.query(sql, limit)

def dry_run(self, sql: str) -> None:
@@ -78,8 +78,10 @@ def __init__(self, data_source: DataSource, connection_info: ConnectionInfo):
self.connection = self.data_source.get_connection(connection_info)

@tracer.start_as_current_span("connector_query", kind=trace.SpanKind.CLIENT)
def query(self, sql: str, limit: int) -> pa.Table:
ibis_table = self.connection.sql(sql).limit(limit)
def query(self, sql: str, limit: int | None = None) -> pa.Table:
ibis_table = self.connection.sql(sql)
if limit is not None:
ibis_table = ibis_table.limit(limit)
ibis_table = round_decimal_columns(ibis_table)
return ibis_table.to_pyarrow()

@@ -120,10 +122,12 @@ def __init__(self, connection_info: ConnectionInfo):
self.connection = DataSource.canner.get_connection(connection_info)

@tracer.start_as_current_span("connector_query", kind=trace.SpanKind.CLIENT)
def query(self, sql: str, limit: int) -> pa.Table:
def query(self, sql: str, limit: int | None = None) -> pa.Table:
# Canner enterprise does not support `CREATE TEMPORARY VIEW` for getting schema
schema = self._get_schema(sql)
ibis_table = self.connection.sql(sql, schema=schema).limit(limit)
ibis_table = self.connection.sql(sql, schema=schema)
if limit is not None:
ibis_table = ibis_table.limit(limit)
ibis_table = round_decimal_columns(ibis_table)
return ibis_table.to_pyarrow()

@@ -153,7 +157,7 @@ def __init__(self, connection_info: ConnectionInfo):
super().__init__(DataSource.bigquery, connection_info)
self.connection_info = connection_info

def query(self, sql: str, limit: int) -> pa.Table:
def query(self, sql: str, limit: int | None = None) -> pa.Table:
try:
return super().query(sql, limit)
except ValueError as e:
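
The `query` methods in this file now treat `limit` as optional and apply it only when it is not `None`. A minimal sketch of that pattern on a plain list follows; `apply_limit` is a hypothetical helper, not part of the PR, and a list slice stands in for the Ibis `.limit()` call.

```python
from __future__ import annotations


def apply_limit(rows: list[dict], limit: int | None = None) -> list[dict]:
    """Return every row when limit is None, otherwise at most `limit` rows."""
    if limit is None:
        return list(rows)
    return rows[:limit]
```

Testing `limit is not None` rather than the truthiness of `limit` matters: a limit of `0` is still a limit and should yield no rows, whereas `None` means "no limit".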
43 changes: 43 additions & 0 deletions ibis-server/app/model/data_source.py
@@ -15,6 +15,9 @@
CannerConnectionInfo,
ClickHouseConnectionInfo,
ConnectionInfo,
GcsFileConnectionInfo,
LocalFileConnectionInfo,
MinioFileConnectionInfo,
MSSqlConnectionInfo,
MySqlConnectionInfo,
OracleConnectionInfo,
@@ -35,6 +38,8 @@
QueryS3FileDTO,
QuerySnowflakeDTO,
QueryTrinoDTO,
RedshiftConnectionInfo,
S3FileConnectionInfo,
SnowflakeConnectionInfo,
SSLMode,
TrinoConnectionInfo,
@@ -70,6 +75,44 @@ def get_dto_type(self):
        except KeyError:
            raise NotImplementedError(f"Unsupported data source: {self}")

    def get_connection_info(self, data: dict) -> ConnectionInfo:
        """Build a ConnectionInfo object from the provided data."""
        match self:
            case DataSource.athena:
                return AthenaConnectionInfo.model_validate(data)
            case DataSource.bigquery:
                return BigQueryConnectionInfo.model_validate(data)
            case DataSource.canner:
                return CannerConnectionInfo.model_validate(data)
            case DataSource.clickhouse:
                return ClickHouseConnectionInfo.model_validate(data)
            case DataSource.mssql:
                return MSSqlConnectionInfo.model_validate(data)
            case DataSource.mysql:
                return MySqlConnectionInfo.model_validate(data)
            case DataSource.oracle:
                return OracleConnectionInfo.model_validate(data)
            case DataSource.postgres:
                return PostgresConnectionInfo.model_validate(data)
            case DataSource.redshift:
                if "redshift_type" in data and data["redshift_type"] == "redshift_iam":
                    return RedshiftConnectionInfo.model_validate(data)
                return RedshiftConnectionInfo.model_validate(data)
            case DataSource.snowflake:
                return SnowflakeConnectionInfo.model_validate(data)
            case DataSource.trino:
                return TrinoConnectionInfo.model_validate(data)
            case DataSource.local_file:
                return LocalFileConnectionInfo.model_validate(data)
            case DataSource.s3_file:
                return S3FileConnectionInfo.model_validate(data)
            case DataSource.minio_file:
                return MinioFileConnectionInfo.model_validate(data)
            case DataSource.gcs_file:
                return GcsFileConnectionInfo.model_validate(data)
            case _:
                raise NotImplementedError(f"Unsupported data source: {self}")


class DataSourceExtension(Enum):
    athena = QueryAthenaDTO
26 changes: 21 additions & 5 deletions ibis-server/app/routers/v3/connector.py
@@ -168,7 +168,11 @@ async def query(
)
# because the v2 API doesn't support row-level access control,
# we don't fall back to v2 if the headers include row-level access control properties.
if is_fallback_disable or exist_wren_variables_header(headers):
if (
java_engine_connector.client is None
or is_fallback_disable
or exist_wren_variables_header(headers)
):
raise e

logger.warning(
@@ -210,7 +214,11 @@ async def dry_plan(
)
# because the v2 API doesn't support row-level access control,
# we don't fall back to v2 if the headers include row-level access control properties.
if is_fallback_disable or exist_wren_variables_header(headers):
if (
java_engine_connector.client is None
or is_fallback_disable
or exist_wren_variables_header(headers)
):
raise e

logger.warning(
@@ -254,7 +262,11 @@ async def dry_plan_for_data_source(
)
# because the v2 API doesn't support row-level access control,
# we don't fall back to v2 if the headers include row-level access control properties.
if is_fallback_disable or exist_wren_variables_header(headers):
if (
java_engine_connector.client is None
or is_fallback_disable
or exist_wren_variables_header(headers)
):
raise e

logger.warning(
@@ -305,7 +317,11 @@ async def validate(
)
# because the v2 API doesn't support row-level access control,
# we don't fall back to v2 if the headers include row-level access control properties.
if is_fallback_disable or exist_wren_variables_header(headers):
if (
java_engine_connector.client is None
or is_fallback_disable
or exist_wren_variables_header(headers)
):
raise e

logger.warning(
@@ -373,7 +389,7 @@ async def model_substitute(
headers.get(X_WREN_FALLBACK_DISABLE)
and safe_strtobool(headers.get(X_WREN_FALLBACK_DISABLE, "false"))
)
if is_fallback_disable:
if java_engine_connector.client is None or is_fallback_disable:
raise e

logger.warning(
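
The same three-part guard now appears in `query`, `dry_plan`, `dry_plan_for_data_source`, and `validate`: the error is re-raised when the Java engine client is missing, when fallback is disabled by header, or when the request carries row-level access control variables. The decision can be sketched as a pure function; `should_fallback` is a hypothetical helper for illustration and is not part of this PR.

```python
def should_fallback(
    client_available: bool,
    fallback_disabled: bool,
    has_wren_variables: bool,
) -> bool:
    """Return True only when a v2 fallback is both possible and permitted."""
    # No client means there is no Java engine to fall back to.
    if not client_available:
        return False
    # The caller opted out of fallback through the request header.
    if fallback_disabled:
        return False
    # The v2 API does not support row-level access control.
    return not has_wren_variables
```

In the diff, `model_substitute` checks only the first two conditions, so a shared helper would need the third argument to default to `False` or be supplied explicitly there.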