-
-
Notifications
You must be signed in to change notification settings - Fork 95
#3588 fix: gRPC query and streaming query now propagate the language parameter #3589
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,54 @@ | ||
| # gRPC Query Language Support | ||
|
|
||
| ## Problem | ||
|
|
||
| The gRPC client (`RemoteGrpcDatabase`) ignores the `language` parameter for query operations. Only `command()` correctly propagates the language. This prevents using Cypher, Gremlin, or any non-SQL language through the gRPC query and streaming query paths. | ||
|
|
||
| The bug spans three layers: | ||
|
|
||
| 1. **Proto**: `ExecuteQueryRequest` is missing a `language` field entirely | ||
| 2. **Client**: `query()` can't pass language (proto missing it); `queryStream()` and `streamQuery()` never call `.setLanguage()` despite the proto supporting it | ||
| 3. **Server**: `executeQuery()` hardcodes `db.query("sql", ...)`; all three `streamQuery` modes (`streamCursor`, `streamMaterialized`, `streamPaged`) hardcode `db.query("sql", ...)` | ||
|
|
||
| ## Design Decisions | ||
|
|
||
| - Add a `language` field to `ExecuteQueryRequest` (additive, backward-compatible) | ||
| - Keep using `db.query(language, ...)` on the server for the Query RPC (caller chose read-only) | ||
| - Default to `"sql"` when the language field is empty/unset (backward-compatible) | ||
|
|
||
| ## Changes | ||
|
|
||
| ### 1. Proto (`grpc/src/main/proto/arcadedb-server.proto`) | ||
|
|
||
| Add `string language = 9;` to `ExecuteQueryRequest`: | ||
|
|
||
| ```protobuf | ||
| message ExecuteQueryRequest { | ||
| string database = 1; | ||
| string query = 2; | ||
| map<string, GrpcValue> parameters = 3; | ||
| DatabaseCredentials credentials = 4; | ||
| TransactionContext transaction = 5; | ||
| int32 limit = 6; | ||
| int32 timeout_ms = 7; | ||
| ProjectionSettings projectionSettings = 8; | ||
| string language = 9; // "sql" if empty (default) | ||
| } | ||
| ``` | ||
|
|
||
| ### 2. Server (`grpcw/.../ArcadeDbGrpcService.java`) | ||
|
|
||
| **`executeQuery()`**: Replace hardcoded `"sql"` with language from request, defaulting to `"sql"` when empty. | ||
|
|
||
| **`streamQuery()`**: Extract language from `StreamQueryRequest.getLanguage()` (proto field 7, already exists), resolve default, and pass to `streamCursor`/`streamMaterialized`/`streamPaged`. Each mode method gains a `String language` parameter. | ||
|
|
||
| ### 3. Client (`grpc-client/.../RemoteGrpcDatabase.java`) | ||
|
|
||
| - `query()` path (line 556): Add `.setLanguage(language)` to `ExecuteQueryRequest` builder | ||
| - `queryStream()` path (line 780): Add `.setLanguage(language)` to `StreamQueryRequest` builder | ||
| - Private `streamQuery()` (line 1767): Add `.setLanguage("sql")` since it's SQL-only by design | ||
|
|
||
| ### Testing | ||
|
|
||
| - Existing gRPC e2e tests verify backward compatibility (SQL still works) | ||
| - Add test that runs a query with a non-SQL language through `query()` and `queryStream()` to verify language propagation |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,34 @@ | ||
| # gRPC Query Language Support - Implementation Plan | ||
|
|
||
| ## Step 1: Proto change | ||
|
|
||
| - File: `grpc/src/main/proto/arcadedb-server.proto` | ||
| - Add `string language = 9;` to `ExecuteQueryRequest` (after `projectionSettings`) | ||
| - Rebuild proto: `cd grpc && mvn clean install -DskipTests` | ||
|
|
||
| ## Step 2: Server - `executeQuery()` language support | ||
|
|
||
| - File: `grpcw/src/main/java/com/arcadedb/server/grpc/ArcadeDbGrpcService.java` | ||
| - In `executeQuery()` (~line 823): extract language from `request.getLanguage()`, default to `"sql"` | ||
| - Replace `db.query("sql", ...)` with `db.query(language, ...)` | ||
|
|
||
| ## Step 3: Server - `streamQuery()` language support | ||
|
|
||
| - File: `grpcw/src/main/java/com/arcadedb/server/grpc/ArcadeDbGrpcService.java` | ||
| - In `streamQuery()`: extract language from `request.getLanguage()`, default to `"sql"` | ||
| - Add `String language` parameter to `streamCursor()`, `streamMaterialized()`, `streamPaged()` | ||
| - Replace hardcoded `"sql"` in each mode's `db.query()` call | ||
| - Build server: `cd grpcw && mvn clean install -DskipTests` | ||
|
|
||
| ## Step 4: Client - wire language through query paths | ||
|
|
||
| - File: `grpc-client/src/main/java/com/arcadedb/remote/grpc/RemoteGrpcDatabase.java` | ||
| - `query()` at line 556: add `.setLanguage(language)` to `ExecuteQueryRequest` builder | ||
| - `queryStream()` at line 780: add `.setLanguage(language)` to `StreamQueryRequest` builder | ||
| - Private `streamQuery()` at line 1767: add `.setLanguage("sql")` to `StreamQueryRequest` builder | ||
| - Build client: `cd grpc-client && mvn clean install -DskipTests` | ||
|
|
||
| ## Step 5: Test | ||
|
|
||
| - Add e2e test verifying a non-SQL query (e.g. Cypher `MATCH (n) RETURN n LIMIT 1`) works via gRPC `query()` | ||
| - Run existing gRPC e2e tests to verify no regressions |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -252,8 +252,7 @@ private ExecuteCommandResponse executeCommandInternal(ExecuteCommandRequest req, | |
| try { | ||
| final Map<String, Object> params = GrpcTypeConverter.convertParameters(req.getParametersMap()); | ||
|
|
||
| // Language defaults to "sql" when empty | ||
| final String language = (req.getLanguage() == null || req.getLanguage().isEmpty()) ? "sql" : req.getLanguage(); | ||
| final String language = langOrDefault(req.getLanguage()); | ||
|
|
||
| // Transaction: begin if requested | ||
| final boolean hasTx = req.hasTransaction(); | ||
|
|
@@ -818,9 +817,11 @@ public void executeQuery(ExecuteQueryRequest request, StreamObserver<ExecuteQuer | |
| // Execute the query | ||
| long startTime = System.currentTimeMillis(); | ||
|
|
||
| LogManager.instance().log(this, Level.FINE, "executeQuery(): query = %s", request.getQuery()); | ||
| final String language = langOrDefault(request.getLanguage()); | ||
|
|
||
| ResultSet resultSet = database.query("sql", request.getQuery(), | ||
| LogManager.instance().log(this, Level.FINE, "executeQuery(): language = %s query = %s", language, request.getQuery()); | ||
|
|
||
| ResultSet resultSet = database.query(language, request.getQuery(), | ||
| GrpcTypeConverter.convertParameters(request.getParametersMap())); | ||
|
|
||
| LogManager.instance() | ||
|
|
@@ -1105,12 +1106,20 @@ public void streamQuery(StreamQueryRequest request, StreamObserver<QueryResult> | |
| beganHere = true; | ||
| } | ||
|
|
||
| final String language = langOrDefault(request.getLanguage()); | ||
|
|
||
| // --- Dispatch on mode (helpers do NOT manage transactions) --- | ||
| // PAGED mode uses SQL-specific SKIP/LIMIT wrapping, so fall back to CURSOR for non-SQL languages | ||
| switch (request.getRetrievalMode()) { | ||
| case MATERIALIZE_ALL -> streamMaterialized(db, request, batchSize, scso, cancelled, projectionConfig); | ||
| case PAGED -> streamPaged(db, request, batchSize, scso, cancelled, projectionConfig); | ||
| case CURSOR -> streamCursor(db, request, batchSize, scso, cancelled, projectionConfig); | ||
| default -> streamCursor(db, request, batchSize, scso, cancelled, projectionConfig); | ||
| case MATERIALIZE_ALL -> streamMaterialized(db, request, batchSize, scso, cancelled, projectionConfig, language); | ||
| case PAGED -> { | ||
| if (!"sql".equalsIgnoreCase(language)) | ||
| streamCursor(db, request, batchSize, scso, cancelled, projectionConfig, language); | ||
| else | ||
| streamPaged(db, request, batchSize, scso, cancelled, projectionConfig, language); | ||
| } | ||
| case CURSOR -> streamCursor(db, request, batchSize, scso, cancelled, projectionConfig, language); | ||
| default -> streamCursor(db, request, batchSize, scso, cancelled, projectionConfig, language); | ||
| } | ||
|
|
||
| // If the client cancelled mid-stream, choose rollback unless caller explicitly | ||
|
|
@@ -1171,14 +1180,14 @@ public void streamQuery(StreamQueryRequest request, StreamObserver<QueryResult> | |
| */ | ||
| private void streamCursor(Database db, StreamQueryRequest request, int batchSize, | ||
| ServerCallStreamObserver<QueryResult> scso, | ||
| AtomicBoolean cancelled, ProjectionConfig projectionConfig) { | ||
| AtomicBoolean cancelled, ProjectionConfig projectionConfig, String language) { | ||
|
|
||
| long running = 0L; | ||
|
|
||
| QueryResult.Builder batch = QueryResult.newBuilder(); | ||
| int inBatch = 0; | ||
|
|
||
| try (ResultSet rs = db.query("sql", request.getQuery(), | ||
| try (ResultSet rs = db.query(language, request.getQuery(), | ||
| GrpcTypeConverter.convertParameters(request.getParametersMap()))) { | ||
|
|
||
| while (rs.hasNext()) { | ||
|
|
@@ -1242,11 +1251,11 @@ private void streamCursor(Database db, StreamQueryRequest request, int batchSize | |
| */ | ||
| private void streamMaterialized(Database db, StreamQueryRequest request, int batchSize, | ||
| ServerCallStreamObserver<QueryResult> scso, | ||
| AtomicBoolean cancelled, ProjectionConfig projectionConfig) { | ||
| AtomicBoolean cancelled, ProjectionConfig projectionConfig, String language) { | ||
|
|
||
| final List<GrpcRecord> all = new ArrayList<>(); | ||
|
|
||
| try (ResultSet rs = db.query("sql", request.getQuery(), | ||
| try (ResultSet rs = db.query(language, request.getQuery(), | ||
| GrpcTypeConverter.convertParameters(request.getParametersMap()))) { | ||
|
|
||
| while (rs.hasNext()) { | ||
|
|
@@ -1295,7 +1304,7 @@ private void streamMaterialized(Database db, StreamQueryRequest request, int bat | |
| */ | ||
| private void streamPaged(Database db, StreamQueryRequest request, int batchSize, | ||
| ServerCallStreamObserver<QueryResult> scso, | ||
| AtomicBoolean cancelled, ProjectionConfig projectionConfig) { | ||
| AtomicBoolean cancelled, ProjectionConfig projectionConfig, String language) { | ||
|
|
||
| final String pagedSql = wrapWithSkipLimit(request.getQuery()); // see helper below | ||
| int offset = 0; | ||
|
|
@@ -1313,7 +1322,7 @@ private void streamPaged(Database db, StreamQueryRequest request, int batchSize, | |
| int count = 0; | ||
| QueryResult.Builder b = QueryResult.newBuilder(); | ||
|
|
||
| try (ResultSet rs = db.query("sql", pagedSql, params)) { | ||
| try (ResultSet rs = db.query(language, pagedSql, params)) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In |
||
| while (rs.hasNext()) { | ||
| if (cancelled.get()) | ||
| return; | ||
|
|
@@ -3063,6 +3072,10 @@ private String generateTransactionId() { | |
| return "tx_" + System.nanoTime(); | ||
| } | ||
|
|
||
| private static String langOrDefault(String language) { | ||
| return (language == null || language.isEmpty()) ? "sql" : language; | ||
| } | ||
|
|
||
| // ---- Debug helpers ---- | ||
| private static String summarizeJava(Object o) { | ||
| if (o == null) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By adding the
languageparameter, this method now incorrectly suggests it supports paged streaming for any language. However, the implementation callswrapWithSkipLimitwhich generates SQL-specific syntax for pagination. This will cause queries in other languages like Gremlin or Cypher to fail because the wrapped query is not valid in those languages.To fix this, you should add a check at the beginning of the method to throw an
UnsupportedOperationExceptionif the language is not SQL, making it clear that this mode is not yet supported for other languages.