104 changes: 104 additions & 0 deletions .github/workflows/ibis-ci-spark.yml
@@ -0,0 +1,104 @@
name: ibis CI (spark)
permissions:
  contents: read
  pull-requests: write

on:
  pull_request:

concurrency:
  group: ${{ github.workflow }}-${{ github.event.number }}
  cancel-in-progress: true

defaults:
  run:
    working-directory: ibis-server

jobs:
  ci:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Ruff check
        uses: chartboost/ruff-action@v1
        with:
          src: './ibis-server'
          args: 'format --check'

      - uses: actions/setup-java@v4
        with:
          distribution: 'temurin'
          java-version: '21'
          cache: 'maven'

      - name: Start Wren JAVA engine
        working-directory: ./wren-core-legacy
        run: |
          mkdir etc
          echo "node.environment=production" >> etc/config.properties
          echo "wren.directory=./etc/mdl" >> etc/config.properties
          echo "wren.experimental-enable-dynamic-fields=true" >> etc/config.properties
          ./mvnw clean install -B -DskipTests -P exec-jar
          java -Dconfig=etc/config.properties \
            --add-opens=java.base/java.nio=ALL-UNNAMED \
            -jar ./wren-server/target/wren-server-*-executable.jar &

      - name: Start Spark cluster
        working-directory: ./ibis-server/tests/routers/v3/connector/spark
        run: |
          docker compose up -d --build
          echo "Waiting for Spark Connect to be ready..."
          timeout 180 bash -c 'until docker logs spark-connect 2>&1 | grep -q "Spark Connect server started"; do echo "Waiting..."; sleep 5; done' || (echo "Timeout waiting for Spark"; docker compose logs; exit 1)
          echo "Spark Connect is ready!"

      - name: Verify Spark cluster status
        run: |
          docker ps
          echo "Checking Spark Connect logs:"
          docker logs spark-connect --tail 50

      - name: Install poetry
        run: pipx install poetry

      - uses: actions/setup-python@v5
        with:
          python-version-file: ./ibis-server/pyproject.toml
          cache: 'poetry'

      - uses: extractions/setup-just@v2

      - name: Cache Cargo
        uses: actions/cache@v4
        with:
          path: |
            ~/.cargo/bin/
            ~/.cargo/registry/index/
            ~/.cargo/registry/cache/
            ~/.cargo/git/db/
            wren-core-py/target/
          key: ${{ runner.os }}-cargo-${{ hashFiles('wren-core-py/Cargo.lock') }}

      - name: Install dependencies
        run: |
          just install --with dev

      - name: Run Spark tests
        env:
          WREN_ENGINE_ENDPOINT: http://localhost:8080
        run: poetry run pytest -m "spark" -v

      - name: Show Spark logs on failure
        if: failure()
        run: |
          echo "=== Spark Connect logs ==="
          docker logs spark-connect
          echo "=== Spark Master logs ==="
          docker logs spark-master
          echo "=== Spark Worker logs ==="
          docker logs spark-worker

      - name: Cleanup Spark cluster
        if: always()
        working-directory: ./ibis-server/tests/routers/v3/connector/spark
        run: docker compose down -v
41 changes: 41 additions & 0 deletions ibis-server/README.md
@@ -74,6 +74,47 @@ Run the server
just run
```


### Running Spark Tests Locally
Spark-related tests require a local Spark cluster with Spark Connect enabled.
The GitHub CI workflow handles this automatically, but when running the tests locally you must start Spark yourself.

Prerequisites:

- Docker & Docker Compose
- Python dependencies installed (`just install`)
- Java engine running (same as for the other integration tests; see the sketch below)
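
If the Java engine is not already running, the CI workflow above starts it roughly as follows. Treat this as a sketch mirroring the workflow, run from the repository's `wren-core-legacy` directory; adjust paths and config to your setup:
```bash
cd wren-core-legacy
mkdir -p etc
echo "node.environment=production" >> etc/config.properties
echo "wren.directory=./etc/mdl" >> etc/config.properties
echo "wren.experimental-enable-dynamic-fields=true" >> etc/config.properties
# Build the executable jar once, then run the server in the background
./mvnw clean install -B -DskipTests -P exec-jar
java -Dconfig=etc/config.properties \
  --add-opens=java.base/java.nio=ALL-UNNAMED \
  -jar ./wren-server/target/wren-server-*-executable.jar &
```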

1. Start the Spark Cluster
From the `ibis-server` directory:
```bash
cd tests/routers/v3/connector/spark
docker compose up -d --build
```
Wait until Spark Connect is ready. You should see this in the logs:
```bash
docker logs spark-connect
# Spark Connect server started
```
- Spark Master: `spark://localhost:7077`
- Spark Connect: `localhost:15002`
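
To sanity-check the Spark Connect endpoint outside the test suite, you can open a short PySpark session against it. This is a sketch that mirrors what the Spark connector does internally; it assumes the default `localhost:15002` mapping, so adjust the port if you changed the compose file:
```python
from pyspark.sql import SparkSession

# Connect to the local Spark Connect server started by docker compose
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
print(spark.sql("SELECT 1 AS ok").toPandas())  # should print a single row with ok = 1
spark.stop()
```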

2. Run Spark Tests
Go back to the `ibis-server` directory and run:
```bash
just test spark
```
⚠️ Spark tests will fail if the Spark cluster is not running.
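
Alternatively, you can mirror the CI job and invoke pytest directly with the `spark` marker. The engine endpoint below matches the CI configuration; adjust it if your Java engine listens elsewhere:
```bash
WREN_ENGINE_ENDPOINT=http://localhost:8080 poetry run pytest -m "spark" -v
```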

3. Stop the Spark Cluster (Cleanup)
After tests finish:
```bash
cd tests/routers/v3/connector/spark
docker compose down -v
```



### Start with Python Interactive Mode
Install the dependencies
```bash
13 changes: 13 additions & 0 deletions ibis-server/app/model/__init__.py
@@ -73,6 +73,10 @@ class QueryDatabricksDTO(QueryDTO):
    connection_info: DatabricksConnectionUnion = connection_info_field


class QuerySparkDTO(QueryDTO):
    connection_info: SparkConnectionInfo = connection_info_field


class QueryTrinoDTO(QueryDTO):
    connection_info: ConnectionUrl | TrinoConnectionInfo = connection_info_field

@@ -447,6 +451,14 @@ class SnowflakeConnectionInfo(BaseConnectionInfo):
    )


class SparkConnectionInfo(BaseConnectionInfo):
    host: SecretStr = Field(
        description="Spark Connect server hostname",
        examples=["localhost", "spark-connect.mycompany.internal"],
    )
    port: SecretStr = Field(description="Spark Connect server port")


class DatabricksTokenConnectionInfo(BaseConnectionInfo):
    databricks_type: Literal["token"] = "token"
    server_hostname: SecretStr = Field(
@@ -629,6 +641,7 @@ class GcsFileConnectionInfo(BaseConnectionInfo):
    | RedshiftConnectionInfo
    | RedshiftIAMConnectionInfo
    | SnowflakeConnectionInfo
    | SparkConnectionInfo
    | DatabricksTokenConnectionInfo
    | TrinoConnectionInfo
    | LocalFileConnectionInfo
57 changes: 57 additions & 0 deletions ibis-server/app/model/connector.py
@@ -41,6 +41,7 @@ class ClickHouseDbError(Exception):
from ibis.expr.types import Table
from loguru import logger
from opentelemetry import trace
from pyspark.sql import SparkSession
from sqlglot import exp, parse_one

from app.model import (
@@ -54,6 +55,7 @@ class ClickHouseDbError(Exception):
    RedshiftConnectionUnion,
    RedshiftIAMConnectionInfo,
    S3FileConnectionInfo,
    SparkConnectionInfo,
)
from app.model.data_source import DataSource
from app.model.error import (
@@ -96,6 +98,8 @@ def __init__(self, data_source: DataSource, connection_info: ConnectionInfo):
            self._connector = RedshiftConnector(connection_info)
        elif data_source == DataSource.postgres:
            self._connector = PostgresConnector(connection_info)
        elif data_source == DataSource.spark:
            self._connector = SparkConnector(connection_info)
        elif data_source == DataSource.databricks:
            self._connector = DatabricksConnector(connection_info)
        else:
@@ -686,6 +690,59 @@ def close(self) -> None:
logger.warning(f"Error closing Redshift connection: {e}")


class SparkConnector(ConnectorABC):
    def __init__(self, connection_info: SparkConnectionInfo):
        self.connection_info = connection_info
        self.connection = self._create_session()
        self._closed = False

    def _create_session(self) -> SparkSession:
        host = self.connection_info.host.get_secret_value()
        port = self.connection_info.port.get_secret_value()

        # Spark Connect endpoint
        endpoint = f"sc://{host}:{port}"

        builder = SparkSession.builder.remote(endpoint).appName("wren-spark-connect")

        return builder.getOrCreate()

    def query(self, sql: str, limit: int | None = None) -> pa.Table:
        df = self.connection.sql(sql).toPandas()
        # SPARK-54068 workaround: Remove non-serializable attrs
        # PyArrow >= 22.0.0 tries to serialize DataFrame.attrs to JSON
        # but PlanMetrics objects aren't JSON serializable
        if hasattr(df, "attrs") and df.attrs:
            df.attrs = {
                k: v
                for k, v in df.attrs.items()
                if k not in ("metrics", "observed_metrics")
            }

        # Convert to Arrow
        arrow_table = pa.Table.from_pandas(df)

        if limit is not None:
            arrow_table = arrow_table.slice(0, limit)

        return arrow_table

    def dry_run(self, sql: str) -> None:
        self.connection.sql(sql).limit(0).count()

    def close(self) -> None:
        if self._closed:
            return

        try:
            # Spark Connect client-side cleanup
            self.connection.stop()
        except Exception:
            pass
        finally:
            self._closed = True


class DatabricksConnector(ConnectorABC):
    def __init__(self, connection_info: DatabricksConnectionUnion):
        if isinstance(connection_info, DatabricksTokenConnectionInfo):
8 changes: 7 additions & 1 deletion ibis-server/app/model/data_source.py
@@ -47,11 +47,13 @@
    QueryRedshiftDTO,
    QueryS3FileDTO,
    QuerySnowflakeDTO,
    QuerySparkDTO,
    QueryTrinoDTO,
    RedshiftConnectionInfo,
    RedshiftIAMConnectionInfo,
    S3FileConnectionInfo,
    SnowflakeConnectionInfo,
    SparkConnectionInfo,
    SSLMode,
    TrinoConnectionInfo,
)
@@ -76,6 +78,7 @@ class DataSource(StrEnum):
    s3_file = auto()
    minio_file = auto()
    gcs_file = auto()
    spark = auto()
    databricks = auto()

    def get_connection(self, info: ConnectionInfo) -> BaseBackend:
@@ -184,6 +187,8 @@ def _build_connection_info(self, data: dict) -> ConnectionInfo:
                return MinioFileConnectionInfo.model_validate(data)
            case DataSource.gcs_file:
                return GcsFileConnectionInfo.model_validate(data)
            case DataSource.spark:
                return SparkConnectionInfo.model_validate(data)
            case DataSource.databricks:
                if (
                    "databricks_type" in data
@@ -241,6 +246,7 @@ class DataSourceExtension(Enum):
    minio_file = QueryMinioFileDTO
    gcs_file = QueryGcsFileDTO
    databricks = QueryDatabricksDTO
    spark = QuerySparkDTO

    def __init__(self, dto: QueryDTO):
        self.dto = dto
@@ -250,7 +256,7 @@ def get_connection(self, info: ConnectionInfo) -> BaseBackend:
        if hasattr(info, "connection_url"):
            kwargs = info.kwargs if info.kwargs else {}
            return ibis.connect(info.connection_url.get_secret_value(), **kwargs)
        if self.name in {"local_file", "redshift"}:
        if self.name in {"local_file", "redshift", "spark"}:
            raise NotImplementedError(
                f"{self.name} connection is not implemented to get ibis backend"
)
Expand Down
2 changes: 2 additions & 0 deletions ibis-server/app/model/metadata/factory.py
@@ -18,6 +18,7 @@
from app.model.metadata.postgres import PostgresMetadata
from app.model.metadata.redshift import RedshiftMetadata
from app.model.metadata.snowflake import SnowflakeMetadata
from app.model.metadata.spark import SparkMetadata
from app.model.metadata.trino import TrinoMetadata

mapping = {
@@ -37,6 +38,7 @@
    DataSource.minio_file: MinioFileMetadata,
    DataSource.gcs_file: GcsFileMetadata,
    DataSource.databricks: DatabricksMetadata,
    DataSource.spark: SparkMetadata,
}

