feat(ibis): Add Spark connector #1398
Conversation
📝 Walkthrough

Adds first-class Apache Spark SQL support.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    participant Client
    participant API as ibis-server API
    participant Connector as SparkConnector
    participant Spark as Spark Connect
    participant Arrow as PyArrow/Pandas
    Client->>API: POST /v2/connector/spark/query (manifest, sql, connectionInfo)
    API->>Connector: execute(sql, limit?, dry_run?)
    Connector->>Spark: session.sql(query).collect()/to_pandas()
    Spark-->>Connector: DataFrame
    alt dry_run
        Connector-->>API: 204 No Content
    else normal
        Connector->>Arrow: df.to_arrow()/to_pandas()
        Arrow-->>API: serialized result (columns, dtypes, rows)
        API-->>Client: 200 OK with results
    end
```

```mermaid
sequenceDiagram
    autonumber
    participant Client
    participant API as ibis-server API
    participant Metadata as SparkMetadata
    participant Spark as Spark Connect
    Client->>API: POST /v2/connector/spark/metadata/tables (connectionInfo)
    API->>Metadata: get_table_list()
    loop per catalog/schema/table
        Metadata->>Spark: SHOW TABLES IN schema
        Spark-->>Metadata: table rows
        Metadata->>Spark: DESCRIBE TABLE schema.table
        Spark-->>Metadata: column rows (name,type,comment)
        Metadata->>Metadata: map types via SPARK_TYPE_MAPPING
    end
    Metadata-->>API: JSON list[Table]
    API-->>Client: 200 OK with metadata
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related issues
Possibly related PRs
Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Actionable comments posted: 9
Fix all issues with AI Agents 🤖
In @ibis-server/app/model/connector.py:
- Line 44: Top-level import of SparkSession causes import errors when PySpark is
absent; move the import into the SparkConnector class (e.g., inside
SparkConnector.__init__ or the methods that use it) or wrap it in a try/except
exactly like clickhouse_connect handling, and add a runtime check in
SparkConnector.__init__ that raises a clear error if SparkSession cannot be
imported; update references to SparkSession in methods to use the locally
imported symbol so importing this module no longer fails when PySpark is not
installed.
- Around line 730-731: The dry_run method currently builds a lazy Spark query
via self.connection.sql(sql).limit(0) but never triggers execution/validation;
update dry_run to execute a terminal operation (for example call .collect() or
.count() on the result of self.connection.sql(sql).limit(0)) so Spark Connect
will validate table/column references and surface SQL errors immediately (use
the returned DataFrame from self.connection.sql(sql).limit(0) and invoke a
terminal action like collect() or count()).
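The lazy-evaluation pitfall behind the `dry_run` item can be demonstrated without a live cluster, using a stub in place of a Spark Connect session (only `sql`, `limit`, and `count` mirror the real DataFrame API; the stub itself is illustrative):

```python
# Stub session illustrating why `sql(...).limit(0)` alone validates nothing:
# transformations are lazy, and only a terminal action forces execution.
class FakeDataFrame:
    def __init__(self, log):
        self._log = log

    def limit(self, n):
        return self  # lazy transformation: nothing is sent to the server

    def count(self):
        self._log.append("executed")  # terminal action: plan is analyzed/run
        return 0


class FakeSession:
    def __init__(self):
        self.log = []

    def sql(self, query):
        return FakeDataFrame(self.log)  # also lazy in Spark Connect


session = FakeSession()
df = session.sql("SELECT * FROM missing_table").limit(0)
assert session.log == []  # no validation has happened yet
df.count()  # a terminal action like count() or collect() triggers it
assert session.log == ["executed"]
```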
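The guarded-import approach described in the first item above can be sketched with a generic helper; `require_module` is a hypothetical name for illustration, not something in the ibis-server codebase:

```python
import importlib


def require_module(name: str, hint: str):
    """Import a module lazily, raising a clear error if it is not installed."""
    try:
        return importlib.import_module(name)
    except ImportError as e:
        raise ImportError(f"{name} is required for this connector; {hint}") from e


# Hypothetical usage inside SparkConnector.__init__, so that merely importing
# connector.py never fails when PySpark is absent:
#   pyspark_sql = require_module("pyspark.sql", "install with `pip install pyspark`")
#   SparkSession = pyspark_sql.SparkSession
```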
In @ibis-server/app/model/metadata/spark.py:
- Around line 67-71: The DESCRIBE SQL is built by interpolating raw database and
table_name values which can lead to injection or syntax errors; update the code
that constructs columns_sql in spark.py (around the columns_sql / columns_table
usage) to safely quote identifiers by wrapping database and table_name in
backticks and escaping any backticks inside the names (replace ` with double
backticks) before interpolation so the generated query becomes DESCRIBE
`db`.`table` with properly escaped identifiers.
- Around line 145-147: The code treats the result of self.connector.query(sql)
as if it had to_dict(); instead convert the returned PyArrow Table to a pandas
DataFrame before calling to_dict, mirroring get_table_list(): replace df =
self.connector.query(sql) / res = df.to_dict(...) with temp =
self.connector.query(sql); df = temp.to_pandas(); res =
df.to_dict(orient="records") so the PyArrow Table is converted properly before
serialization.
- Around line 46-49: The Connector is being constructed with the string "spark"
but Connector.__init__ expects a DataSource enum; update SparkMetadata.__init__
to pass the appropriate enum member (e.g., DataSource.SPARK) instead of the
literal string and add the necessary import for DataSource at the top of the
module so the call becomes Connector(DataSource.SPARK, connection_info).
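The identifier-escaping rule in the DESCRIBE item above can be isolated as a pure function; `quote_identifier` and `describe_sql` are hypothetical helper names used for illustration:

```python
def quote_identifier(name: str) -> str:
    """Escape embedded backticks by doubling them, then wrap in backticks."""
    return "`" + name.replace("`", "``") + "`"


def describe_sql(database: str, table_name: str) -> str:
    """Build a DESCRIBE statement with safely quoted identifiers."""
    return f"DESCRIBE {quote_identifier(database)}.{quote_identifier(table_name)}"


print(describe_sql("db", "my`table"))  # DESCRIBE `db`.`my``table`
```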
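The serialization shape targeted by the `to_pandas()` item can be illustrated with pandas alone; the DataFrame below is a made-up stand-in for the result of `self.connector.query(sql).to_pandas()`, with illustrative column names:

```python
import pandas as pd

# Stand-in for the PyArrow Table converted via .to_pandas():
df = pd.DataFrame(
    {"constraint_name": ["pk_orders"], "table_name": ["orders"]}
)

# to_dict(orient="records") yields one dict per row, ready for JSON output.
res = df.to_dict(orient="records")
print(res)  # [{'constraint_name': 'pk_orders', 'table_name': 'orders'}]
```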
In @ibis-server/tests/routers/v3/connector/spark/conftest.py:
- Around line 73-77: Remove the unused pytest fixture named connection_url (the
function def connection_url(connection_info: dict[str, str]) that returns
f"sc://{info['host']}:{info['port']}") from the conftest module; delete the
entire fixture definition and any related unused references (e.g.,
connection_url) so tests no longer include an unused fixture dependency, and run
the test suite to confirm nothing else depends on connection_url.
- Around line 34-41: customer_pdf is loaded and sanitized but never used; either
remove the unused load or actually register it for tests. Fix by either deleting
the customer_pdf read_parquet and its attrs-cleaning lines (removing dead code),
or convert customer_pdf to a Spark DataFrame and persist/register it the same
way orders_pdf is handled (e.g., mirror the transformation used for orders_pdf
and call the same table/save routine). Update references to
file_path("resource/tpch/data/customer.parquet") and the variable customer_pdf
accordingly so there is no unused variable left.
In @ibis-server/tests/routers/v3/connector/spark/test_functions.py:
- Around line 64-69: Remove the redundant autouse fixture definition
set_remote_function_list_path from this test file because the same fixture is
already provided in conftest.py; locate and delete the local function named
set_remote_function_list_path (the pytest.fixture block that sets and yields
config.set_remote_function_list_path) so tests rely on the shared conftest.py
fixture and avoid conflicting autouse behavior.
In @ibis-server/tests/routers/v3/connector/spark/test_metadata.py:
- Line 1: The test defines v2_base_url = "/v2/connector/spark" which conflicts
with conftest.py's base_url = "/v3/connector/spark"; update the test to use the
v3 endpoint or import and reuse the shared base_url from conftest.py.
Specifically, replace v2_base_url with the correct "/v3/connector/spark" (or
remove the local definition and import base_url from conftest) so
functions/tests referencing v2_base_url (symbol: v2_base_url) use the consistent
v3 base URL used by conftest (symbol: base_url).
🧹 Nitpick comments (10)
.github/workflows/ibis-ci-spark.yml (1)
35-45: Consider adding a readiness check for the Wren engine.

The Wren JAVA engine is started in the background, but there's no verification that it's ready before the tests begin. This could lead to race conditions where tests start before the engine is fully initialized.
🔎 Suggested readiness check
```diff
       - name: Start Wren JAVA engine
         working-directory: ./wren-core-legacy
         run: |
           mkdir etc
           echo "node.environment=production" >> etc/config.properties
           echo "wren.directory=./etc/mdl" >> etc/config.properties
           echo "wren.experimental-enable-dynamic-fields=true" >> etc/config.properties
           ./mvnw clean install -B -DskipTests -P exec-jar
           java -Dconfig=etc/config.properties \
             --add-opens=java.base/java.nio=ALL-UNNAMED \
             -jar ./wren-server/target/wren-server-*-executable.jar &
+          echo "Waiting for Wren engine to be ready..."
+          timeout 60 bash -c 'until curl -sf http://localhost:8080/v1/health > /dev/null 2>&1; do echo "Waiting..."; sleep 2; done' || (echo "Timeout waiting for Wren engine"; exit 1)
+          echo "Wren engine is ready!"
```

Note: Adjust the health check endpoint path if different from `/v1/health`.

ibis-server/tests/routers/v3/connector/spark/Dockerfile (1)
6-8: Consider reducing the overly permissive 777 permissions.

The `chmod -R 777` grants read, write, and execute permissions to all users. Even in a test environment, this is unnecessarily permissive and could mask potential permission-related issues.

🔎 Suggested permission adjustment

```diff
 # Create warehouse and work directories with proper permissions
 RUN mkdir -p /opt/spark/work/warehouse && \
     chown -R spark:spark /opt/spark/work && \
-    chmod -R 777 /opt/spark/work
+    chmod -R 775 /opt/spark/work
```

This maintains group write access while removing world write permissions, which should still be sufficient for the test environment.
ibis-server/tests/routers/v3/connector/spark/docker-compose.yml (1)
40-40: Consider documenting the Spark version choice.

The Spark Connect package versions (`spark-connect_2.12:3.5.7` and `derby:10.14.2.0`) are hard-coded. While this ensures reproducibility, consider adding a comment explaining the version choice or documenting it in the project's setup guide, especially if there are compatibility constraints with the PyArrow workaround mentioned in the PR description.

ibis-server/app/model/connector.py (1)
733-743: Improve exception handling in close method.

The close method uses a bare `except Exception` that suppresses all errors. While this prevents crashes during cleanup, it may hide legitimate issues. Consider logging the exception or at least being more specific about which exceptions to suppress.

🔎 Proposed improvement

```diff
 def close(self) -> None:
     if self._closed:
         return
     try:
         # Spark Connect client-side cleanup
         self.connection.stop()
-    except Exception:
-        pass
+    except Exception as e:
+        logger.warning(f"Error closing Spark connection: {e}")
     finally:
         self._closed = True
```

ibis-server/tests/routers/v3/connector/spark/test_query.py (1)
54-56: Consider adding pytest marker for Spark tests.

If the test framework uses pytest markers to conditionally run tests (e.g., `@pytest.mark.spark`), you may want to add them to these test functions to allow selective execution when Spark infrastructure is available.

ibis-server/tests/routers/v3/connector/spark/test_metadata.py (1)
69-72: Hardcoded Spark version makes test fragile.

The assertion checks for `"3.5.7"`, which couples the test to a specific Spark version. When the Docker image or CI environment is updated, this test will fail. Consider asserting on the presence of "spark" (case-insensitive) or a version pattern instead.

🔎 Proposed fix

```diff
 # Verify it contains "Spark" in the version string
 version_data = response.json()
 version_str = version_data if isinstance(version_data, str) else str(version_data)
-assert "3.5.7" in version_str.lower()
+# Check for Spark version pattern (e.g., "3.x.x") rather than exact version
+assert "spark" in version_str.lower() or any(
+    c.isdigit() for c in version_str
+), f"Expected version string, got: {version_str}"
```

ibis-server/tests/routers/v3/connector/spark/test_functions.py (1)
72-100: Test manually modifies config that autouse fixture also manages.

This test calls `config.set_remote_function_list_path(None)` and `config.set_remote_function_list_path(function_list_path)` while the autouse fixture in conftest.py is also managing this state. This creates a race between the test's explicit calls and the fixture's setup/teardown. Consider disabling the autouse fixture for this specific test or documenting the intentional override.

One approach is to use `pytest.mark.usefixtures` to explicitly control fixture usage or restructure to avoid the conflict.

ibis-server/tests/routers/v3/connector/spark/conftest.py (2)
52-59: Redundant exception handling in cleanup.

The `except Exception as e: raise e` pattern is redundant: it catches an exception only to immediately re-raise it. The `finally` block will execute regardless.

🔎 Proposed fix

```diff
 def cleanup():
     # Drop test tables
-    try:
-        spark.sql("DROP TABLE IF EXISTS default.orders")
-    except Exception as e:
-        raise e
-    finally:
-        spark.stop()
+    try:
+        spark.sql("DROP TABLE IF EXISTS default.orders")
+    finally:
+        spark.stop()
```
65-70: Fixture parameter used only for ordering dependency.

The `spark_connect` parameter ensures the Spark session is established before returning connection info. This is a valid pattern, but a brief comment would clarify the intent.

```diff
 @pytest.fixture(scope="module")
-def connection_info(spark_connect: SparkSession) -> dict[str, str]:
+def connection_info(spark_connect: SparkSession) -> dict[str, str]:  # spark_connect ensures session is ready
     return {
```

ibis-server/app/model/metadata/spark.py (1)
177-179: Missing empty result check in get_version.

If the query returns an empty result (unlikely but possible), `df.column(0)[0]` will raise an IndexError. Consider adding a guard.

🔎 Proposed fix

```diff
 def get_version(self) -> str:
     df = self.connector.query("SELECT version()")
-    return df.column(0)[0].as_py()
+    if df.num_rows == 0:
+        return "unknown"
+    return df.column(0)[0].as_py()
```
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (2)
- `ibis-server/poetry.lock` is excluded by `!**/*.lock`
- `ibis-server/resources/function_list/spark.csv` is excluded by `!**/*.csv`
📒 Files selected for processing (16)
- .github/workflows/ibis-ci-spark.yml
- ibis-server/app/model/__init__.py
- ibis-server/app/model/connector.py
- ibis-server/app/model/data_source.py
- ibis-server/app/model/metadata/factory.py
- ibis-server/app/model/metadata/spark.py
- ibis-server/pyproject.toml
- ibis-server/tests/routers/v3/connector/spark/Dockerfile
- ibis-server/tests/routers/v3/connector/spark/__init__.py
- ibis-server/tests/routers/v3/connector/spark/conftest.py
- ibis-server/tests/routers/v3/connector/spark/docker-compose.yml
- ibis-server/tests/routers/v3/connector/spark/test_functions.py
- ibis-server/tests/routers/v3/connector/spark/test_metadata.py
- ibis-server/tests/routers/v3/connector/spark/test_query.py
- wren-core-base/manifest-macro/src/lib.rs
- wren-core-base/src/mdl/manifest.rs
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-08-29T05:49:45.513Z
Learnt from: goldmedal
Repo: Canner/wren-engine PR: 1301
File: ibis-server/pyproject.toml:83-83
Timestamp: 2025-08-29T05:49:45.513Z
Learning: Polars dependency in ibis-server/pyproject.toml is correctly placed in dev dependencies because it's only used by helper tools in the tools/data_source/ directory, not in the main production application runtime.
Applied to files:
ibis-server/pyproject.toml
🧬 Code graph analysis (7)
ibis-server/tests/routers/v3/connector/spark/test_query.py (4)
- wren-core-base/manifest-macro/src/lib.rs: `manifest` (26-56)
- ibis-server/tests/routers/v3/connector/spark/test_functions.py: `manifest_str` (60-61)
- ibis-server/tests/conftest.py: `client` (18-23)
- ibis-server/tests/routers/v3/connector/spark/conftest.py: `connection_info` (66-70)

ibis-server/app/model/metadata/factory.py (2)
- ibis-server/app/model/metadata/spark.py: `SparkMetadata` (46-218)
- ibis-server/app/model/data_source.py: `DataSource` (65-229)

wren-core-base/src/mdl/manifest.rs (1)
- ibis-server/app/model/data_source.py: `DataSource` (65-229)

ibis-server/app/model/connector.py (2)
- ibis-server/app/model/__init__.py: `SparkConnectionInfo` (454-459)
- ibis-server/app/model/data_source.py: `DataSource` (65-229)

ibis-server/app/model/metadata/spark.py (3)
- ibis-server/app/model/connector.py: `Connector` (81-185), `query` (108-141, 190-191, 209-214, 330-335, 452-459, 538-539, 573-581, 670-678, 710-728, 774-783)
- ibis-server/app/model/metadata/dto.py: `Constraint` (125-131), `ConstraintType` (119-122), `RustWrenEngineColumnType` (39-84), `Table` (106-111), `TableProperties` (96-103)
- wren-core-base/src/mdl/manifest.rs: `description` (125-127)

ibis-server/app/model/data_source.py (1)
- ibis-server/app/model/__init__.py: `QuerySparkDTO` (76-77), `SparkConnectionInfo` (454-459)

ibis-server/tests/routers/v3/connector/spark/test_functions.py (4)
- ibis-server/app/config.py: `get_config` (103-104)
- ibis-server/tests/conftest.py: `file_path` (10-11), `client` (18-23)
- ibis-server/tests/routers/v3/connector/spark/test_query.py: `manifest_str` (55-56)
- ibis-server/tests/routers/v3/connector/spark/conftest.py: `set_remote_function_list_path` (81-85), `connection_info` (66-70)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: ci
- GitHub Check: ci
🔇 Additional comments (24)
ibis-server/pyproject.toml (2)
106-106: LGTM! The spark marker is correctly added, following the established pattern for other data source markers.

54-54: pyspark 3.5.7 is valid and secure.

Version 3.5.7 exists on PyPI and is a maintenance release containing security and correctness fixes. No direct vulnerabilities are listed for this version on public security scanners.
wren-core-base/src/mdl/manifest.rs (1)
151-151: LGTM! The Spark variant is correctly integrated into both Display and FromStr implementations, following the exact pattern of existing data sources with case-insensitive parsing and uppercase display format.
Also applies to: 179-179
.github/workflows/ibis-ci-spark.yml (3)
47-53: LGTM! The Spark cluster startup includes proper timeout handling, readiness verification with log checks, and status verification steps. The 180-second timeout is reasonable for cluster initialization.
Also applies to: 55-59
86-99: LGTM! The test execution and failure handling are well-implemented. The WREN_ENGINE_ENDPOINT is correctly passed, and comprehensive log collection on failure provides good debugging support.

101-104: LGTM! The cleanup step correctly uses `if: always()` to ensure the Spark cluster is torn down regardless of test outcome, preventing resource leaks in the CI environment.

ibis-server/app/model/metadata/factory.py (1)
21-21: LGTM! `SparkMetadata` is correctly imported and mapped for `DataSource.spark`, following the established pattern used for all other data sources.
Also applies to: 41-41
ibis-server/tests/routers/v3/connector/spark/Dockerfile (2)
1-1: LGTM! The base image version 3.5.7 correctly matches the pyspark dependency version in pyproject.toml, ensuring compatibility between the client library and the Spark server.

3-3: LGTM! Switching to root to create directories with proper ownership, then switching back to the spark user follows Docker security best practices for running containers as non-root.
Also applies to: 10-10
ibis-server/app/model/__init__.py (1)
75-78: LGTM! Well-structured Spark connection models.

The new `QuerySparkDTO` and `SparkConnectionInfo` classes follow the established patterns for other data sources. The connection info properly uses `SecretStr` for sensitive fields and includes helpful descriptions.

Also applies to: 454-460, 644-644
wren-core-base/manifest-macro/src/lib.rs (1)
113-114: LGTM! Spark variant properly added to DataSource enum.

The new `Spark` variant follows the established pattern with appropriate serde alias configuration.

ibis-server/app/model/connector.py (1)
710-728: LGTM! SPARK-54068 workaround is properly implemented.

The workaround for the PyArrow ≥22.0.0 serialization issue correctly filters out non-serializable `metrics` and `observed_metrics` attributes before converting to Arrow format. The implementation aligns with the known issue described in the PR objectives.

ibis-server/tests/routers/v3/connector/spark/test_query.py (1)
59-179: LGTM! Comprehensive test coverage for Spark query endpoint.

The test suite thoroughly covers:
- Successful query execution with data validation
- Limit parameter behavior
- Error handling for invalid manifests and missing fields
- Dry-run mode for query validation
- Invalid SQL detection in dry-run mode
All assertions properly validate response structure and status codes.
ibis-server/tests/routers/v3/connector/spark/test_metadata.py (2)
4-35: LGTM! The test comprehensively validates the table metadata structure including properties, columns, and type information.

37-58: LGTM! The test correctly handles both cases where constraints are present or absent, which aligns with the documented Spark/Hive limited constraint support.
ibis-server/app/model/data_source.py (5)
50-56: LGTM! The imports for `QuerySparkDTO` and `SparkConnectionInfo` are correctly added alongside other data source types.

81-81: LGTM! The `spark` enum value is correctly added to the `DataSource` enum.

190-191: LGTM! The Spark case correctly delegates to `SparkConnectionInfo.model_validate(data)`, following the established pattern for other data sources.

249-249: LGTM! `spark` is correctly mapped to `QuerySparkDTO` in `DataSourceExtension`.

259-262: LGTM! Spark is correctly added to the set of data sources that don't use the ibis backend directly, consistent with the `SparkConnector` implementation in `connector.py`.

ibis-server/tests/routers/v3/connector/spark/test_functions.py (1)
103-136: LGTM! The scalar and aggregate function tests are well-structured with clear assertions on the expected response format.
ibis-server/tests/routers/v3/connector/spark/conftest.py (1)
25-31: LGTM! The Spark session setup with remote connection is correctly configured for the test environment.
ibis-server/app/model/metadata/spark.py (2)
17-43: LGTM! The type mapping is comprehensive and covers Spark's core data types with sensible mappings. The mapping of complex types (`map`, `struct`) to JSON is a reasonable approximation.

195-217: LGTM! The type transformation correctly handles parameterized types by extracting the base type before lookup. The warning for unknown types is helpful for debugging.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In @ibis-server/tests/routers/v3/connector/spark/conftest.py:
- Around line 34-41: Remove the dead code that loads and sanitizes customer_pdf:
delete the lines that assign customer_pdf = pd.read_parquet(...) and the
conditional block that clears customer_pdf.attrs; leave the orders_pdf loading
and its SPARK-54068 workaround intact so only the used DataFrame (orders_pdf) is
kept.
🧹 Nitpick comments (2)
ibis-server/tests/routers/v3/connector/spark/conftest.py (2)
52-59: Simplify exception handling in cleanup.

The try-except-raise pattern adds no value since the exception is immediately re-raised. The `finally` block will execute regardless, so the except block is unnecessary.

🔎 Proposed fix

```diff
 def cleanup():
     # Drop test tables
     try:
         spark.sql("DROP TABLE IF EXISTS default.orders")
-    except Exception as e:
-        raise e
     finally:
         spark.stop()
```
65-70: Consider clarifying the dependency on `spark_connect`.

The `spark_connect` parameter ensures the Spark session is initialized before this fixture runs, but it's not actually used in the function body. While this is a valid pytest pattern for ordering fixtures, it might be clearer to either:

- Add a comment explaining the dependency is for ordering only, or
- Use `request.getfixturevalue('spark_connect')` if explicit ordering is needed
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- ibis-server/app/model/connector.py
- ibis-server/app/model/metadata/spark.py
- ibis-server/tests/routers/v3/connector/spark/conftest.py
🧰 Additional context used
🧠 Learnings (4)
📚 Learning: 2025-01-07T03:56:21.741Z
Learnt from: goldmedal
Repo: Canner/wren-engine PR: 1029
File: ibis-server/app/model/metadata/object_storage.py:44-44
Timestamp: 2025-01-07T03:56:21.741Z
Learning: When working with DuckDB in Python, use `conn.execute("DESCRIBE SELECT * FROM table").fetchall()` to get column types instead of accessing DataFrame-style attributes like `dtype` or `dtypes`.
Applied to files:
ibis-server/app/model/metadata/spark.py
📚 Learning: 2025-06-18T02:23:34.040Z
Learnt from: goldmedal
Repo: Canner/wren-engine PR: 1224
File: ibis-server/app/util.py:49-56
Timestamp: 2025-06-18T02:23:34.040Z
Learning: DuckDB supports querying PyArrow Tables directly in SQL queries without needing to register them. When a pa.Table object is referenced in a FROM clause (e.g., "SELECT ... FROM df" where df is a pa.Table), DuckDB automatically handles the PyArrow object without requiring conn.register().
Applied to files:
ibis-server/app/model/metadata/spark.py
📚 Learning: 2025-06-18T02:23:34.040Z
Learnt from: goldmedal
Repo: Canner/wren-engine PR: 1224
File: ibis-server/app/util.py:49-56
Timestamp: 2025-06-18T02:23:34.040Z
Learning: DuckDB supports querying PyArrow Tables directly in SQL queries without needing to register them. When a pa.Table object is referenced in a FROM clause (e.g., "SELECT ... FROM df" where df is a pa.Table), DuckDB automatically handles the PyArrow object via its "replacement scan" mechanism that recognizes Python variables referencing Arrow objects as SQL tables. No conn.register() call is required.
Applied to files:
ibis-server/app/model/metadata/spark.py
📚 Learning: 2025-06-18T02:12:43.570Z
Learnt from: goldmedal
Repo: Canner/wren-engine PR: 1224
File: ibis-server/app/util.py:50-57
Timestamp: 2025-06-18T02:12:43.570Z
Learning: In the `to_json` function in `ibis-server/app/util.py`, the code intentionally uses `fetch_df()` to get a pandas DataFrame and then calls `to_dict(orient='split')` because this specific format is required for `orjson` serialization. The pandas conversion step is necessary to generate the correct dictionary structure for orjson.
Applied to files:
ibis-server/app/model/metadata/spark.py
🧬 Code graph analysis (2)
ibis-server/app/model/connector.py (3)
- ibis-server/app/model/__init__.py: `SparkConnectionInfo` (454-459)
- ibis-server/app/model/data_source.py: `DataSource` (65-229)
- ibis-server/tests/routers/v3/connector/spark/conftest.py: `connection_info` (66-70)

ibis-server/app/model/metadata/spark.py (4)
- ibis-server/app/model/__init__.py: `SparkConnectionInfo` (454-459)
- ibis-server/app/model/data_source.py: `DataSource` (65-229)
- ibis-server/app/model/metadata/dto.py: `Constraint` (125-131), `ConstraintType` (119-122), `RustWrenEngineColumnType` (39-84), `Table` (106-111), `TableProperties` (96-103)
- ibis-server/app/model/metadata/metadata.py: `Metadata` (6-24)
🔇 Additional comments (12)
ibis-server/tests/routers/v3/connector/spark/conftest.py (3)
1-22: LGTM! The imports, module markers, and constants are appropriate for Spark test configuration.

15-19: LGTM! The pytest hook correctly applies the spark marker to all tests in this directory.

73-78: LGTM! The autouse fixture correctly sets up and tears down the remote function list path for tests.
ibis-server/app/model/connector.py (2)
101-102: LGTM! Factory wiring follows established pattern.

The Spark data source integration into the connector factory is consistent with other data sources and correctly instantiates `SparkConnector`.
693-743: Well-structured Spark Connect implementation.

The `SparkConnector` class properly:

- Establishes Spark Connect sessions with the correct endpoint format (`sc://host:port`)
- Implements the SPARK-54068 workaround for PyArrow ≥22.0.0 serialization issues
- Executes validation via a `.count()` terminal operation in `dry_run`
- Handles resource cleanup with guarded shutdown
ibis-server/app/model/metadata/spark.py (7)
1-14: LGTM! All required imports are present.

The imports include all necessary components for metadata extraction, type mapping, and constraint handling.

18-44: Comprehensive Spark type mapping.

The `SPARK_TYPE_MAPPING` dictionary covers all standard Spark/Hive data types with appropriate mappings to `RustWrenEngineColumnType`. The choice to map complex types (map, struct) to JSON is reasonable for visualization purposes.

47-50: Correctly uses DataSource enum.

The constructor now properly passes `DataSource.spark` instead of a string literal, resolving the previous issue.
52-112: Robust table discovery implementation.

The `get_table_list` method:
- Properly handles different Spark versions (namespace vs database field)
- Uses backticked identifiers to prevent SQL injection (line 70)
- Filters system schemas appropriately
- Stops column parsing at partition/metadata sections
- Includes comprehensive error handling with logging
114-176: Well-documented constraint retrieval with proper fallbacks.

The `get_constraints` implementation:

- Documents Spark's limited constraint support clearly
- Uses standard information_schema queries
- Correctly converts PyArrow Table via `.to_pandas().to_dict()` (line 148)
- Handles unsupported deployments gracefully with try-except and empty result fallback
178-180: LGTM! Version query uses PyArrow API correctly.

The version retrieval properly accesses the PyArrow Table via `.column(0)[0].as_py()` to extract the Spark version string.
182-219: Well-implemented helper methods.

The helper methods provide:
- Consistent table name formatting
- Generated constraint names from component identifiers
- Type transformation with parameterized type handling and unknown type logging
@douenergy it's better to have a document for how to run the Spark test locally. Currently, I can't run the test by
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In @ibis-server/README.md:
- Around line 77-115: The fenced code blocks in the README.md Spark tests
section lack language specifiers and trigger markdown-lint MD040; update every
triple-backtick block in that section to include a shell/bash identifier (e.g.,
```bash) for the docker compose, docker logs, just test spark, and docker
compose down -v blocks so all code fences have proper language specifiers and
satisfy the linter.
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
- `ibis-server/poetry.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (1)
ibis-server/README.md
🧰 Additional context used
🪛 markdownlint-cli2 (0.18.1)
ibis-server/README.md
90-90: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
95-95: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
104-104: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
111-111: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: ci
- GitHub Check: ci
- GitHub Check: ci
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In @ibis-server/app/model/metadata/spark.py:
- Around line 164-166: The get_version method currently extracts the value via
PyArrow calls (df.column(0)[0].as_py()); change it to use the same
DataFrame-to-pandas flow as other methods by converting df to pandas and
returning the first cell (e.g., use df.to_pandas().iloc[0, 0] or equivalent).
Locate get_version and replace the PyArrow extraction with a .to_pandas() call
on the query result so it matches other methods' behavior.
In @ibis-server/tests/routers/v3/connector/spark/conftest.py:
- Around line 50-57: The cleanup function currently re-raises any exception
caught while dropping test tables, which causes teardown to fail; modify cleanup
(the cleanup function that calls spark.sql and spark.stop) to not re-raise the
exception — instead log the exception (e.g., using the test logger or
logging.exception) and proceed to the finally block where spark.stop() is always
called so teardown never fails the test run.
🧹 Nitpick comments (8)
ibis-server/app/model/metadata/spark.py (5)
17-43: Consider adding array type mapping.

The type mapping covers most common Spark types, but `array` is notably absent. Arrays are common in Spark schemas and should map to a suitable type (possibly `JSON`, similar to `map` and `struct`).

📝 Suggested addition

```diff
     "map": RustWrenEngineColumnType.JSON,
     "struct": RustWrenEngineColumnType.JSON,
+    "array": RustWrenEngineColumnType.JSON,
     "interval": RustWrenEngineColumnType.INTERVAL,
 }
```
131-135: Consider adding debug logging for expected failures.

While silent failure is appropriate here (as documented, many Spark catalogs don't support constraints), adding debug-level logging would help troubleshoot unexpected failures.

🔍 Suggested enhancement

```diff
 try:
     df = self.connector.query(sql).to_pandas()
-except Exception:
+except Exception as e:  # Expected for spark_catalog and many deployments
+    logger.debug(
+        f"Catalog {catalog} does not support information_schema constraints: {e}"
+    )
     continue
```
185-212: Consider extracting nullability information.

All columns are currently marked as `notNull=False`. Spark's DESCRIBE output may include nullability information that could be parsed to provide accurate null constraints.

```bash
#!/bin/bash
# Description: Verify if Spark DESCRIBE output includes nullability info
# Search for Spark documentation on DESCRIBE output format
```

Web query: Spark SQL DESCRIBE TABLE output format nullable column
228-239: Type parsing may not handle complex nested types correctly.

Line 230 splits on `(` and `<` to extract the base type, which works for simple parameterized types but may fail for complex nested structures like `array<struct<field1:int,field2:string>>`. While the fallback to UNKNOWN with logging is safe, consider whether these complex types need special handling.

This is acceptable for the initial implementation, but documenting this limitation would be helpful for future maintenance.
70-73: Unnecessary defensive column name checking.

Spark's SHOW TABLES command consistently returns a column named `tableName` (as documented in Spark 3.5.1 and 4.0.0). The fallback check for a `table` column appears to be unnecessary defensive programming that adds complexity without clear justification. Consider simplifying to `table_name = row.get("tableName")` and documenting any specific reason if support for alternative column names is actually needed.

ibis-server/tests/routers/v3/connector/spark/test_metadata.py (1)
64-75: Hard-coded version check creates maintenance burden.

Line 75 checks for the specific version "3.5.7", which will break when the test environment's Spark version changes. Consider making this more flexible by checking for a version pattern or using an environment variable.
🔄 Suggested improvement
```diff
-    # Verify it contains "Spark" in the version string
+    # Verify it returns a valid Spark version
     version_data = response.json()
     version_str = version_data if isinstance(version_data, str) else str(version_data)
-    assert "3.5.7" in version_str.lower()
+    # Check for Spark version pattern (e.g., "3.x.x")
+    import re
+    assert re.search(r'\d+\.\d+\.\d+', version_str), f"Expected version format, got: {version_str}"
```

ibis-server/tests/routers/v3/connector/spark/conftest.py (2)
27-30: Hard-coded connection string limits flexibility.

The Spark Connect URL `sc://localhost:15002` is hard-coded. Consider using environment variables or configuration to allow different test environments (CI vs local development).

🔧 Suggested improvement
```diff
+import os
+
+spark_host = os.getenv("SPARK_HOST", "localhost")
+spark_port = os.getenv("SPARK_PORT", "15002")
 spark = (
-    SparkSession.builder.remote("sc://localhost:15002")
+    SparkSession.builder.remote(f"sc://{spark_host}:{spark_port}")
     .appName("pytest-spark")
     .getOrCreate()
 )
```
41-42: Document the purpose of attrs clearing.

The code clears `attrs` from the pandas DataFrame, but the reason isn't documented. If this is related to the SPARK-54068 issue mentioned in the PR description (PyArrow ≥22.0.0 serialization issue), please add a comment explaining the workaround.

```diff
 if hasattr(orders_pdf, "attrs"):
+    # Clear attrs to avoid serialization issues with PyArrow
     orders_pdf.attrs = {}
```
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- ibis-server/app/model/metadata/spark.py
- ibis-server/tests/routers/v3/connector/spark/conftest.py
- ibis-server/tests/routers/v3/connector/spark/test_metadata.py
🧰 Additional context used
🧠 Learnings (4)
📚 Learning: 2025-01-07T03:56:21.741Z
Learnt from: goldmedal
Repo: Canner/wren-engine PR: 1029
File: ibis-server/app/model/metadata/object_storage.py:44-44
Timestamp: 2025-01-07T03:56:21.741Z
Learning: When working with DuckDB in Python, use `conn.execute("DESCRIBE SELECT * FROM table").fetchall()` to get column types instead of accessing DataFrame-style attributes like `dtype` or `dtypes`.
Applied to files:
ibis-server/app/model/metadata/spark.py
📚 Learning: 2025-06-18T02:23:34.040Z
Learnt from: goldmedal
Repo: Canner/wren-engine PR: 1224
File: ibis-server/app/util.py:49-56
Timestamp: 2025-06-18T02:23:34.040Z
Learning: DuckDB supports querying PyArrow Tables directly in SQL queries without needing to register them. When a pa.Table object is referenced in a FROM clause (e.g., "SELECT ... FROM df" where df is a pa.Table), DuckDB automatically handles the PyArrow object without requiring conn.register().
Applied to files:
ibis-server/app/model/metadata/spark.py
📚 Learning: 2025-06-18T02:23:34.040Z
Learnt from: goldmedal
Repo: Canner/wren-engine PR: 1224
File: ibis-server/app/util.py:49-56
Timestamp: 2025-06-18T02:23:34.040Z
Learning: DuckDB supports querying PyArrow Tables directly in SQL queries without needing to register them. When a pa.Table object is referenced in a FROM clause (e.g., "SELECT ... FROM df" where df is a pa.Table), DuckDB automatically handles the PyArrow object via its "replacement scan" mechanism that recognizes Python variables referencing Arrow objects as SQL tables. No conn.register() call is required.
Applied to files:
ibis-server/app/model/metadata/spark.py
📚 Learning: 2025-06-18T02:12:43.570Z
Learnt from: goldmedal
Repo: Canner/wren-engine PR: 1224
File: ibis-server/app/util.py:50-57
Timestamp: 2025-06-18T02:12:43.570Z
Learning: In the `to_json` function in `ibis-server/app/util.py`, the code intentionally uses `fetch_df()` to get a pandas DataFrame and then calls `to_dict(orient='split')` because this specific format is required for `orjson` serialization. The pandas conversion step is necessary to generate the correct dictionary structure for orjson.
Applied to files:
ibis-server/app/model/metadata/spark.py
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: ci
- GitHub Check: ci
- GitHub Check: ci
🔇 Additional comments (7)
ibis-server/app/model/metadata/spark.py (2)
47-49: LGTM! Constructor properly initializes the parent class and creates a Spark connector with the provided connection info.

172-183: LGTM! The catalog/schema/table discovery helpers are well-implemented with proper identifier quoting using backticks to handle special characters safely.
ibis-server/tests/routers/v3/connector/spark/test_metadata.py (2)
4-38: Well-structured test with good validation. The test comprehensively validates the table metadata structure. The flexible type checking on Line 35 (accepting both "INTEGER" and "INT") properly handles potential variations in type representation across Spark versions.

40-61: LGTM! The test appropriately handles both scenarios (constraints present or absent) and includes clear documentation about Spark's typical behavior regarding foreign key constraints.
ibis-server/tests/routers/v3/connector/spark/conftest.py (3)
1-22: LGTM! The automatic marker application via `pytest_collection_modifyitems` ensures all Spark tests in this directory are properly marked, allowing selective test execution with `-m spark`.
63-68: Ensure consistency with spark_connect connection string.

The connection info uses hard-coded `localhost:15002`, which should remain consistent with the `spark_connect` fixture. If the connection string in `spark_connect` becomes configurable, this fixture should use the same configuration source.

Consider extracting the connection details to a shared configuration or environment variables to maintain consistency between fixtures.
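A possible shape for that shared source, assuming SPARK_HOST/SPARK_PORT as hypothetical environment variable names:

```python
import os

def spark_connection_info() -> dict:
    # Hypothetical shared helper: both the spark_connect fixture and the
    # connection_info fixture would read from this single source.
    host = os.getenv("SPARK_HOST", "localhost")
    port = os.getenv("SPARK_PORT", "15002")
    return {"host": host, "port": port, "remote_url": f"sc://{host}:{port}"}

print(spark_connection_info()["remote_url"])
```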
71-76: LGTM! The autouse fixture properly sets up and tears down the remote function list path configuration, ensuring test isolation and proper cleanup.
```python
def get_version(self) -> str:
    df = self.connector.query("SELECT version()")
    return df.column(0)[0].as_py()
```
🧩 Analysis chain
🏁 Script executed:
```shell
# Find the Connector class and its query method
find . -type f -name "*.py" | xargs grep -l "class Connector" | head -5
```

Repository: Canner/wren-engine
Length of output: 98
🏁 Script executed:
```shell
# Search for connector.query() usage patterns to verify return type handling
rg "connector\.query\(" --type=py -B 2 -A 3
```

Repository: Canner/wren-engine
Length of output: 4727
🏁 Script executed:
```shell
# Also search for the Connector class definition and query method
rg "def query\(" --type=py -B 3 -A 5 | grep -A 8 "class Connector" || rg "def query\(" --type=py -A 5
```

Repository: Canner/wren-engine
Length of output: 546
Use .to_pandas() for consistency with other methods in this file.
The `get_version()` method uses the direct PyArrow API (`.column(0)[0].as_py()`) while all other methods in the same file use `.to_pandas()`. Change to `return df.to_pandas().iloc[0, 0]` or similar to maintain consistency.
🤖 Prompt for AI Agents
In @ibis-server/app/model/metadata/spark.py around lines 164 - 166, The
get_version method currently extracts the value via PyArrow calls
(df.column(0)[0].as_py()); change it to use the same DataFrame-to-pandas flow as
other methods by converting df to pandas and returning the first cell (e.g., use
df.to_pandas().iloc[0, 0] or equivalent). Locate get_version and replace the
PyArrow extraction with a .to_pandas() call on the query result so it matches
other methods' behavior.
```python
def cleanup():
    # Drop test tables
    try:
        spark.sql("DROP TABLE IF EXISTS default.orders")
    except Exception as e:
        raise e
    finally:
        spark.stop()
```
Simplify cleanup error handling.
The current cleanup logic re-raises the exception after Spark is stopped in the finally block. This pattern is unusual: typically, cleanup should log errors but not fail the test teardown. The `raise e` on Line 55 will cause test failures even when tests pass.
🐛 Suggested fix
```diff
 def cleanup():
     # Drop test tables
     try:
         spark.sql("DROP TABLE IF EXISTS default.orders")
     except Exception as e:
-        raise e
+        logger.warning(f"Failed to drop test table during cleanup: {e}")
     finally:
         spark.stop()
```

Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In @ibis-server/tests/routers/v3/connector/spark/conftest.py around lines 50 -
57, The cleanup function currently re-raises any exception caught while dropping
test tables, which causes teardown to fail; modify cleanup (the cleanup function
that calls spark.sql and spark.stop) to not re-raise the exception — instead log
the exception (e.g., using the test logger or logging.exception) and proceed to
the finally block where spark.stop() is always called so teardown never fails
the test run.
goldmedal
left a comment
Thanks @douenergy nice work 👍
Description
Added Spark connector support with `Hive` catalog integration. Implemented metadata extraction, query execution with PyArrow conversion, and a comprehensive test suite. Includes Docker Compose setup for CI/CD testing.

Known Issues
SPARK-54068: `PyArrow ≥22.0.0` serialization issue with PlanMetrics. Workaround applied by filtering non-serializable attrs before `toPandas()` conversion. JIRA

Summary by CodeRabbit
New Features
Tests
Documentation
Chores
✏️ Tip: You can customize this high-level summary in your review settings.