Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ibis-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ jobs:
sudo ACCEPT_EULA=Y apt-get -y install unixodbc-dev msodbcsql18
- name: Install dependencies
run: |
just install
just install --with dev
- name: Run tests
env:
WREN_ENGINE_ENDPOINT: http://localhost:8080
Expand Down
29 changes: 25 additions & 4 deletions ibis-server/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,40 @@ RUN apt-get -y install libpq-dev \

ENV VIRTUAL_ENV=/app/.venv \
PATH="/app/.venv/bin:$PATH" \
PYTHONPATH="/app" \
REMOTE_FUNCTION_LIST_PATH=/resources/function_list

# Set working directory before copying files
WORKDIR /app

COPY --from=builder ${VIRTUAL_ENV} ${VIRTUAL_ENV}
COPY app app
COPY resources resources
COPY app ./app/
COPY resources ./resources/
COPY notebooks ./notebooks/
COPY wren/ ./wren/

# Copy pyproject.toml and poetry.lock for poetry to work
COPY pyproject.toml poetry.lock ./

# Install poetry in runtime stage
RUN pip install poetry==1.8.3

# Install jupyter dependencies
RUN poetry install --without dev --with jupyter

# runtime
COPY --from=builder /app/.venv ${VIRTUAL_ENV}

# Install Opentelemetry zero-instrumentation python
RUN pip install opentelemetry-distro opentelemetry-exporter-otlp \
&& opentelemetry-bootstrap -a install

# Generate jupyter lab config
RUN jupyter lab --generate-config --allow-root

COPY entrypoint.sh ./
RUN chmod +x ./entrypoint.sh

EXPOSE 8000
EXPOSE 8000 8888

CMD ["./entrypoint.sh"]
ENTRYPOINT ["./entrypoint.sh"]
37 changes: 35 additions & 2 deletions ibis-server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ This module is the API server of Wren Engine. It's built on top of [FastAPI](htt

## Quick Start

### Running on Docker
### Running Ibis Server on Docker
You can follow the steps below to run the Java engine and ibis.
> Wren Engine is migrating to [wren-core](../wren-core/). However, we still recommend starting [the Java engine](../wren-core-legacy/) to enable the query fallback mechanism.

