feat(wren-engine): add Apache Doris connector support#1430
feat(wren-engine): add Apache Doris connector support#1430goldmedal merged 11 commits intoCanner:mainfrom
Conversation
- Add DorisConnectionInfo, QueryDorisDTO data models - Add DorisConnector with JSON column handling - Add DorisMetadata for table/column/constraint discovery - Add Doris sqlglot dialect for SQL transpilation - Add Doris function list and SQL knowledge resources - Add get_doris_connection() with autocommit compatibility workaround - Add Doris variant to wren-core DataSource enum (Rust) - Add Doris → MySQLDialect mapping in inner_dialect - Add Doris test scaffolding - Add doris pytest marker
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughAdds Apache Doris support across the stack: sqlglot dialect export, Doris connector and DTOs, metadata extractor, DataSource wiring in Rust and core mappings, tests, local-run tooling, pytest marker, and documentation. Changes
Sequence DiagramsequenceDiagram
participant Client
participant Connector as DorisConnector
participant DataSourceExt as DataSourceExtension
participant DorisDB as Doris (ibis.mysql.connect)
participant MetadataFactory as MetadataFactory
participant DorisMetadata as DorisMetadata
Client->>Connector: init(DorisConnectionInfo)
Connector->>DataSourceExt: get_doris_connection(info)
DataSourceExt->>DorisDB: ibis.mysql.connect(host,port,db,charset,**kwargs)
DorisDB-->>DataSourceExt: BaseBackend (connection)
DataSourceExt-->>Connector: connection
Client->>Connector: execute_query(sql)
Connector->>DorisDB: run_query(sql)
DorisDB-->>Connector: result_table
Connector->>Connector: _handle_pyarrow_unsupported_type / _cast_json_columns
Connector-->>Client: transformed_result
Client->>MetadataFactory: get_metadata(DataSource.doris, info)
MetadataFactory->>DorisMetadata: new(info)
DorisMetadata->>DorisDB: query(information_schema)
DorisDB-->>DorisMetadata: rows
DorisMetadata->>DorisMetadata: _transform_column_type / assemble catalogs
DorisMetadata-->>MetadataFactory: metadata
MetadataFactory-->>Client: metadata
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Tip Try Coding Plans. Let us write the prompt for your AI agent so you can ship faster (with fewer bugs). Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
@catpineapple Thanks for working on the Doris connector. There are some ruff-check need to be fixed. You can use the just command ( |
There was a problem hiding this comment.
Actionable comments posted: 3
🧹 Nitpick comments (1)
ibis-server/app/model/connector.py (1)
362-396: DorisConnector implementation is correct.The connector properly extends
IbisConnectorand handles Doris-specific type conversions (Decimal, UUID, JSON) identically to MySQL, which is appropriate given Doris's MySQL protocol compatibility.Consider extracting shared logic with MySqlConnector.
The
_handle_pyarrow_unsupported_typeand_cast_json_columnsmethods are identical toMySqlConnector. You could extract a shared base class (e.g.,MySqlCompatibleConnector) to reduce duplication.♻️ Optional: Extract shared base class
class MySqlCompatibleConnector(IbisConnector): """Base connector for MySQL-compatible databases (MySQL, Doris).""" def _handle_pyarrow_unsupported_type(self, ibis_table: Table, **kwargs) -> Table: result_table = ibis_table for name, dtype in ibis_table.schema().items(): if isinstance(dtype, Decimal): result_table = self._round_decimal_columns( result_table=result_table, col_name=name, **kwargs ) elif isinstance(dtype, UUID): result_table = self._cast_uuid_columns( result_table=result_table, col_name=name ) elif isinstance(dtype, dt.JSON): result_table = self._cast_json_columns( result_table=result_table, col_name=name ) return result_table def _cast_json_columns(self, result_table: Table, col_name: str) -> Table: col = result_table[col_name] casted_col = col.cast("string") return result_table.mutate(**{col_name: casted_col}) class MySqlConnector(MySqlCompatibleConnector): def __init__(self, connection_info: ConnectionInfo): super().__init__(DataSource.mysql, connection_info) class DorisConnector(MySqlCompatibleConnector): """Doris connector - reuses MySQL protocol via ibis.mysql backend.""" def __init__(self, connection_info: ConnectionInfo): super().__init__(DataSource.doris, connection_info)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ibis-server/app/model/connector.py` around lines 362 - 396, The DorisConnector duplicates JSON/Decimal/UUID handling from MySqlConnector; extract a new base class MySqlCompatibleConnector containing the methods _handle_pyarrow_unsupported_type and _cast_json_columns (which call _round_decimal_columns and _cast_uuid_columns) and have both MySqlConnector and DorisConnector inherit from MySqlCompatibleConnector, adjusting their __init__ to call super().__init__(DataSource.mysql or DataSource.doris, connection_info) so the shared logic is centralized and duplication removed.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@ibis-server/app/model/__init__.py`:
- Around line 56-57: QueryDorisDTO currently accepts either ConnectionUrl or
DorisConnectionInfo via the connection_info field which lets generic
ConnectionUrl paths bypass the instance-scoped Doris handling; change
QueryDorisDTO so its connection_info only accepts DorisConnectionInfo (remove
ConnectionUrl from the union) and keep using the existing connection_info_field
default so requests for Doris cannot be routed through generic ibis.connect(...)
and will reach the Doris-specific get_doris_connection/get_autocommit logic.
In `@ibis-server/app/model/metadata/doris.py`:
- Around line 75-93: The SQL string interpolates self.database directly
(creating an injection risk); change the query in doris.py to avoid embedding
self.database by using a bound parameter (pass the database value separately to
the DB driver's execute call) or by obtaining the active database from the live
connection (e.g., call the connection to get current database instead of using
self.database). Locate the code that constructs the sql variable and update the
execute path to use a parameter placeholder for the WHERE clause or replace
self.database with the connection-derived database value, ensuring the driver
receives the database as a parameter rather than via f-string interpolation.
- Around line 179-198: _transform_column_type currently only strips
parenthetical precision and leaves angle-bracketed collection types like
"array<int>" or "map<string,int>" intact so DORIS_TYPE_MAPPING lookup falls back
to UNKNOWN; update the normalization of data_type in _transform_column_type to
also remove any angle-bracketed generics (e.g., apply a second re.sub or a
single regex that strips "<...>" in addition to "(...)" on
data_type.strip().lower()) so normalized_type becomes just "array", "map", or
"struct" and the existing DORIS_TYPE_MAPPING lookup returns the correct
RustWrenEngineColumnType (e.g., JSON).
---
Nitpick comments:
In `@ibis-server/app/model/connector.py`:
- Around line 362-396: The DorisConnector duplicates JSON/Decimal/UUID handling
from MySqlConnector; extract a new base class MySqlCompatibleConnector
containing the methods _handle_pyarrow_unsupported_type and _cast_json_columns
(which call _round_decimal_columns and _cast_uuid_columns) and have both
MySqlConnector and DorisConnector inherit from MySqlCompatibleConnector,
adjusting their __init__ to call super().__init__(DataSource.mysql or
DataSource.doris, connection_info) so the shared logic is centralized and
duplication removed.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: af6e6221-51db-4929-93ba-6f0661bad7b4
⛔ Files ignored due to path filters (1)
ibis-server/resources/function_list/doris.csvis excluded by!**/*.csv
📒 Files selected for processing (17)
ibis-server/app/custom_sqlglot/dialects/__init__.pyibis-server/app/custom_sqlglot/dialects/doris.pyibis-server/app/model/__init__.pyibis-server/app/model/connector.pyibis-server/app/model/data_source.pyibis-server/app/model/metadata/doris.pyibis-server/app/model/metadata/factory.pyibis-server/app/util.pyibis-server/pyproject.tomlibis-server/resources/knowledge/dialects/doris.txtibis-server/tests/routers/v3/connector/doris/__init__.pyibis-server/tests/routers/v3/connector/doris/conftest.pyibis-server/tests/routers/v3/connector/doris/test_query.pyibis-server/tools/query_local_run.pywren-core-base/manifest-macro/src/lib.rswren-core-base/src/mdl/manifest.rswren-core/core/src/mdl/dialect/inner_dialect.rs
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@ibis-server/app/model/metadata/doris.py`:
- Line 74: The Doris implementation of get_table_list currently lacks the
parameters required by the Metadata base contract; update def get_table_list in
doris.py to match Metadata.get_table_list(self, filter_info=None, limit=None) so
it accepts filter_info and limit, then adapt the function body to use these
parameters (apply filtering/limit to the returned list or pass them through to
the underlying listing logic) or explicitly ignore them while preserving the
signature to avoid TypeError when callers pass those arguments.
- Around line 75-93: The SQL WHERE clause in the Doris metadata queries (used by
functions like get_table_list() and get_schema_list()) is inconsistent about
excluding system schemas; update the WHERE filters so all discovery endpoints
consistently exclude the same set (e.g., 'mysql', 'information_schema',
'performance_schema', 'sys', and '__internal_schema'). Change the WHERE
condition in the query that selects from information_schema.COLUMNS (and the
similar query at the other location) to include NOT IN
('mysql','information_schema','performance_schema','sys','__internal_schema') so
table/schema/column discovery yield consistent results.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 1f80105c-f7ad-4e47-9feb-464be1237144
📒 Files selected for processing (2)
ibis-server/app/model/__init__.pyibis-server/app/model/metadata/doris.py
There was a problem hiding this comment.
Actionable comments posted: 2
♻️ Duplicate comments (1)
ibis-server/app/model/metadata/doris.py (1)
74-74:⚠️ Potential issue | 🟠 MajorRestore the base
Metadata.get_table_list()signature.Line 74 still drops
filter_infoandlimit, so callers using the shared metadata interface can still hitTypeError. If Doris does not support those knobs yet, keep the parameters and ignore them rather than narrowing the contract.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ibis-server/app/model/metadata/doris.py` at line 74, The get_table_list method in doris.py narrowed the shared Metadata interface by removing parameters; restore the original Metadata.get_table_list(filter_info=None, limit=None) signature on def get_table_list(self, filter_info=None, limit=None) so callers don't get TypeError, and inside the DorisMetadata.get_table_list implementation simply accept those args and ignore or validate them (e.g., log/raise if unsupported) while returning the list of Table objects; keep the method name get_table_list and existing return behavior otherwise.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@ibis-server/app/model/metadata/doris.py`:
- Around line 150-151: The code concatenates the raw limit argument into SQL
(the expression "sql += f\" LIMIT {limit}\""), which risks malformed SQL or
injection; update the SQL-building routine to validate and sanitize limit before
interpolation: ensure the limit variable is an integer (or coercible to int),
enforce a non-negative (and optionally max) bound, raise a clear ValueError if
validation fails, and only then append a safe numeric LIMIT to the sql string
(or switch to a parameterized query mechanism if available) so only validated
numeric values are ever inserted.
- Around line 75-93: The discovery SQL assigned to the sql variable returns
columns in arbitrary order causing unstable column ordering and composite
primaryKey assembly in the loop that builds columns and primaryKey; add an ORDER
BY clause to the query (e.g., ORDER BY c.TABLE_SCHEMA, c.TABLE_NAME,
c.ORDINAL_POSITION) so rows are grouped by table and sorted by column ordinal
before the code that assembles columns/primaryKey runs (look for the sql
variable and the loop that constructs columns/primaryKey).
---
Duplicate comments:
In `@ibis-server/app/model/metadata/doris.py`:
- Line 74: The get_table_list method in doris.py narrowed the shared Metadata
interface by removing parameters; restore the original
Metadata.get_table_list(filter_info=None, limit=None) signature on def
get_table_list(self, filter_info=None, limit=None) so callers don't get
TypeError, and inside the DorisMetadata.get_table_list implementation simply
accept those args and ignore or validate them (e.g., log/raise if unsupported)
while returning the list of Table objects; keep the method name get_table_list
and existing return behavior otherwise.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 5b3c4a4e-5f98-409a-bc6f-a8dd62c2407b
📒 Files selected for processing (1)
ibis-server/app/model/metadata/doris.py
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (2)
ibis-server/app/model/metadata/doris.py (2)
153-154:⚠️ Potential issue | 🟠 MajorValidate
limitbefore interpolating it into SQL.A raw runtime value is being concatenated into the query here. Coerce it to an integer and reject negatives so callers get a clear error instead of malformed SQL.
Suggested fix
""" if limit is not None: - sql += f" LIMIT {limit}" + try: + validated_limit = int(limit) + except (TypeError, ValueError) as exc: + raise ValueError("limit must be an integer") from exc + if validated_limit < 0: + raise ValueError("limit must be non-negative") + sql += f" LIMIT {validated_limit}" response = self.connection.sql(sql).to_pandas()🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ibis-server/app/model/metadata/doris.py` around lines 153 - 154, The code concatenates the raw limit into the SQL string (variable limit in the SQL-building block in doris.py); coerce limit to an int and validate it is non-negative before interpolation, raising a ValueError for invalid inputs so malformed SQL cannot be produced; then interpolate the sanitized integer (e.g., use int(limit)) into the existing SQL assembly logic that appends " LIMIT {limit}" so only validated numeric values are used.
78-96:⚠️ Potential issue | 🟠 MajorAdd a deterministic
ORDER BYto the discovery query.
columnsorder and compositeprimaryKeyassembly currently depend oninformation_schemarow order, which is not stable. Sort the result set before the Python loop consumes it.Suggested fix
WHERE - c.TABLE_SCHEMA NOT IN ('information_schema', '__internal_schema', 'mysql', 'performance_schema', 'sys'); + c.TABLE_SCHEMA NOT IN ('information_schema', '__internal_schema', 'mysql', 'performance_schema', 'sys') + ORDER BY + c.TABLE_SCHEMA, + c.TABLE_NAME, + c.ORDINAL_POSITION; """🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ibis-server/app/model/metadata/doris.py` around lines 78 - 96, The discovery SQL assigned to the variable sql currently lacks a deterministic ORDER BY, causing unstable column ordering; update that SQL string to append an ORDER BY clause (e.g., ORDER BY c.TABLE_SCHEMA, c.TABLE_NAME, c.ORDINAL_POSITION, c.COLUMN_NAME) so rows are consistently sorted before the Python code that consumes sql builds the columns and composite primaryKey; locate the sql variable in doris.py and modify the query accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@ibis-server/app/model/metadata/doris.py`:
- Around line 107-117: Change the dummy empty-string primary key to an actual
None so absent keys are represented as null: in the Table constructor calls
(e.g., where unique_tables[schema_table] = Table(..., primaryKey="") and the
similar block around the later Table(...) at lines ~131-138), set
primaryKey=None instead of "" so downstream code can distinguish “no key” from
an empty string; ensure the Table field name is exactly primaryKey when making
this change.
---
Duplicate comments:
In `@ibis-server/app/model/metadata/doris.py`:
- Around line 153-154: The code concatenates the raw limit into the SQL string
(variable limit in the SQL-building block in doris.py); coerce limit to an int
and validate it is non-negative before interpolation, raising a ValueError for
invalid inputs so malformed SQL cannot be produced; then interpolate the
sanitized integer (e.g., use int(limit)) into the existing SQL assembly logic
that appends " LIMIT {limit}" so only validated numeric values are used.
- Around line 78-96: The discovery SQL assigned to the variable sql currently
lacks a deterministic ORDER BY, causing unstable column ordering; update that
SQL string to append an ORDER BY clause (e.g., ORDER BY c.TABLE_SCHEMA,
c.TABLE_NAME, c.ORDINAL_POSITION, c.COLUMN_NAME) so rows are consistently sorted
before the Python code that consumes sql builds the columns and composite
primaryKey; locate the sql variable in doris.py and modify the query
accordingly.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 140b2e26-0576-4f2f-8bff-33094a3ad009
📒 Files selected for processing (1)
ibis-server/app/model/metadata/doris.py
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (1)
ibis-server/app/model/metadata/doris.py (1)
171-178: Consider narrowing the exception handling.Catching bare
Exceptioncan mask unexpected errors. IfCURRENT_CATALOG()fails due to a connection issue or permission error, silently falling back to "internal" may hide real problems from operators.Consider catching more specific exceptions (e.g., database driver exceptions or Ibis-specific errors) so that genuine failures propagate appropriately.
♻️ Suggested refinement
try: catalog_name = ( self.connection.sql("SELECT CURRENT_CATALOG()") .to_pandas() .iloc[0, 0] ) - except Exception: + except Exception as e: + logger.debug(f"CURRENT_CATALOG() not supported, falling back to 'internal': {e}") catalog_name = "internal"🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ibis-server/app/model/metadata/doris.py` around lines 171 - 178, Replace the bare "except Exception" around the CURRENT_CATALOG() call so only expected DB/ibis failures are swallowed; catch specific exceptions like IbisError and DB driver errors (e.g., DatabaseError/OperationalError/ProgrammingError) for the block that sets catalog_name from self.connection.sql(...). If an unexpected exception occurs, re-raise it instead of silently falling back to "internal" so real errors surface to operators. Ensure references: the try block that calls self.connection.sql("SELECT CURRENT_CATALOG()").to_pandas().iloc[0, 0] and the except clause are updated accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@ibis-server/app/model/metadata/doris.py`:
- Around line 75-77: The get_table_list method currently ignores its filter_info
and limit parameters; update get_table_list to apply filtering and limiting
similar to get_schema_list: build the SQL query in get_table_list to add WHERE
conditions when FilterInfo fields (e.g., schema/database name and table name
patterns) are provided and append a LIMIT clause when limit is not None, using
the same parameterized query approach used elsewhere in this class; ensure you
reference and reuse the same parameter names and mapping logic as in
get_schema_list so callers receive a filtered/limited list of Table objects
according to the provided filter_info and limit.
---
Nitpick comments:
In `@ibis-server/app/model/metadata/doris.py`:
- Around line 171-178: Replace the bare "except Exception" around the
CURRENT_CATALOG() call so only expected DB/ibis failures are swallowed; catch
specific exceptions like IbisError and DB driver errors (e.g.,
DatabaseError/OperationalError/ProgrammingError) for the block that sets
catalog_name from self.connection.sql(...). If an unexpected exception occurs,
re-raise it instead of silently falling back to "internal" so real errors
surface to operators. Ensure references: the try block that calls
self.connection.sql("SELECT CURRENT_CATALOG()").to_pandas().iloc[0, 0] and the
except clause are updated accordingly.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: b9d6ed0a-9065-473b-b4d9-fa4e8ceb9eec
📒 Files selected for processing (1)
ibis-server/app/model/metadata/doris.py
|
Hi @catpineapple, this PR makes sense to me 👍. Could you add a document about how to set up the environment for Apache Doris testing? You can refer to how Spark did it. |
- Add docker-compose.yml for Doris test environment - Rewrite conftest.py with proper test data setup/teardown - Rewrite test_query.py with full query tests (basic, limit, dry-run, validation) - Add test_functions.py for function list and scalar/aggregate function tests - Add test_metadata.py for table listing, constraints, and version tests - Add pymysql to dev dependencies for Doris test data loading
Co-authored-by: Jax Liu <liugs963@gmail.com>
goldmedal
left a comment
There was a problem hiding this comment.
Thanks @catpineapple 👍
What this PR does:
Update WrenAI module to #2150
Summary by CodeRabbit
New Features
Documentation
Tests
Chores