Expand Down Expand Up @@ -41,7 +41,7 @@ docker compose up
Set up [OpenTelemetry Environment Variables](docs/development#environment-variable) to enable tracing logs.
See [Tracing with Jaeger](#tracing-with-jaeger) to start up a Jaeger Server.

### Running on Local
### Running Ibis Server on Local
Requirements:
- Python 3.11
- [casey/just](https://github.com/casey/just)
Expand Down Expand Up @@ -74,6 +74,39 @@ Run the server
just run
```

### Start with Python Interactive Mode
Install the dependencies
```bash
just install
```
Launch a CLI with an active Wren session using the following command:
```
python -m wren local_file <mdl_path> <connection_info_path>
```
This will create an interactive CLI environment with a `wren.session.Context` instance for querying your database.
```
Session created: Context(id=1352f5de-a8a7-4342-b2cf-015dbb2bba4f, data_source=local_file)
You can now interact with the Wren session using the 'wren' variable:
> task = wren.sql('SELECT * FROM your_table').execute()
> print(task.results)
> print(task.formatted_result())
Python 3.11.11 (main, Dec 3 2024, 17:20:40) [Clang 16.0.0 (clang-1600.0.26.4)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
(InteractiveConsole)
>>>
```

### Start with Jupyter Notebook
Launch a Jupyter notebook server with Wren engine dependencies using Docker:
```
docker run --rm -p 8888:8888 ghcr.io/canner/wren-engine-ibis:latest jupyter
```
Explore the demo notebooks to learn how to use the Wren session context:
```
http://localhost:8888/lab/doc/tree/notebooks/demo.ipynb
```


### Enable Tracing
We use OpenTelemetry as the tracing framework. Refer to the OpenTelemetry zero-code instrumentation documentation to install the required dependencies.
Then, use the following just command to start the Ibis server, which exports tracing logs to the console:
Expand Down
8 changes: 2 additions & 6 deletions ibis-server/app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,9 @@ def __init__(self):
load_dotenv(override=True)
self.wren_engine_endpoint = os.getenv("WREN_ENGINE_ENDPOINT")
self.remote_function_list_path = os.getenv("REMOTE_FUNCTION_LIST_PATH")
self.validate_wren_engine_endpoint(self.wren_engine_endpoint)
self.diagnose = False
self.init_logger()

@staticmethod
def validate_wren_engine_endpoint(endpoint):
if endpoint is None:
raise ValueError("WREN_ENGINE_ENDPOINT is not set")

@staticmethod
def init_logger():
logger.remove()
Expand All @@ -39,6 +33,7 @@ def init_logger():
diagnose=False,
enqueue=True,
)
logger.configure(extra={"correlation_id": "no-correlation"})

@staticmethod
def logger_diagnose():
Expand All @@ -50,6 +45,7 @@ def logger_diagnose():
diagnose=True,
enqueue=True,
)
logger.configure(extra={"correlation_id": "no-correlation"})

def update(self, diagnose: bool):
self.diagnose = diagnose
Expand Down
34 changes: 24 additions & 10 deletions ibis-server/app/mdl/java_engine.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import anyio
import httpcore
import httpx
from loguru import logger
from orjson import orjson

from app.config import get_config
Expand All @@ -9,16 +10,27 @@


class JavaEngineConnector:
def __init__(self):
self.client = httpx.AsyncClient(
base_url=wren_engine_endpoint,
headers={
"Content-Type": "application/json",
"Accept": "application/json",
},
)
def __init__(self, end_point: str | None = None):
    """Create a connector for the Java Wren engine.

    Falls back to the module-level ``wren_engine_endpoint`` when *end_point*
    is not supplied. When neither is set, the connector is left in a
    disabled state (``self.client is None``) and only a warning is logged.
    """
    if end_point is None and wren_engine_endpoint is None:
        logger.warning(
            "WREN_ENGINE_ENDPOINT is not set. The v2 MDL endpoint and the fallback will not be available."
        )
        self.client = None
        # Guard clause: nothing else to initialise for a disabled connector.
        return
    self.client = httpx.AsyncClient(
        base_url=end_point or wren_engine_endpoint,
        headers={
            "Content-Type": "application/json",
            "Accept": "application/json",
        },
    )

async def dry_plan(self, manifest_str: str, sql: str):
if self.client is None:
raise ValueError(
"WREN_ENGINE_ENDPOINT is not set. Cannot call dry_plan without a valid endpoint."
)

r = await self.client.request(
method="GET",
url="/v2/mdl/dry-plan",
Expand All @@ -41,10 +53,12 @@ async def _warmup(self, timeout=30):
await anyio.sleep(1)

async def close(self):
await self.client.aclose()
if self.client is not None:
await self.client.aclose()

async def __aenter__(self):
await self._warmup()
if self.client is not None:
await self._warmup()
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
Expand Down
13 changes: 13 additions & 0 deletions ibis-server/app/mdl/rewriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,19 @@ async def rewrite(
except Exception as e:
raise RewriteError(str(e))

@tracer.start_as_current_span("embedded_rewrite", kind=trace.SpanKind.INTERNAL)
def rewrite_sync(
    self, manifest_str: str, sql: str, properties: dict | None = None
) -> str:
    """Synchronously rewrite *sql* against the MDL manifest.

    Builds a session context from the manifest and configured function path,
    then transforms the SQL. Any failure — property processing, session
    creation, or the transform itself — is surfaced as a RewriteError
    carrying the original message.
    """
    try:
        session_props = self.get_session_properties(properties)
        ctx = get_session_context(manifest_str, self.function_path, session_props)
        transformed = ctx.transform_sql(sql)
    except Exception as exc:
        raise RewriteError(str(exc))
    return transformed

def get_session_properties(self, properties: dict) -> frozenset | None:
if properties is None:
return None
Expand Down
22 changes: 14 additions & 8 deletions ibis-server/app/model/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def __init__(self, data_source: DataSource, connection_info: ConnectionInfo):
else:
self._connector = SimpleConnector(data_source, connection_info)

def query(self, sql: str, limit: int) -> pa.Table:
def query(self, sql: str, limit: int | None = None) -> pa.Table:
    """Execute *sql* through the underlying connector and return a pyarrow Table.

    When *limit* is None, no row limit is applied before materialising results.
    """
    return self._connector.query(sql, limit)

def dry_run(self, sql: str) -> None:
Expand All @@ -78,8 +78,10 @@ def __init__(self, data_source: DataSource, connection_info: ConnectionInfo):
self.connection = self.data_source.get_connection(connection_info)

@tracer.start_as_current_span("connector_query", kind=trace.SpanKind.CLIENT)
def query(self, sql: str, limit: int) -> pa.Table:
ibis_table = self.connection.sql(sql).limit(limit)
def query(self, sql: str, limit: int | None = None) -> pa.Table:
    """Run *sql* on the backing ibis connection and return a pyarrow Table.

    A row limit is applied only when *limit* is provided; decimal columns
    are rounded before materialisation.
    """
    expr = self.connection.sql(sql)
    expr = expr if limit is None else expr.limit(limit)
    rounded = round_decimal_columns(expr)
    return rounded.to_pyarrow()

Expand Down Expand Up @@ -120,10 +122,12 @@ def __init__(self, connection_info: ConnectionInfo):
self.connection = DataSource.canner.get_connection(connection_info)

@tracer.start_as_current_span("connector_query", kind=trace.SpanKind.CLIENT)
def query(self, sql: str, limit: int) -> pa.Table:
def query(self, sql: str, limit: int | None = None) -> pa.Table:
# Canner enterprise does not support `CREATE TEMPORARY VIEW` for getting schema
schema = self._get_schema(sql)
ibis_table = self.connection.sql(sql, schema=schema).limit(limit)
ibis_table = self.connection.sql(sql, schema=schema)
if limit is not None:
ibis_table = ibis_table.limit(limit)
ibis_table = round_decimal_columns(ibis_table)
return ibis_table.to_pyarrow()

Expand Down Expand Up @@ -153,7 +157,7 @@ def __init__(self, connection_info: ConnectionInfo):
super().__init__(DataSource.bigquery, connection_info)
self.connection_info = connection_info

def query(self, sql: str, limit: int) -> pa.Table:
def query(self, sql: str, limit: int | None = None) -> pa.Table:
try:
return super().query(sql, limit)
except ValueError as e:
Expand Down Expand Up @@ -299,12 +303,14 @@ def __init__(self, connection_info: RedshiftConnectionUnion):
raise ValueError("Invalid Redshift connection_info type")

@tracer.start_as_current_span("connector_query", kind=trace.SpanKind.CLIENT)
def query(self, sql: str, limit: int) -> pa.Table:
def query(self, sql: str, limit: int | None = None) -> pa.Table:
with closing(self.connection.cursor()) as cursor:
cursor.execute(sql)
cols = [desc[0] for desc in cursor.description]
rows = cursor.fetchall()
df = pd.DataFrame(rows, columns=cols).head(limit)
df = pd.DataFrame(rows, columns=cols)
if limit is not None:
df = df.head(limit)
return pa.Table.from_pandas(df)

@tracer.start_as_current_span("connector_dry_run", kind=trace.SpanKind.CLIENT)
Expand Down
43 changes: 43 additions & 0 deletions ibis-server/app/model/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
CannerConnectionInfo,
ClickHouseConnectionInfo,
ConnectionInfo,
GcsFileConnectionInfo,
LocalFileConnectionInfo,
MinioFileConnectionInfo,
MSSqlConnectionInfo,
MySqlConnectionInfo,
OracleConnectionInfo,
Expand All @@ -35,6 +38,8 @@
QueryS3FileDTO,
QuerySnowflakeDTO,
QueryTrinoDTO,
RedshiftConnectionInfo,
S3FileConnectionInfo,
SnowflakeConnectionInfo,
SSLMode,
TrinoConnectionInfo,
Expand Down Expand Up @@ -70,6 +75,44 @@ def get_dto_type(self):
except KeyError:
raise NotImplementedError(f"Unsupported data source: {self}")

def get_connection_info(self, data: dict) -> ConnectionInfo:
"""Build a ConnectionInfo object from the provided data."""
match self:
case DataSource.athena:
return AthenaConnectionInfo.model_validate(data)
case DataSource.bigquery:
return BigQueryConnectionInfo.model_validate(data)
case DataSource.canner:
return CannerConnectionInfo.model_validate(data)
case DataSource.clickhouse:
return ClickHouseConnectionInfo.model_validate(data)
case DataSource.mssql:
return MSSqlConnectionInfo.model_validate(data)
case DataSource.mysql:
return MySqlConnectionInfo.model_validate(data)
case DataSource.oracle:
return OracleConnectionInfo.model_validate(data)
case DataSource.postgres:
return PostgresConnectionInfo.model_validate(data)
case DataSource.redshift:
if "redshift_type" in data and data["redshift_type"] == "redshift_iam":
return RedshiftConnectionInfo.model_validate(data)
return RedshiftConnectionInfo.model_validate(data)
case DataSource.snowflake:
return SnowflakeConnectionInfo.model_validate(data)
case DataSource.trino:
return TrinoConnectionInfo.model_validate(data)
case DataSource.local_file:
return LocalFileConnectionInfo.model_validate(data)
case DataSource.s3_file:
return S3FileConnectionInfo.model_validate(data)
case DataSource.minio_file:
return MinioFileConnectionInfo.model_validate(data)
case DataSource.gcs_file:
return GcsFileConnectionInfo.model_validate(data)
case _:
raise NotImplementedError(f"Unsupported data source: {self}")


class DataSourceExtension(Enum):
athena = QueryAthenaDTO
Expand Down
Loading