From f6e0e2181cd8b86fc32c91a90c932a9981eb9e3a Mon Sep 17 00:00:00 2001 From: acarbonetto Date: Tue, 27 Jun 2023 13:58:42 -0700 Subject: [PATCH 01/13] Update documentation for _routing Signed-off-by: acarbonetto From 21318f1044b4693de9ef652ca2b0d3325a5a0932 Mon Sep 17 00:00:00 2001 From: acarbonetto Date: Tue, 4 Jul 2023 21:54:52 -0700 Subject: [PATCH 02/13] Proof of Concept: request routing shard through SQL partition Signed-off-by: acarbonetto --- .../org/opensearch/sql/analysis/Analyzer.java | 9 ++++-- .../org/opensearch/sql/ast/tree/Relation.java | 20 +++++++++++++ .../opensearch/sql/storage/StorageEngine.java | 3 +- .../sql/sql/StandalonePaginationIT.java | 2 +- .../sql/legacy/plugin/RestSqlAction.java | 2 +- .../sql/legacy/request/SqlRequest.java | 8 ++++- .../sql/legacy/request/SqlRequestFactory.java | 26 +++++++++++++++- .../RestSQLQueryActionCursorFallbackTest.java | 4 ++- .../request/OpenSearchQueryRequest.java | 28 ++++++++++++----- .../request/OpenSearchRequestBuilder.java | 7 +++-- .../request/OpenSearchScrollRequest.java | 16 +++++++++- .../opensearch/storage/OpenSearchIndex.java | 11 +++++-- .../storage/OpenSearchStorageEngine.java | 6 ++-- .../request/OpenSearchQueryRequestTest.java | 9 ++++-- .../request/OpenSearchRequestBuilderTest.java | 22 +++++++++----- .../request/OpenSearchScrollRequestTest.java | 22 +++++++++----- .../storage/OpenSearchIndexTest.java | 28 ++++++++++++----- .../storage/OpenSearchStorageEngineTest.java | 4 +-- .../storage/scan/OpenSearchIndexScanTest.java | 30 +++++++++++-------- .../storage/PrometheusStorageEngine.java | 3 +- .../storage/PrometheusStorageEngineTest.java | 8 ++--- sql/src/main/antlr/OpenSearchSQLParser.g4 | 6 +++- .../sql/sql/domain/SQLQueryRequest.java | 9 ++++-- .../opensearch/sql/sql/parser/AstBuilder.java | 14 ++++++++- .../opensearch/sql/sql/SQLServiceTest.java | 7 +++-- .../sql/sql/domain/SQLQueryRequestTest.java | 4 ++- 26 files changed, 233 insertions(+), 75 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java index 2c4647004ce..d184d6e7594 100644 --- a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java +++ b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java @@ -143,6 +143,7 @@ public LogicalPlan analyze(UnresolvedPlan unresolved, AnalysisContext context) { @Override public LogicalPlan visitRelation(Relation node, AnalysisContext context) { QualifiedName qualifiedName = node.getTableQualifiedName(); + String partitionName = node.getTablePartitionKeys(); DataSourceSchemaIdentifierNameResolver dataSourceSchemaIdentifierNameResolver = new DataSourceSchemaIdentifierNameResolver(dataSourceService, qualifiedName.getParts()); String tableName = dataSourceSchemaIdentifierNameResolver.getIdentifierName(); @@ -156,9 +157,11 @@ public LogicalPlan visitRelation(Relation node, AnalysisContext context) { .getDataSource(dataSourceSchemaIdentifierNameResolver.getDataSourceName()) .getStorageEngine() .getTable(new DataSourceSchemaName( - dataSourceSchemaIdentifierNameResolver.getDataSourceName(), - dataSourceSchemaIdentifierNameResolver.getSchemaName()), - dataSourceSchemaIdentifierNameResolver.getIdentifierName()); + dataSourceSchemaIdentifierNameResolver.getDataSourceName(), + dataSourceSchemaIdentifierNameResolver.getSchemaName() + ), + dataSourceSchemaIdentifierNameResolver.getIdentifierName(), + partitionName); } table.getFieldTypes().forEach((k, v) -> curEnv.define(new Symbol(Namespace.FIELD_NAME, k), v)); table.getReservedFieldTypes().forEach( diff --git a/core/src/main/java/org/opensearch/sql/ast/tree/Relation.java b/core/src/main/java/org/opensearch/sql/ast/tree/Relation.java index 8c3868329ff..9eddac30a8a 100644 --- a/core/src/main/java/org/opensearch/sql/ast/tree/Relation.java +++ b/core/src/main/java/org/opensearch/sql/ast/tree/Relation.java @@ -35,8 +35,13 @@ public Relation(UnresolvedExpression tableName) { } public Relation(UnresolvedExpression tableName, String alias) { + this(tableName, alias, null); + } + + public Relation(UnresolvedExpression tableName, String alias, List partitionKeys) { this.tableName = Arrays.asList(tableName); this.alias = alias; + this.partitionKeys = partitionKeys; } /** @@ -44,6 +49,12 @@ public Relation(UnresolvedExpression tableName, String alias) { */ private String alias; + + /** + * Optional partition key(s) for the relation. + */ + private List partitionKeys; + /** * Return table name. * @@ -88,6 +99,15 @@ public QualifiedName getTableQualifiedName() { } } + /** + * Retrieve the partition keys associated with the table/relation + * + * @return TablePartitionKeys. + */ + public String getTablePartitionKeys() { + return String.join(COMMA, partitionKeys); + } + @Override public List getChild() { return ImmutableList.of(); diff --git a/core/src/main/java/org/opensearch/sql/storage/StorageEngine.java b/core/src/main/java/org/opensearch/sql/storage/StorageEngine.java index ffcc0911dee..3b1b000e778 100644 --- a/core/src/main/java/org/opensearch/sql/storage/StorageEngine.java +++ b/core/src/main/java/org/opensearch/sql/storage/StorageEngine.java @@ -8,6 +8,7 @@ import java.util.Collection; import java.util.Collections; +import javax.annotation.Nullable; import org.opensearch.sql.DataSourceSchemaName; import org.opensearch.sql.expression.function.FunctionResolver; @@ -19,7 +20,7 @@ public interface StorageEngine { /** * Get {@link Table} from storage engine. */ - Table getTable(DataSourceSchemaName dataSourceSchemaName, String tableName); + Table getTable(DataSourceSchemaName dataSourceSchemaName, String tableName, @Nullable String partition); /** * Get list of datasource related functions. diff --git a/integ-test/src/test/java/org/opensearch/sql/sql/StandalonePaginationIT.java b/integ-test/src/test/java/org/opensearch/sql/sql/StandalonePaginationIT.java index aad39c40744..aa0ae4729a0 100644 --- a/integ-test/src/test/java/org/opensearch/sql/sql/StandalonePaginationIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/sql/StandalonePaginationIT.java @@ -112,7 +112,7 @@ public void onFailure(Exception e) { } // act 1, asserts in firstResponder - var t = new OpenSearchIndex(client, defaultSettings(), "test"); + var t = new OpenSearchIndex(client, defaultSettings(), "test", "routingId"); LogicalPlan p = new LogicalPaginate(1, List.of( new LogicalProject( new LogicalRelation("test", t), List.of( diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java index 9a15cc9e215..df18ec80f16 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java @@ -148,7 +148,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli // Route request to new query engine if it's supported already SQLQueryRequest newSqlRequest = new SQLQueryRequest(sqlRequest.getJsonContent(), - sqlRequest.getSql(), request.path(), request.params(), sqlRequest.cursor()); + sqlRequest.getSql(), request.path(), request.params(), sqlRequest.cursor(), sqlRequest.routingIds()); return newSqlQueryHandler.prepareRequest(newSqlRequest, (restChannel, exception) -> { try{ diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/request/SqlRequest.java b/legacy/src/main/java/org/opensearch/sql/legacy/request/SqlRequest.java index 605ef3c958b..194c16a87e0 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/request/SqlRequest.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/request/SqlRequest.java @@ -8,6 +8,7 @@ import java.io.IOException; import java.util.Collections; +import java.util.List; import org.json.JSONException; import org.json.JSONObject; import org.opensearch.common.settings.Settings; @@ -28,6 +29,7 @@ public class SqlRequest { JSONObject jsonContent; String cursor; Integer fetchSize; + private List routingIds; public SqlRequest(final String sql, final JSONObject jsonContent) { this.sql = sql; @@ -38,10 +40,12 @@ public SqlRequest(final String cursor) { this.cursor = cursor; } - public SqlRequest(final String sql, final Integer fetchSize, final JSONObject jsonContent) { + public SqlRequest(final String sql, final Integer fetchSize, final JSONObject jsonContent, + final List routingIds) { this.sql = sql; this.fetchSize = fetchSize; this.jsonContent = jsonContent; + this.routingIds = routingIds; } private static boolean isValidJson(String json) { @@ -65,6 +69,8 @@ public Integer fetchSize() { return this.fetchSize; } + public List routingIds() { return this.routingIds; } + public JSONObject getJsonContent() { return this.jsonContent; } diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/request/SqlRequestFactory.java b/legacy/src/main/java/org/opensearch/sql/legacy/request/SqlRequestFactory.java index 4c5d207be85..2c6dbea6d47 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/request/SqlRequestFactory.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/request/SqlRequestFactory.java @@ -24,6 +24,7 @@ public class SqlRequestFactory { public static final String SQL_CURSOR_FIELD_NAME = "cursor"; public static final String SQL_FETCH_FIELD_NAME = "fetch_size"; + public static final String ROUTING_FIELD_NAME = "routing"; public static SqlRequest getSqlRequest(RestRequest request) { switch (request.method()) { @@ -63,7 +64,22 @@ private static SqlRequest parseSqlRequestFromPayload(RestRequest restRequest) { List parameters = parseParameters(paramArray); return new PreparedStatementRequest(sql, validateAndGetFetchSize(jsonContent), jsonContent, parameters); } - return new SqlRequest(sql, validateAndGetFetchSize(jsonContent), jsonContent); + + List routingIds = List.of(); + if (jsonContent.has(ROUTING_FIELD_NAME)) { + try { + routingIds = List.of(jsonContent.getString(ROUTING_FIELD_NAME)); + } catch (JSONException ignored) { + try { + JSONArray routingIdArray = jsonContent.getJSONArray(ROUTING_FIELD_NAME); + routingIds = parseRoutingIds(routingIdArray); + } catch (JSONException jsonException) { + throw new IllegalArgumentException(ROUTING_FIELD_NAME + " parameter must be defined as a string or array value", jsonException); + } + } + } + + return new SqlRequest(sql, validateAndGetFetchSize(jsonContent), jsonContent, routingIds); } @@ -82,6 +98,14 @@ private static Integer validateAndGetFetchSize(JSONObject jsonContent) { return fetchSize.orElse(0); } + private static List parseRoutingIds(JSONArray array) { + List routingIds = List.of(); + for (int i = 0; i < array.length(); i++) { + routingIds.add(array.getString(i)); + } + return routingIds; + } + private static List parseParameters( JSONArray paramsJsonArray) { List parameters = new ArrayList<>(); diff --git a/legacy/src/test/java/org/opensearch/sql/legacy/plugin/RestSQLQueryActionCursorFallbackTest.java b/legacy/src/test/java/org/opensearch/sql/legacy/plugin/RestSQLQueryActionCursorFallbackTest.java index a11f4c47d7a..989299fc38d 100644 --- a/legacy/src/test/java/org/opensearch/sql/legacy/plugin/RestSQLQueryActionCursorFallbackTest.java +++ b/legacy/src/test/java/org/opensearch/sql/legacy/plugin/RestSQLQueryActionCursorFallbackTest.java @@ -11,6 +11,7 @@ import static org.opensearch.sql.legacy.plugin.RestSqlAction.QUERY_API_ENDPOINT; import java.io.IOException; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; @@ -98,8 +99,9 @@ private static SQLQueryRequest createSqlQueryRequest(String query, Optional searchA searchDone = true; return new OpenSearchResponse( searchAction.apply(new SearchRequest() - .indices(indexName.getIndexNames()) - .source(sourceBuilder)), exprValueFactory, includes); + .indices(indexName.getIndexNames()) + .source(sourceBuilder) + .routing(getRoutingId().getIndexNames())), exprValueFactory, includes); } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java index bec133f8344..de47f34d73c 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java @@ -96,24 +96,25 @@ public OpenSearchRequestBuilder(int requestedTotalSize, * @return query request or scroll request */ public OpenSearchRequest build(OpenSearchRequest.IndexName indexName, + OpenSearchRequest.IndexName routingId, int maxResultWindow, TimeValue scrollTimeout) { int size = requestedTotalSize; if (pageSize == null) { if (startFrom + size > maxResultWindow) { sourceBuilder.size(maxResultWindow - startFrom); return new OpenSearchScrollRequest( - indexName, scrollTimeout, sourceBuilder, exprValueFactory); + indexName, routingId, scrollTimeout, sourceBuilder, exprValueFactory); } else { sourceBuilder.from(startFrom); sourceBuilder.size(requestedTotalSize); - return new OpenSearchQueryRequest(indexName, sourceBuilder, exprValueFactory); + return new OpenSearchQueryRequest(indexName, routingId, sourceBuilder, exprValueFactory); } } else { if (startFrom != 0) { throw new UnsupportedOperationException("Non-zero offset is not supported with pagination"); } sourceBuilder.size(pageSize); - return new OpenSearchScrollRequest(indexName, scrollTimeout, + return new OpenSearchScrollRequest(indexName, routingId, scrollTimeout, sourceBuilder, exprValueFactory); } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequest.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequest.java index 9ffcc42ff74..33bbd69dc18 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequest.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequest.java @@ -52,6 +52,12 @@ public class OpenSearchScrollRequest implements OpenSearchRequest { */ private final IndexName indexName; + /** + * Routing Ids used for the request + * {@link OpenSearchRequest.IndexName}. + */ + private final IndexName routingId; + /** Index name. */ @EqualsAndHashCode.Exclude @ToString.Exclude @@ -75,14 +81,17 @@ public class OpenSearchScrollRequest implements OpenSearchRequest { /** Constructor. */ public OpenSearchScrollRequest(IndexName indexName, + IndexName routingId, TimeValue scrollTimeout, SearchSourceBuilder sourceBuilder, OpenSearchExprValueFactory exprValueFactory) { this.indexName = indexName; + this.routingId = routingId; this.scrollTimeout = scrollTimeout; this.exprValueFactory = exprValueFactory; this.initialSearchRequest = new SearchRequest() .indices(indexName.getIndexNames()) + .routing(routingId.getIndexNames()) .scroll(scrollTimeout) .source(sourceBuilder); @@ -168,6 +177,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(scrollId); out.writeStringCollection(includes); indexName.writeTo(out); + routingId.writeTo(out); } /** @@ -183,7 +193,11 @@ public OpenSearchScrollRequest(StreamInput in, OpenSearchStorageEngine engine) scrollId = in.readString(); includes = in.readStringList(); indexName = new IndexName(in); - OpenSearchIndex index = (OpenSearchIndex) engine.getTable(null, indexName.toString()); + routingId = new IndexName(in); + OpenSearchIndex index = (OpenSearchIndex) engine.getTable( + null, + indexName.toString(), + routingId.toString()); exprValueFactory = new OpenSearchExprValueFactory(index.getFieldOpenSearchTypes()); } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java index 62617f744e6..68960a19042 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java @@ -66,6 +66,12 @@ public class OpenSearchIndex implements Table { */ private final OpenSearchRequest.IndexName indexName; + /** + * Stores the routing id for the request + * {@link OpenSearchRequest.IndexName}. + */ + private final OpenSearchRequest.IndexName routingId; + /** * The cached mapping of field and type in index. */ @@ -84,10 +90,11 @@ public class OpenSearchIndex implements Table { /** * Constructor. */ - public OpenSearchIndex(OpenSearchClient client, Settings settings, String indexName) { + public OpenSearchIndex(OpenSearchClient client, Settings settings, String indexName, String routingId) { this.client = client; this.settings = settings; this.indexName = new OpenSearchRequest.IndexName(indexName); + this.routingId = new OpenSearchRequest.IndexName(routingId); } @Override @@ -180,7 +187,7 @@ public TableScanBuilder createScanBuilder() { createExprValueFactory()); Function createScanOperator = requestBuilder -> new OpenSearchIndexScan(client, requestBuilder.getMaxResponseSize(), - requestBuilder.build(indexName, getMaxResultWindow(), cursorKeepAlive)); + requestBuilder.build(indexName, routingId, getMaxResultWindow(), cursorKeepAlive)); return new OpenSearchIndexScanBuilder(builder, createScanOperator); } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchStorageEngine.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchStorageEngine.java index c915fa549bd..24ea8b0febf 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchStorageEngine.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchStorageEngine.java @@ -8,6 +8,7 @@ import static org.opensearch.sql.utils.SystemIndexUtils.isSystemIndex; +import javax.annotation.Nullable; import lombok.Getter; import lombok.RequiredArgsConstructor; import org.opensearch.sql.DataSourceSchemaName; @@ -28,11 +29,12 @@ public class OpenSearchStorageEngine implements StorageEngine { private final Settings settings; @Override - public Table getTable(DataSourceSchemaName dataSourceSchemaName, String name) { + public Table getTable(DataSourceSchemaName dataSourceSchemaName, String name, @Nullable String routingId) { if (isSystemIndex(name)) { + // TODO: handle routingId on system tables too? return new OpenSearchSystemIndex(client, name); } else { - return new OpenSearchIndex(client, settings, name); + return new OpenSearchIndex(client, settings, name, routingId == null ? "" : routingId); } } } diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequestTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequestTest.java index cf548d44f96..1d60ee8b641 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequestTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequestTest.java @@ -68,15 +68,16 @@ public class OpenSearchQueryRequestTest { private OpenSearchExprValueFactory factory; private final OpenSearchQueryRequest request = - new OpenSearchQueryRequest("test", 200, factory); + new OpenSearchQueryRequest("test", "key", 200, factory); private final OpenSearchQueryRequest remoteRequest = - new OpenSearchQueryRequest("ccs:test", 200, factory); + new OpenSearchQueryRequest("ccs:test", "ccs:key", 200, factory); @Test void search() { OpenSearchQueryRequest request = new OpenSearchQueryRequest( new OpenSearchRequest.IndexName("test"), + new OpenSearchRequest.IndexName("key"), sourceBuilder, factory ); @@ -99,6 +100,7 @@ void search() { void search_withoutContext() { OpenSearchQueryRequest request = new OpenSearchQueryRequest( new OpenSearchRequest.IndexName("test"), + new OpenSearchRequest.IndexName("key"), sourceBuilder, factory ); @@ -117,6 +119,7 @@ void search_withoutContext() { void search_withIncludes() { OpenSearchQueryRequest request = new OpenSearchQueryRequest( new OpenSearchRequest.IndexName("test"), + new OpenSearchRequest.IndexName("key"), sourceBuilder, factory ); @@ -150,6 +153,7 @@ void searchRequest() { assertSearchRequest(new SearchRequest() .indices("test") + .routing("key") .source(new SearchSourceBuilder() .timeout(DEFAULT_QUERY_TIMEOUT) .from(0) @@ -165,6 +169,7 @@ void searchCrossClusterRequest() { assertSearchRequest( new SearchRequest() .indices("ccs:test") + .routing("key") .source(new SearchSourceBuilder() .timeout(DEFAULT_QUERY_TIMEOUT) .from(0) diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilderTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilderTest.java index e8d15bd0bbd..e6a71a8454d 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilderTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilderTest.java @@ -74,6 +74,9 @@ class OpenSearchRequestBuilderTest { private static final OpenSearchRequest.IndexName indexName = new OpenSearchRequest.IndexName("test"); + private static final OpenSearchRequest.IndexName partitionKey + = new OpenSearchRequest.IndexName("key"); + @Mock private OpenSearchExprValueFactory exprValueFactory; @@ -94,13 +97,14 @@ void build_query_request() { assertEquals( new OpenSearchQueryRequest( new OpenSearchRequest.IndexName("test"), + new OpenSearchRequest.IndexName("key"), new SearchSourceBuilder() .from(offset) .size(limit) .timeout(DEFAULT_QUERY_TIMEOUT) .trackScores(true), exprValueFactory), - requestBuilder.build(indexName, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT)); + requestBuilder.build(indexName, partitionKey, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT)); } @Test @@ -111,13 +115,15 @@ void build_scroll_request_with_correct_size() { assertEquals( new OpenSearchScrollRequest( - new OpenSearchRequest.IndexName("test"), TimeValue.timeValueMinutes(1), + new OpenSearchRequest.IndexName("test"), + new OpenSearchRequest.IndexName("key"), + TimeValue.timeValueMinutes(1), new SearchSourceBuilder() .from(offset) .size(MAX_RESULT_WINDOW - offset) .timeout(DEFAULT_QUERY_TIMEOUT), exprValueFactory), - requestBuilder.build(indexName, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT)); + requestBuilder.build(indexName, partitionKey, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT)); } @Test @@ -125,7 +131,8 @@ void test_push_down_query() { QueryBuilder query = QueryBuilders.termQuery("intA", 1); requestBuilder.pushDownFilter(query); - var r = requestBuilder.build(indexName, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT); + var r = requestBuilder.build( + indexName, partitionKey, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT); Function querySearch = searchRequest -> { assertEquals( new SearchSourceBuilder() @@ -197,8 +204,8 @@ void assertSearchSourceBuilder(SearchSourceBuilder expected, Function scrollSearch = searchScrollRequest -> { throw new UnsupportedOperationException(); }; - requestBuilder.build(indexName, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT).search( - querySearch, scrollSearch); + requestBuilder.build(indexName, partitionKey, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT) + .search(querySearch, scrollSearch); } @Test @@ -428,7 +435,8 @@ void exception_when_non_zero_offset_and_page_size() { requestBuilder.pushDownPageSize(3); requestBuilder.pushDownLimit(300, 2); assertThrows(UnsupportedOperationException.class, - () -> requestBuilder.build(indexName, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT)); + () -> requestBuilder.build( + indexName, partitionKey, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT)); } @Test diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequestTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequestTest.java index 63c6a5ca7dd..2ee77c73615 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequestTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequestTest.java @@ -54,6 +54,9 @@ class OpenSearchScrollRequestTest { public static final OpenSearchRequest.IndexName INDEX_NAME = new OpenSearchRequest.IndexName("test"); + + public static final OpenSearchRequest.IndexName ROUTING_ID + = new OpenSearchRequest.IndexName("shard"); public static final TimeValue SCROLL_TIMEOUT = TimeValue.timeValueMinutes(1); @Mock private SearchResponse searchResponse; @@ -72,13 +75,13 @@ class OpenSearchScrollRequestTest { private final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); private final OpenSearchScrollRequest request = new OpenSearchScrollRequest( - INDEX_NAME, SCROLL_TIMEOUT, + INDEX_NAME, ROUTING_ID, SCROLL_TIMEOUT, searchSourceBuilder, factory); @Test void constructor() { searchSourceBuilder.fetchSource(new String[] {"test"}, null); - var request = new OpenSearchScrollRequest(INDEX_NAME, SCROLL_TIMEOUT, + var request = new OpenSearchScrollRequest(INDEX_NAME, ROUTING_ID, SCROLL_TIMEOUT, searchSourceBuilder, factory); assertNotEquals(List.of(), request.getIncludes()); } @@ -86,7 +89,7 @@ void constructor() { @Test void constructor2() { searchSourceBuilder.fetchSource(new String[]{"test"}, null); - var request = new OpenSearchScrollRequest(INDEX_NAME, SCROLL_TIMEOUT, searchSourceBuilder, + var request = new OpenSearchScrollRequest(INDEX_NAME, ROUTING_ID, SCROLL_TIMEOUT, searchSourceBuilder, factory); assertNotEquals(List.of(), request.getIncludes()); } @@ -98,6 +101,7 @@ void searchRequest() { assertEquals( new SearchRequest() .indices("test") + .routing("shard") .scroll(TimeValue.timeValueMinutes(1)) .source(new SearchSourceBuilder().query(QueryBuilders.termQuery("name", "John"))), searchRequest); @@ -132,6 +136,7 @@ void scrollRequest() { void search() { OpenSearchScrollRequest request = new OpenSearchScrollRequest( new OpenSearchRequest.IndexName("test"), + new OpenSearchRequest.IndexName("shard"), TimeValue.timeValueMinutes(1), sourceBuilder, factory @@ -148,6 +153,7 @@ void search() { void search_without_context() { OpenSearchScrollRequest request = new OpenSearchScrollRequest( new OpenSearchRequest.IndexName("test"), + new OpenSearchRequest.IndexName("shard"), TimeValue.timeValueMinutes(1), sourceBuilder, factory @@ -167,6 +173,7 @@ void search_without_scroll_and_initial_request_should_throw() { // Steps: serialize a not used request, deserialize it, then use OpenSearchScrollRequest request = new OpenSearchScrollRequest( new OpenSearchRequest.IndexName("test"), + ROUTING_ID, TimeValue.timeValueMinutes(1), sourceBuilder, factory @@ -177,7 +184,7 @@ void search_without_scroll_and_initial_request_should_throw() { var inStream = new BytesStreamInput(outStream.bytes().toBytesRef().bytes); var indexMock = mock(OpenSearchIndex.class); var engine = mock(OpenSearchStorageEngine.class); - when(engine.getTable(any(), any())).thenReturn(indexMock); + when(engine.getTable(any(), any(), any())).thenReturn(indexMock); var request2 = new OpenSearchScrollRequest(inStream, engine); assertAll( () -> assertFalse(request2.isScroll()), @@ -191,6 +198,7 @@ void search_without_scroll_and_initial_request_should_throw() { void search_withoutIncludes() { OpenSearchScrollRequest request = new OpenSearchScrollRequest( new OpenSearchRequest.IndexName("test"), + ROUTING_ID, TimeValue.timeValueMinutes(1), sourceBuilder, factory @@ -286,7 +294,7 @@ void serialize_deserialize_no_needClean() { var inStream = new BytesStreamInput(stream.bytes().toBytesRef().bytes); var indexMock = mock(OpenSearchIndex.class); var engine = mock(OpenSearchStorageEngine.class); - when(engine.getTable(any(), any())).thenReturn(indexMock); + when(engine.getTable(any(), any(), any())).thenReturn(indexMock); var newRequest = new OpenSearchScrollRequest(inStream, engine); assertEquals(request, newRequest); assertEquals("", newRequest.getScrollId()); @@ -309,7 +317,7 @@ void serialize_deserialize_needClean() { var inStream = new BytesStreamInput(stream.bytes().toBytesRef().bytes); var indexMock = mock(OpenSearchIndex.class); var engine = mock(OpenSearchStorageEngine.class); - when(engine.getTable(any(), any())).thenReturn(indexMock); + when(engine.getTable(any(), any(), any())).thenReturn(indexMock); var newRequest = new OpenSearchScrollRequest(inStream, engine); assertEquals(request, newRequest); assertEquals("", newRequest.getScrollId()); @@ -336,6 +344,6 @@ void includes() { void assertIncludes(List expected, SearchSourceBuilder sourceBuilder) { assertEquals(expected, new OpenSearchScrollRequest( - INDEX_NAME, SCROLL_TIMEOUT, sourceBuilder, factory).getIncludes()); + INDEX_NAME, ROUTING_ID, SCROLL_TIMEOUT, sourceBuilder, factory).getIncludes()); } } diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java index 39af59b6cdc..1a11f9bf357 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java @@ -66,6 +66,9 @@ class OpenSearchIndexTest { public static final OpenSearchRequest.IndexName INDEX_NAME = new OpenSearchRequest.IndexName("test"); + public static final OpenSearchRequest.IndexName PARTITION_KEY + = new OpenSearchRequest.IndexName("key"); + @Mock private OpenSearchClient client; @@ -82,7 +85,7 @@ class OpenSearchIndexTest { @BeforeEach void setUp() { - this.index = new OpenSearchIndex(client, settings, "test"); + this.index = new OpenSearchIndex(client, settings, "test", "routing"); } @Test @@ -162,7 +165,7 @@ void checkCacheUsedForFieldMappings() { when(client.getIndexMappings("test")).thenReturn( ImmutableMap.of("test", mapping)); - OpenSearchIndex index = new OpenSearchIndex(client, settings, "test"); + OpenSearchIndex index = new OpenSearchIndex(client, settings, "test", "routing"); assertThat(index.getFieldTypes(), allOf( aMapWithSize(1), hasEntry("name", STRING))); @@ -204,8 +207,10 @@ void implementRelationOperatorOnly() { LogicalPlan plan = index.createScanBuilder(); Integer maxResultWindow = index.getMaxResultWindow(); final var requestBuilder = new OpenSearchRequestBuilder(QUERY_SIZE_LIMIT, exprValueFactory); - assertEquals(new OpenSearchIndexScan(client, - 200, requestBuilder.build(INDEX_NAME, maxResultWindow, SCROLL_TIMEOUT)), + assertEquals(new OpenSearchIndexScan( + client, + 200, + requestBuilder.build(INDEX_NAME, PARTITION_KEY, maxResultWindow, SCROLL_TIMEOUT)), index.implement(index.optimize(plan))); } @@ -217,7 +222,8 @@ void implementRelationOperatorWithOptimization() { Integer maxResultWindow = index.getMaxResultWindow(); final var requestBuilder = new OpenSearchRequestBuilder(QUERY_SIZE_LIMIT, exprValueFactory); assertEquals(new OpenSearchIndexScan(client, 200, - requestBuilder.build(INDEX_NAME, maxResultWindow, SCROLL_TIMEOUT)), index.implement(plan)); + requestBuilder.build( + INDEX_NAME, PARTITION_KEY, maxResultWindow, SCROLL_TIMEOUT)), index.implement(plan)); } @Test @@ -258,9 +264,15 @@ void implementOtherLogicalOperators() { PhysicalPlanDSL.eval( PhysicalPlanDSL.remove( PhysicalPlanDSL.rename( - new OpenSearchIndexScan(client, - QUERY_SIZE_LIMIT, requestBuilder.build(INDEX_NAME, maxResultWindow, - SCROLL_TIMEOUT)), + new OpenSearchIndexScan(client, + QUERY_SIZE_LIMIT, + requestBuilder.build( + INDEX_NAME, + PARTITION_KEY, + maxResultWindow, + SCROLL_TIMEOUT + ) + ), mappings), exclude), newEvalField), diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchStorageEngineTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchStorageEngineTest.java index 1089e7e2520..b962260d707 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchStorageEngineTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchStorageEngineTest.java @@ -35,7 +35,7 @@ class OpenSearchStorageEngineTest { public void getTable() { OpenSearchStorageEngine engine = new OpenSearchStorageEngine(client, settings); Table table = engine.getTable(new DataSourceSchemaName(DEFAULT_DATASOURCE_NAME, "default"), - "test"); + "test", "shard_test"); assertAll( () -> assertNotNull(table), () -> assertTrue(table instanceof OpenSearchIndex) @@ -46,7 +46,7 @@ public void getTable() { public void getSystemTable() { OpenSearchStorageEngine engine = new OpenSearchStorageEngine(client, settings); Table table = engine.getTable(new DataSourceSchemaName(DEFAULT_DATASOURCE_NAME, "default"), - TABLE_INFO); + TABLE_INFO, "shard_test"); assertAll( () -> assertNotNull(table), () -> assertTrue(table instanceof OpenSearchSystemIndex) diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java index 9e96e3cb6a9..1fcbb3e804c 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java @@ -68,6 +68,9 @@ class OpenSearchIndexScanTest { public static final int QUERY_SIZE = 200; public static final OpenSearchRequest.IndexName INDEX_NAME = new OpenSearchRequest.IndexName("employees"); + + public static final OpenSearchRequest.IndexName ROUTING_ID + = new OpenSearchRequest.IndexName("test_shard"); public static final int MAX_RESULT_WINDOW = 10000; public static final TimeValue CURSOR_KEEP_ALIVE = TimeValue.timeValueMinutes(1); @Mock @@ -111,9 +114,9 @@ void serialize() { var engine = mock(OpenSearchStorageEngine.class); var index = mock(OpenSearchIndex.class); when(engine.getClient()).thenReturn(client); - when(engine.getTable(any(), any())).thenReturn(index); + when(engine.getTable(any(), any(), any())).thenReturn(index); var request = new OpenSearchScrollRequest( - INDEX_NAME, CURSOR_KEEP_ALIVE, searchSourceBuilder, factory); + INDEX_NAME, ROUTING_ID, CURSOR_KEEP_ALIVE, searchSourceBuilder, factory); request.setScrollId("valid-id"); // make a response, so OpenSearchResponse::isEmpty would return true and unset needClean var response = mock(SearchResponse.class); @@ -146,7 +149,7 @@ void query_empty_result() { final var name = new OpenSearchRequest.IndexName("test"); final var requestBuilder = new OpenSearchRequestBuilder(QUERY_SIZE, exprValueFactory); try (OpenSearchIndexScan indexScan = new OpenSearchIndexScan(client, - QUERY_SIZE, requestBuilder.build(name, MAX_RESULT_WINDOW, CURSOR_KEEP_ALIVE))) { + QUERY_SIZE, requestBuilder.build(name, ROUTING_ID, MAX_RESULT_WINDOW, CURSOR_KEEP_ALIVE))) { indexScan.open(); assertFalse(indexScan.hasNext()); } @@ -162,7 +165,7 @@ void query_all_results_with_query() { final var requestBuilder = new OpenSearchRequestBuilder(QUERY_SIZE, exprValueFactory); try (OpenSearchIndexScan indexScan = new OpenSearchIndexScan(client, - 10, requestBuilder.build(INDEX_NAME, 10000, CURSOR_KEEP_ALIVE))) { + 10, requestBuilder.build(INDEX_NAME, ROUTING_ID, 10000, CURSOR_KEEP_ALIVE))) { indexScan.open(); assertAll( @@ -184,6 +187,9 @@ void query_all_results_with_query() { static final OpenSearchRequest.IndexName EMPLOYEES_INDEX = new OpenSearchRequest.IndexName("employees"); + static final OpenSearchRequest.IndexName EMPLOYEES_PARTITION_KEY + = new OpenSearchRequest.IndexName("employkeys"); + @Test void query_all_results_with_scroll() { mockResponse(client, @@ -192,7 +198,7 @@ void query_all_results_with_scroll() { final var requestBuilder = new OpenSearchRequestBuilder(QUERY_SIZE, exprValueFactory); try (OpenSearchIndexScan indexScan = new OpenSearchIndexScan(client, - 10, requestBuilder.build(INDEX_NAME, 10000, CURSOR_KEEP_ALIVE))) { + 10, requestBuilder.build(INDEX_NAME, ROUTING_ID, 10000, CURSOR_KEEP_ALIVE))) { indexScan.open(); assertAll( @@ -222,7 +228,7 @@ void query_some_results_with_query() { final int limit = 3; OpenSearchRequestBuilder builder = new OpenSearchRequestBuilder(0, exprValueFactory); try (OpenSearchIndexScan indexScan = new OpenSearchIndexScan(client, - limit, builder.build(INDEX_NAME, MAX_RESULT_WINDOW, CURSOR_KEEP_ALIVE))) { + limit, builder.build(INDEX_NAME, ROUTING_ID, MAX_RESULT_WINDOW, CURSOR_KEEP_ALIVE))) { indexScan.open(); assertAll( @@ -246,7 +252,7 @@ void query_some_results_with_scroll() { mockTwoPageResponse(client); final var requestuilder = new OpenSearchRequestBuilder(10, exprValueFactory); try (OpenSearchIndexScan indexScan = new OpenSearchIndexScan(client, - 3, requestuilder.build(INDEX_NAME, MAX_RESULT_WINDOW, CURSOR_KEEP_ALIVE))) { + 3, requestuilder.build(INDEX_NAME, ROUTING_ID, MAX_RESULT_WINDOW, CURSOR_KEEP_ALIVE))) { indexScan.open(); assertAll( @@ -282,7 +288,7 @@ void query_results_limited_by_query_size() { final int defaultQuerySize = 2; final var requestBuilder = new OpenSearchRequestBuilder(defaultQuerySize, exprValueFactory); try (OpenSearchIndexScan indexScan = new OpenSearchIndexScan(client, - defaultQuerySize, requestBuilder.build(INDEX_NAME, QUERY_SIZE, CURSOR_KEEP_ALIVE))) { + defaultQuerySize, requestBuilder.build(INDEX_NAME, ROUTING_ID, QUERY_SIZE, CURSOR_KEEP_ALIVE))) { indexScan.open(); assertAll( @@ -381,11 +387,11 @@ PushDownAssertion shouldQueryHighlight(QueryBuilder query, HighlightBuilder high .highlighter(highlight) .sort(DOC_FIELD_NAME, ASC); OpenSearchRequest request = - new OpenSearchQueryRequest(EMPLOYEES_INDEX, sourceBuilder, factory); + new OpenSearchQueryRequest(EMPLOYEES_INDEX, EMPLOYEES_PARTITION_KEY, sourceBuilder, factory); when(client.search(request)).thenReturn(response); var indexScan = new OpenSearchIndexScan(client, - QUERY_SIZE, requestBuilder.build(EMPLOYEES_INDEX, 10000, CURSOR_KEEP_ALIVE)); + QUERY_SIZE, requestBuilder.build(EMPLOYEES_INDEX, ROUTING_ID, 10000, CURSOR_KEEP_ALIVE)); indexScan.open(); return this; } @@ -397,10 +403,10 @@ PushDownAssertion shouldQuery(QueryBuilder expected) { .size(QUERY_SIZE) .timeout(CURSOR_KEEP_ALIVE) .sort(DOC_FIELD_NAME, ASC); - OpenSearchRequest request = new OpenSearchQueryRequest(EMPLOYEES_INDEX, builder, factory); + OpenSearchRequest request = new OpenSearchQueryRequest(EMPLOYEES_INDEX, EMPLOYEES_PARTITION_KEY, builder, factory); when(client.search(request)).thenReturn(response); var indexScan = new OpenSearchIndexScan(client, - 10000, requestBuilder.build(EMPLOYEES_INDEX, 10000, CURSOR_KEEP_ALIVE)); + 10000, requestBuilder.build(EMPLOYEES_INDEX, ROUTING_ID, 10000, CURSOR_KEEP_ALIVE)); indexScan.open(); return this; } diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageEngine.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageEngine.java index e19b369a97c..ba7e2d255dd 100644 --- a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageEngine.java +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageEngine.java @@ -13,6 +13,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import javax.annotation.Nullable; import lombok.RequiredArgsConstructor; import org.opensearch.sql.DataSourceSchemaName; import org.opensearch.sql.exception.SemanticCheckException; @@ -43,7 +44,7 @@ public Collection getFunctions() { } @Override - public Table getTable(DataSourceSchemaName dataSourceSchemaName, String tableName) { + public Table getTable(DataSourceSchemaName dataSourceSchemaName, String tableName, @Nullable String ignored) { if (isSystemIndex(tableName)) { return new PrometheusSystemTable(prometheusClient, dataSourceSchemaName, tableName); } else if (INFORMATION_SCHEMA_NAME.equals(dataSourceSchemaName.getSchemaName())) { diff --git a/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusStorageEngineTest.java b/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusStorageEngineTest.java index 4e8d4703738..03858ef31f0 100644 --- a/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusStorageEngineTest.java +++ b/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusStorageEngineTest.java @@ -35,7 +35,7 @@ class PrometheusStorageEngineTest { @Test public void getTable() { PrometheusStorageEngine engine = new PrometheusStorageEngine(client); - Table table = engine.getTable(new DataSourceSchemaName("prometheus", "default"), "test"); + Table table = engine.getTable(new DataSourceSchemaName("prometheus", "default"), "test", null); assertNotNull(table); assertTrue(table instanceof PrometheusMetricTable); } @@ -57,7 +57,7 @@ public void getFunctions() { @Test public void getSystemTable() { PrometheusStorageEngine engine = new PrometheusStorageEngine(client); - Table table = engine.getTable(new DataSourceSchemaName("prometheus", "default"), TABLE_INFO); + Table table = engine.getTable(new DataSourceSchemaName("prometheus", "default"), TABLE_INFO, "ignored"); assertNotNull(table); assertTrue(table instanceof PrometheusSystemTable); } @@ -66,7 +66,7 @@ public void getSystemTable() { public void getSystemTableForAllTablesInfo() { PrometheusStorageEngine engine = new PrometheusStorageEngine(client); Table table - = engine.getTable(new DataSourceSchemaName("prometheus", "information_schema"), "tables"); + = engine.getTable(new DataSourceSchemaName("prometheus", "information_schema"), "tables", "ignored"); assertNotNull(table); assertTrue(table instanceof PrometheusSystemTable); } @@ -76,7 +76,7 @@ public void getSystemTableWithWrongInformationSchemaTable() { PrometheusStorageEngine engine = new PrometheusStorageEngine(client); SemanticCheckException exception = assertThrows(SemanticCheckException.class, () -> engine.getTable(new DataSourceSchemaName("prometheus", "information_schema"), - "test")); + "test", "ignored")); assertEquals("Information Schema doesn't contain test table", exception.getMessage()); } diff --git a/sql/src/main/antlr/OpenSearchSQLParser.g4 b/sql/src/main/antlr/OpenSearchSQLParser.g4 index 2c3defb9f1c..d0d1de6a685 100644 --- a/sql/src/main/antlr/OpenSearchSQLParser.g4 +++ b/sql/src/main/antlr/OpenSearchSQLParser.g4 @@ -115,10 +115,14 @@ fromClause ; relation - : tableName (AS? alias)? #tableAsRelation + : tableName (AS? alias)? (partitionRelationClause)? #tableAsRelation | LR_BRACKET subquery=querySpecification RR_BRACKET AS? alias #subqueryAsRelation ; +partitionRelationClause + : PARTITION LR_BRACKET functionArgs RR_BRACKET + ; + whereClause : WHERE expression ; diff --git a/sql/src/main/java/org/opensearch/sql/sql/domain/SQLQueryRequest.java b/sql/src/main/java/org/opensearch/sql/sql/domain/SQLQueryRequest.java index c9321f57758..dccf3994285 100644 --- a/sql/src/main/java/org/opensearch/sql/sql/domain/SQLQueryRequest.java +++ b/sql/src/main/java/org/opensearch/sql/sql/domain/SQLQueryRequest.java @@ -7,6 +7,7 @@ package org.opensearch.sql.sql.domain; import java.util.Collections; +import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Optional; @@ -28,8 +29,9 @@ @RequiredArgsConstructor public class SQLQueryRequest { private static final String QUERY_FIELD_CURSOR = "cursor"; + private static final String QUERY_FIELD_ROUTING = "routing"; private static final Set SUPPORTED_FIELDS = Set.of( - "query", "fetch_size", "parameters", QUERY_FIELD_CURSOR); + "query", "fetch_size", "parameters", QUERY_FIELD_CURSOR, QUERY_FIELD_ROUTING); private static final String QUERY_PARAMS_FORMAT = "format"; private static final String QUERY_PARAMS_SANITIZE = "sanitize"; @@ -65,11 +67,13 @@ public class SQLQueryRequest { private String cursor; + private List routingIds; + /** * Constructor of SQLQueryRequest that passes request params. */ public SQLQueryRequest(JSONObject jsonContent, String query, String path, - Map params, String cursor) { + Map params, String cursor, List routingIds) { this.jsonContent = jsonContent; this.query = query; this.path = path; @@ -77,6 +81,7 @@ public SQLQueryRequest(JSONObject jsonContent, String query, String path, this.format = getFormat(params); this.sanitize = shouldSanitize(params); this.cursor = cursor; + this.routingIds = routingIds; } /** diff --git a/sql/src/main/java/org/opensearch/sql/sql/parser/AstBuilder.java b/sql/src/main/java/org/opensearch/sql/sql/parser/AstBuilder.java index 020889c082c..d197bf22fc2 100644 --- a/sql/src/main/java/org/opensearch/sql/sql/parser/AstBuilder.java +++ b/sql/src/main/java/org/opensearch/sql/sql/parser/AstBuilder.java @@ -21,8 +21,11 @@ import com.google.common.collect.ImmutableList; import java.util.Collections; +import java.util.List; import java.util.Optional; +import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; +import org.antlr.v4.runtime.RuleContext; import org.antlr.v4.runtime.tree.ParseTree; import org.opensearch.sql.ast.expression.Alias; import org.opensearch.sql.ast.expression.AllFields; @@ -188,7 +191,16 @@ private void verifySupportsCondition(UnresolvedExpression func) { public UnresolvedPlan visitTableAsRelation(TableAsRelationContext ctx) { String tableAlias = (ctx.alias() == null) ? null : StringUtils.unquoteIdentifier(ctx.alias().getText()); - return new Relation(visitAstExpression(ctx.tableName()), tableAlias); + if(ctx.partitionRelationClause() == null) { + return new Relation(visitAstExpression(ctx.tableName()), tableAlias); + } + return new Relation( + visitAstExpression(ctx.tableName()), + tableAlias, + ctx.partitionRelationClause().functionArgs().functionArg().stream() + .map(RuleContext::getText) + .collect(Collectors.toList()) + ); } @Override diff --git a/sql/src/test/java/org/opensearch/sql/sql/SQLServiceTest.java b/sql/src/test/java/org/opensearch/sql/sql/SQLServiceTest.java index f4342d877df..175d4d82c53 100644 --- a/sql/src/test/java/org/opensearch/sql/sql/SQLServiceTest.java +++ b/sql/src/test/java/org/opensearch/sql/sql/SQLServiceTest.java @@ -13,6 +13,7 @@ import static org.mockito.Mockito.doAnswer; import static org.opensearch.sql.executor.ExecutionEngine.QueryResponse; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import org.json.JSONObject; @@ -80,7 +81,7 @@ public void onFailure(Exception e) { @Test public void can_execute_cursor_query() { sqlService.execute( - new SQLQueryRequest(new JSONObject(), null, QUERY, Map.of("format", "jdbc"), "n:cursor"), + new SQLQueryRequest(new JSONObject(), null, QUERY, Map.of("format", "jdbc"), "n:cursor", List.of()), new ResponseListener<>() { @Override public void onResponse(QueryResponse response) { @@ -98,7 +99,7 @@ public void onFailure(Exception e) { public void can_execute_close_cursor_query() { sqlService.execute( new SQLQueryRequest(new JSONObject(), null, QUERY + "/close", - Map.of("format", "jdbc"), "n:cursor"), + Map.of("format", "jdbc"), "n:cursor", List.of()), new ResponseListener<>() { @Override public void onResponse(QueryResponse response) { @@ -154,7 +155,7 @@ public void onFailure(Exception e) { @Test public void cannot_explain_cursor_query() { sqlService.explain(new SQLQueryRequest(new JSONObject(), null, EXPLAIN, - Map.of("format", "jdbc"), "n:cursor"), + Map.of("format", "jdbc"), "n:cursor", List.of()), new ResponseListener() { @Override public void onResponse(ExplainResponse response) { diff --git a/sql/src/test/java/org/opensearch/sql/sql/domain/SQLQueryRequestTest.java b/sql/src/test/java/org/opensearch/sql/sql/domain/SQLQueryRequestTest.java index 1ffa4f0fa8f..7a492131985 100644 --- a/sql/src/test/java/org/opensearch/sql/sql/domain/SQLQueryRequestTest.java +++ b/sql/src/test/java/org/opensearch/sql/sql/domain/SQLQueryRequestTest.java @@ -14,6 +14,7 @@ import com.google.common.collect.ImmutableMap; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.json.JSONObject; import org.junit.jupiter.api.DisplayNameGeneration; @@ -289,6 +290,7 @@ private static class SQLQueryRequestBuilder { private String format; private String cursor; private Map params = new HashMap<>(); + private List routingIds = List.of(); static SQLQueryRequestBuilder request(String query) { SQLQueryRequestBuilder builder = new SQLQueryRequestBuilder(); @@ -326,7 +328,7 @@ SQLQueryRequest build() { params.put("format", format); } return new SQLQueryRequest(jsonContent == null ? null : new JSONObject(jsonContent), - query, path, params, cursor); + query, path, params, cursor, routingIds); } } From 1b6e54308eaaaf4d327b9a82e25e2758cf07fffb Mon Sep 17 00:00:00 2001 From: Andrew Carbonetto Date: Thu, 13 Jul 2023 17:44:02 -0700 Subject: [PATCH 03/13] Fix core refactor: StreamIO from common to core.common (#296) * Fix core refactor: StreamIO from common to core.common Signed-off-by: acarbonetto * Fix core refactor: StreamIO from common to core.common Signed-off-by: acarbonetto --------- Signed-off-by: acarbonetto From 382e955091b0b20a8ae1907e6d77b9981ed18171 Mon Sep 17 00:00:00 2001 From: acarbonetto Date: Mon, 17 Jul 2023 19:48:56 -0700 Subject: [PATCH 04/13] Add IT tests for multi-cluster Signed-off-by: acarbonetto --- .../org/opensearch/sql/ast/tree/Relation.java | 4 +- integ-test/build.gradle | 62 ++++++ .../sql/legacy/OpenSearchSQLRestTestCase.java | 14 +- .../CrossClusterSearchIT.java | 145 +++++++++++++ .../multiClusterSearch/ShardingSearchIT.java | 191 ++++++++++++++++++ .../sql/ppl/CrossClusterSearchIT.java | 2 +- .../org/opensearch/sql/sql/IdentifierIT.java | 58 +++++- .../request/OpenSearchQueryRequest.java | 11 +- .../request/OpenSearchScrollRequest.java | 4 +- .../response/OpenSearchResponse.java | 5 +- .../opensearch/storage/OpenSearchIndex.java | 2 +- .../storage/OpenSearchStorageEngine.java | 2 +- .../sql/spark/storage/SparkStorageEngine.java | 2 +- .../spark/storage/SparkStorageEngineTest.java | 2 +- 14 files changed, 481 insertions(+), 23 deletions(-) create mode 100644 integ-test/src/test/java/org/opensearch/sql/multiClusterSearch/CrossClusterSearchIT.java create mode 100644 integ-test/src/test/java/org/opensearch/sql/multiClusterSearch/ShardingSearchIT.java diff --git a/core/src/main/java/org/opensearch/sql/ast/tree/Relation.java b/core/src/main/java/org/opensearch/sql/ast/tree/Relation.java index 9eddac30a8a..b55f297cb2f 100644 --- a/core/src/main/java/org/opensearch/sql/ast/tree/Relation.java +++ b/core/src/main/java/org/opensearch/sql/ast/tree/Relation.java @@ -102,10 +102,10 @@ public QualifiedName getTableQualifiedName() { /** * Retrieve the partition keys associated with the table/relation * - * @return TablePartitionKeys. + * @return TablePartitionKeys | null */ public String getTablePartitionKeys() { - return String.join(COMMA, partitionKeys); + return partitionKeys == null ? null : String.join(COMMA, partitionKeys); } @Override diff --git a/integ-test/build.gradle b/integ-test/build.gradle index 4a5f2015e0d..06d3682efe1 100644 --- a/integ-test/build.gradle +++ b/integ-test/build.gradle @@ -481,6 +481,68 @@ task bwcTestSuite(type: StandaloneRestIntegTestTask) { dependsOn tasks.named("${baseName}#fullRestartClusterTask") } +testClusters { + multiClusterSearch { + testDistribution = "ARCHIVE" + numberOfNodes = 3 + plugin ":opensearch-sql-plugin" + } +} + +task multiClusterSearch(type: RestIntegTestTask) { + useCluster testClusters.multiClusterSearch + + testLogging { + events "passed", "skipped", "failed" + } + + // Set properties for connection to clusters and between clusters + doFirst { + getClusters().forEach { cluster -> + String allTransportSocketURI = cluster.nodes.stream().flatMap { node -> + node.getAllTransportPortURI().stream() + }.collect(Collectors.joining(",")) + String allHttpSocketURI = cluster.nodes.stream().flatMap { node -> + node.getAllHttpSocketURI().stream() + }.collect(Collectors.joining(",")) + + systemProperty "tests.rest.${cluster.name}.http_hosts", "${-> allHttpSocketURI}" + systemProperty "tests.rest.${cluster.name}.transport_hosts", "${-> allTransportSocketURI}" + } + } + + dependsOn ':opensearch-sql-plugin:bundlePlugin' + + systemProperty 'tests.security.manager', 'false' + systemProperty('project.root', project.projectDir.absolutePath) + + systemProperty "https", System.getProperty("https") + systemProperty "user", System.getProperty("user") + systemProperty "password", System.getProperty("password") + + // Set default query size limit + systemProperty 'defaultQuerySizeLimit', '10000' + + // Tell the test JVM if the cluster JVM is running under a debugger so that tests can use longer timeouts for + // requests. The 'doFirst' delays reading the debug setting on the cluster till execution time. + doFirst { + if (System.getProperty("debug-jvm") != null) { + setDebug(true); + } + systemProperty 'cluster.debug', getDebug() + } + + + if (System.getProperty("test.debug") != null) { + jvmArgs '-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=*:5006' + } + + filter { + includeTestsMatching "org.opensearch.sql.multiClusterSearch.*IT" + } +} + + def opensearch_tmp_dir = rootProject.file('build/private/es_tmp').absoluteFile opensearch_tmp_dir.mkdirs() diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/OpenSearchSQLRestTestCase.java b/integ-test/src/test/java/org/opensearch/sql/legacy/OpenSearchSQLRestTestCase.java index e057c589690..7bc4872d5c9 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/OpenSearchSQLRestTestCase.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/OpenSearchSQLRestTestCase.java @@ -50,6 +50,8 @@ public abstract class OpenSearchSQLRestTestCase extends OpenSearchRestTestCase { private static final Logger LOG = LogManager.getLogger(); public static final String REMOTE_CLUSTER = "remoteCluster"; + + public static final String MULTI_REMOTE_CLUSTER = "multiClusterSearch"; public static final String MATCH_ALL_REMOTE_CLUSTER = "*"; private static RestClient remoteClient; @@ -105,10 +107,10 @@ protected RestClient buildClient(Settings settings, HttpHost[] hosts) throws IOE } // Modified from initClient in OpenSearchRestTestCase - public void initRemoteClient() throws IOException { + public void initRemoteClient(String clusterName) throws IOException { if (remoteClient == null) { assert remoteAdminClient == null; - String cluster = getTestRestCluster(REMOTE_CLUSTER); + String cluster = getTestRestCluster(clusterName); String[] stringUrls = cluster.split(","); List hosts = new ArrayList<>(stringUrls.length); for (String stringUrl : stringUrls) { @@ -252,14 +254,14 @@ protected static void configureHttpsClient(RestClientBuilder builder, Settings s * Initialize rest client to remote cluster, * and create a connection to it from the coordinating cluster. */ - public void configureMultiClusters() throws IOException { - initRemoteClient(); + public void configureMultiClusters(String clusterName) throws IOException { + initRemoteClient(clusterName); Request connectionRequest = new Request("PUT", "_cluster/settings"); String connectionSetting = "{\"persistent\": {\"cluster\": {\"remote\": {\"" - + REMOTE_CLUSTER + + clusterName + "\": {\"seeds\": [\"" - + getTestTransportCluster(REMOTE_CLUSTER).split(",")[0] + + getTestTransportCluster(clusterName).split(",")[0] + "\"]}}}}}"; connectionRequest.setJsonEntity(connectionSetting); adminClient().performRequest(connectionRequest); diff --git a/integ-test/src/test/java/org/opensearch/sql/multiClusterSearch/CrossClusterSearchIT.java b/integ-test/src/test/java/org/opensearch/sql/multiClusterSearch/CrossClusterSearchIT.java new file mode 100644 index 00000000000..4a24f4a8c9f --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/multiClusterSearch/CrossClusterSearchIT.java @@ -0,0 +1,145 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + + +package org.opensearch.sql.multiClusterSearch; + +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ACCOUNT; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_DOG; +import static org.opensearch.sql.util.MatcherUtils.columnName; +import static org.opensearch.sql.util.MatcherUtils.rows; +import static org.opensearch.sql.util.MatcherUtils.verifyColumn; +import static org.opensearch.sql.util.MatcherUtils.verifyDataRows; + +import java.io.IOException; +import org.json.JSONObject; +import org.junit.Rule; +import org.junit.jupiter.api.Test; +import org.junit.rules.ExpectedException; +import org.opensearch.client.ResponseException; +import org.opensearch.sql.ppl.PPLIntegTestCase; + +public class CrossClusterSearchIT extends PPLIntegTestCase { + + @Rule + public ExpectedException exceptionRule = ExpectedException.none(); + + private final static String TEST_INDEX_BANK_REMOTE = REMOTE_CLUSTER + ":" + TEST_INDEX_BANK; + private final static String TEST_INDEX_DOG_REMOTE = REMOTE_CLUSTER + ":" + TEST_INDEX_DOG; + private final static String TEST_INDEX_DOG_MATCH_ALL_REMOTE = MATCH_ALL_REMOTE_CLUSTER + ":" + TEST_INDEX_DOG; + private final static String TEST_INDEX_ACCOUNT_REMOTE = REMOTE_CLUSTER + ":" + TEST_INDEX_ACCOUNT; + + @Override + public void init() throws IOException { + configureMultiClusters(MULTI_REMOTE_CLUSTER); + loadIndex(Index.BANK); + loadIndex(Index.BANK, remoteClient()); + loadIndex(Index.DOG); + loadIndex(Index.DOG, remoteClient()); + loadIndex(Index.ACCOUNT, remoteClient()); + } + + @Test + public void testCrossClusterSearchAllFields() throws IOException { + JSONObject result = executeQuery(String.format("search source=%s", TEST_INDEX_DOG_REMOTE)); + verifyColumn(result, columnName("dog_name"), columnName("holdersName"), columnName("age")); + } + + @Test + public void testMatchAllCrossClusterSearchAllFields() throws IOException { + JSONObject result = executeQuery(String.format("search source=%s", TEST_INDEX_DOG_MATCH_ALL_REMOTE)); + verifyColumn(result, columnName("dog_name"), columnName("holdersName"), columnName("age")); + } + + @Test + public void testCrossClusterSearchWithoutLocalFieldMappingShouldFail() throws IOException { + exceptionRule.expect(ResponseException.class); + exceptionRule.expectMessage("400 Bad Request"); + exceptionRule.expectMessage("IndexNotFoundException"); + + executeQuery(String.format("search source=%s", TEST_INDEX_ACCOUNT_REMOTE)); + } + + @Test + public void testCrossClusterSearchCommandWithLogicalExpression() throws IOException { + JSONObject result = executeQuery(String.format( + "search source=%s firstname='Hattie' | fields firstname", TEST_INDEX_BANK_REMOTE)); + verifyDataRows(result, rows("Hattie")); + } + + @Test + public void testCrossClusterSearchMultiClusters() throws IOException { + JSONObject result = executeQuery(String.format( + "search source=%s,%s firstname='Hattie' | fields firstname", TEST_INDEX_BANK_REMOTE, TEST_INDEX_BANK)); + verifyDataRows(result, + rows("Hattie"), + rows("Hattie")); + } + + @Test + public void testCrossClusterDescribeAllFields() throws IOException { + JSONObject result = executeQuery(String.format("describe %s", TEST_INDEX_DOG_REMOTE)); + verifyColumn( + result, + columnName("TABLE_CAT"), + columnName("TABLE_SCHEM"), + columnName("TABLE_NAME"), + columnName("COLUMN_NAME"), + columnName("DATA_TYPE"), + columnName("TYPE_NAME"), + columnName("COLUMN_SIZE"), + columnName("BUFFER_LENGTH"), + columnName("DECIMAL_DIGITS"), + columnName("NUM_PREC_RADIX"), + columnName("NULLABLE"), + columnName("REMARKS"), + columnName("COLUMN_DEF"), + columnName("SQL_DATA_TYPE"), + columnName("SQL_DATETIME_SUB"), + columnName("CHAR_OCTET_LENGTH"), + columnName("ORDINAL_POSITION"), + columnName("IS_NULLABLE"), + columnName("SCOPE_CATALOG"), + columnName("SCOPE_SCHEMA"), + columnName("SCOPE_TABLE"), + columnName("SOURCE_DATA_TYPE"), + columnName("IS_AUTOINCREMENT"), + columnName("IS_GENERATEDCOLUMN") + ); + } + + @Test + public void testMatchAllCrossClusterDescribeAllFields() throws IOException { + JSONObject result = executeQuery(String.format("describe %s", TEST_INDEX_DOG_MATCH_ALL_REMOTE)); + verifyColumn( + result, + columnName("TABLE_CAT"), + columnName("TABLE_SCHEM"), + columnName("TABLE_NAME"), + columnName("COLUMN_NAME"), + columnName("DATA_TYPE"), + columnName("TYPE_NAME"), + columnName("COLUMN_SIZE"), + columnName("BUFFER_LENGTH"), + columnName("DECIMAL_DIGITS"), + columnName("NUM_PREC_RADIX"), + columnName("NULLABLE"), + columnName("REMARKS"), + columnName("COLUMN_DEF"), + columnName("SQL_DATA_TYPE"), + columnName("SQL_DATETIME_SUB"), + columnName("CHAR_OCTET_LENGTH"), + columnName("ORDINAL_POSITION"), + columnName("IS_NULLABLE"), + columnName("SCOPE_CATALOG"), + columnName("SCOPE_SCHEMA"), + columnName("SCOPE_TABLE"), + columnName("SOURCE_DATA_TYPE"), + columnName("IS_AUTOINCREMENT"), + columnName("IS_GENERATEDCOLUMN") + ); + } +} diff --git a/integ-test/src/test/java/org/opensearch/sql/multiClusterSearch/ShardingSearchIT.java b/integ-test/src/test/java/org/opensearch/sql/multiClusterSearch/ShardingSearchIT.java new file mode 100644 index 00000000000..d13b4f1c56c --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/multiClusterSearch/ShardingSearchIT.java @@ -0,0 +1,191 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + + +package org.opensearch.sql.multiClusterSearch; + +import static org.opensearch.sql.util.MatcherUtils.schema; +import static org.opensearch.sql.util.MatcherUtils.verifySchema; +import static org.opensearch.sql.util.TestUtils.createHiddenIndexByRestClient; +import static org.opensearch.sql.util.TestUtils.performRequest; + +import java.io.IOException; +import org.json.JSONObject; +import org.junit.Rule; +import org.junit.jupiter.api.Test; +import org.junit.rules.ExpectedException; +import org.opensearch.client.Request; +import org.opensearch.sql.legacy.SQLIntegTestCase; + +public class ShardingSearchIT extends SQLIntegTestCase { + + @Rule + public ExpectedException exceptionRule = ExpectedException.none(); + + private final static String MAPPING = "{ " + + "\"settings\": {" + + " \"number_of_shards\": 3" + + "}," + + "\"mappings\" : {" + + " \"_routing\": { \"required\": true }," + + " \"properties\" : { " + + " \"age\" : { \"type\" : \"long\" } } } }"; + + @Override + public void init() throws IOException { + configureMultiClusters(MULTI_REMOTE_CLUSTER); + } + + @Test + public void testMetafieldIdentifierRoutingTest() throws IOException { + // create an index, but the contents doesn't really matter + String index = "test.routing_partition"; + new Index(index, MAPPING) + .addDocWithShardId("{\"age\": 31}", "test0", "test0") + .addDocWithShardId("{\"age\": 31}", "test1", "test1") + .addDocWithShardId("{\"age\": 32}", "test2", "test2") + .addDocWithShardId("{\"age\": 33}", "test3", "test3") + .addDocWithShardId("{\"age\": 34}", "test4", "test4") + .addDocWithShardId("{\"age\": 35}", "test5", "test5"); + + // Execute using field metadata values filtering on the routing shard hash id + String query = "SELECT age, _id, _index, _routing " + + "FROM " + index + " partition(test4)"; + final JSONObject result = new JSONObject(executeQuery(query, "jdbc")); + + // Verify that the metadata values are returned when requested + verifySchema(result, + schema("age", null, "long"), + schema("_id", null, "keyword"), + schema("_index", null, "keyword"), + schema("_routing", null, "keyword")); + assertTrue(result.getJSONArray("schema").length() == 4); + + // expect AT LEAST one result as we're requested all data on a single shard, + // but multiple _routing hashes can point to the same shard + var datarows = result.getJSONArray("datarows"); + assertTrue(datarows.length() > 0); + } + + @Test + public void testMetafieldIdentifierRoutingWhereTest() throws IOException { + // create an index, but the contents doesn't really matter + String index = "test.routing_where"; + new Index(index, MAPPING) + .addDocWithShardId("{\"age\": 31}", "test0", "test0") + .addDocWithShardId("{\"age\": 31}", "test1", "test1") + .addDocWithShardId("{\"age\": 32}", "test2", "test2") + .addDocWithShardId("{\"age\": 33}", "test3", "test3") + .addDocWithShardId("{\"age\": 34}", "test4", "test4") + .addDocWithShardId("{\"age\": 35}", "test5", "test5"); + + // Execute using field metadata values filtering on the routing shard hash id + String query = "SELECT age, _id, _index, _routing " + + "FROM " + index + " partition(test4) " + + "WHERE _routing='test4'"; + final JSONObject result = new JSONObject(executeQuery(query, "jdbc")); + + // Verify that the metadata values are returned when requested + verifySchema(result, + schema("age", null, "long"), + schema("_id", null, "keyword"), + schema("_index", null, "keyword"), + schema("_routing", null, "keyword")); + assertTrue(result.getJSONArray("schema").length() == 4); + + // expect exactly one result as we're filtering on the _routing shard + var datarows = result.getJSONArray("datarows"); + assertEquals(1, datarows.length()); + + assertEquals("test4", datarows.getJSONArray(0).getString(1)); + assertEquals(index, datarows.getJSONArray(0).getString(2)); + assertEquals("test4", datarows.getJSONArray(0).getString(3)); + } + + @Test + public void testMetafieldIdentifierRoutingWithoutPartitionTest() throws IOException { + // create an index with 3 shards + String index = "test.routing_empty"; + new Index(index, MAPPING) + .addDocWithShardId("{\"age\": 31}", "test0", "test0") + .addDocWithShardId("{\"age\": 31}", "test1", "test1") + .addDocWithShardId("{\"age\": 32}", "test2", "test2") + .addDocWithShardId("{\"age\": 33}", "test3", "test3") + .addDocWithShardId("{\"age\": 34}", "test4", "test4") + .addDocWithShardId("{\"age\": 35}", "test5", "test5"); + + // Execute using field metadata values filtering on the routing shard hash id + String query = "SELECT age, _id, _index, _routing " + + "FROM " + index; + final JSONObject result = new JSONObject(executeQuery(query, "jdbc")); + + // Verify that the metadata values are returned when requested + verifySchema(result, + schema("age", null, "long"), + schema("_id", null, "keyword"), + schema("_index", null, "keyword"), + schema("_routing", null, "keyword")); + assertTrue(result.getJSONArray("schema").length() == 4); + + // expect that when partition/shard is not specified, all data is returned + var datarows = result.getJSONArray("datarows"); + assertEquals(6, datarows.length()); + } + + @Test + public void testWithoutRoutingThrowsTest() throws IOException { + Index index = new Index("test.routing_error", MAPPING); + + // routing is required when pushing data + final IllegalStateException exception = expectThrows(IllegalStateException.class, + () -> index.addDoc("{\"age\": 31}", "test0")); + } + + /** + * Index abstraction for test code readability. + */ + private static class Index { + + private final String indexName; + + Index(String indexName) throws IOException { + this.indexName = indexName; + + if (indexName.startsWith(".")) { + createHiddenIndexByRestClient(client(), indexName, ""); + } else { + executeRequest(new Request("PUT", "/" + indexName)); + } + } + + Index(String indexName, String mapping) throws IOException { + this.indexName = indexName; + + Request createIndex = new Request("PUT", "/" + indexName); + createIndex.setJsonEntity(mapping); + executeRequest(createIndex); + } + + void addDoc(String doc) { + Request indexDoc = new Request("POST", String.format("/%s/_doc?refresh=true", indexName)); + indexDoc.setJsonEntity(doc); + performRequest(client(), indexDoc); + } + + public Index addDoc(String doc, String id) { + Request indexDoc = new Request("POST", String.format("/%s/_doc/%s?refresh=true", indexName, id)); + indexDoc.setJsonEntity(doc); + performRequest(client(), indexDoc); + return this; + } + + public Index addDocWithShardId(String doc, String id, String routing) { + Request indexDoc = new Request("POST", String.format("/%s/_doc/%s?refresh=true&routing=%s", indexName, id, routing)); + indexDoc.setJsonEntity(doc); + performRequest(client(), indexDoc); + return this; + } + } +} diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/CrossClusterSearchIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/CrossClusterSearchIT.java index a8e686a8930..8f5a464819f 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/CrossClusterSearchIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/CrossClusterSearchIT.java @@ -33,7 +33,7 @@ public class CrossClusterSearchIT extends PPLIntegTestCase { @Override public void init() throws IOException { - configureMultiClusters(); + configureMultiClusters(REMOTE_CLUSTER); loadIndex(Index.BANK); loadIndex(Index.BANK, remoteClient()); loadIndex(Index.DOG); diff --git a/integ-test/src/test/java/org/opensearch/sql/sql/IdentifierIT.java b/integ-test/src/test/java/org/opensearch/sql/sql/IdentifierIT.java index 22632cc4def..1789927e5eb 100644 --- a/integ-test/src/test/java/org/opensearch/sql/sql/IdentifierIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/sql/IdentifierIT.java @@ -103,9 +103,14 @@ public void testMetafieldIdentifierTest() throws IOException { @Test public void testMetafieldIdentifierRoutingSelectTest() throws IOException { - // create an index, but the contents doesn't really matter + // create an index with routing required String index = "test.routing_select"; - String mapping = "{\"_routing\": {\"required\": true }}"; + + final String mapping = "{ " + + "\"mappings\" : {" + + " \"_routing\": { \"required\": true }," + + " \"properties\" : { " + + " \"age\" : { \"type\" : \"long\" } } } }"; new Index(index, mapping) .addDocWithShardId("{\"age\": 31}", "test0", "test0") .addDocWithShardId("{\"age\": 31}", "test1", "test1") @@ -139,11 +144,56 @@ public void testMetafieldIdentifierRoutingSelectTest() throws IOException { } } + @Test + public void testMetafieldIdentifierRoutingPartitionTest() throws IOException { + // create an index with routing required + String index = "test.routing_partition"; + final String mapping = "{ " + + "\"mappings\" : {" + + " \"_routing\": { \"required\": true }," + + " \"properties\" : { " + + " \"age\" : { \"type\" : \"long\" } } } }"; + new Index(index, mapping) + .addDocWithShardId("{\"age\": 31}", "test0", "test0") + .addDocWithShardId("{\"age\": 31}", "test1", "test1") + .addDocWithShardId("{\"age\": 32}", "test2", "test2") + .addDocWithShardId("{\"age\": 33}", "test3", "test3") + .addDocWithShardId("{\"age\": 34}", "test4", "test4") + .addDocWithShardId("{\"age\": 35}", "test5", "test5"); + + // Execute using field metadata values filtering on the routing shard hash id + String query = "SELECT age, _id, _index, _routing " + + "FROM " + index + " PARTITION(test4,test5)"; + final JSONObject result = new JSONObject(executeQuery(query, "jdbc")); + + // Verify that the metadata values are returned when requested + verifySchema(result, + schema("age", null, "long"), + schema("_id", null, "keyword"), + schema("_index", null, "keyword"), + schema("_routing", null, "keyword")); + assertTrue(result.getJSONArray("schema").length() == 4); + + var datarows = result.getJSONArray("datarows"); + assertEquals(2, datarows.length()); + + // note that _routing in the SELECT clause returns the shard + for (int i = 0; i < 2; i++) { + assertEquals("test" + i, datarows.getJSONArray(i).getString(1)); + assertEquals(index, datarows.getJSONArray(i).getString(2)); + assertTrue(datarows.getJSONArray(i).getString(3).contains("[" + index + "]")); + } + } + @Test public void testMetafieldIdentifierRoutingFilterTest() throws IOException { // create an index, but the contents doesn't really matter String index = "test.routing_filter"; - String mapping = "{\"_routing\": {\"required\": true }}"; + final String mapping = "{ " + + "\"mappings\" : {" + + " \"_routing\": { \"required\": true }," + + " \"properties\" : { " + + " \"age\" : { \"type\" : \"long\" } } } }"; new Index(index, mapping) .addDocWithShardId("{\"age\": 31}", "test1", "test1") .addDocWithShardId("{\"age\": 32}", "test2", "test2") @@ -233,7 +283,7 @@ private static class Index { Request createIndex = new Request("PUT", "/" + indexName); createIndex.setJsonEntity(mapping); - executeRequest(new Request("PUT", "/" + indexName)); + executeRequest(createIndex); } void addDoc(String doc) { diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java index 53c6e93854c..44538d493a7 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java @@ -112,11 +112,14 @@ public OpenSearchResponse search(Function searchA return new OpenSearchResponse(SearchHits.empty(), exprValueFactory, includes); } else { searchDone = true; + SearchRequest searchRequest = new SearchRequest() + .indices(indexName.getIndexNames()) + .source(sourceBuilder); + if (getRoutingId() != null) { + searchRequest.routing(getRoutingId().getIndexNames()); + } return new OpenSearchResponse( - searchAction.apply(new SearchRequest() - .indices(indexName.getIndexNames()) - .source(sourceBuilder) - .routing(getRoutingId().getIndexNames())), exprValueFactory, includes); + searchAction.apply(searchRequest), exprValueFactory, includes); } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequest.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequest.java index 33bbd69dc18..8588469246d 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequest.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequest.java @@ -91,9 +91,11 @@ public OpenSearchScrollRequest(IndexName indexName, this.exprValueFactory = exprValueFactory; this.initialSearchRequest = new SearchRequest() .indices(indexName.getIndexNames()) - .routing(routingId.getIndexNames()) .scroll(scrollTimeout) .source(sourceBuilder); + if (routingId != null) { + this.initialSearchRequest.routing(routingId.getIndexNames()); + } includes = sourceBuilder.fetchSource() == null ? List.of() diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/response/OpenSearchResponse.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/response/OpenSearchResponse.java index 03abfbf6c11..4bef38642ce 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/response/OpenSearchResponse.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/response/OpenSearchResponse.java @@ -189,7 +189,10 @@ private void addMetaDataFieldsToBuilder( } else if (metaDataField.equals(METADATA_FIELD_SORT)) { builder.put(METADATA_FIELD_SORT, new ExprLongValue(hit.getSeqNo())); } else { // if (metaDataField.equals(METADATA_FIELD_ROUTING)){ - builder.put(METADATA_FIELD_ROUTING, new ExprStringValue(hit.getShard().toString())); + var routing = hit.getFields().getOrDefault("_routing", null); + if (routing != null) { + builder.put(METADATA_FIELD_ROUTING, new ExprStringValue(routing.getValue())); + } } }); } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java index 68960a19042..c1ad26c8eff 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java @@ -94,7 +94,7 @@ public OpenSearchIndex(OpenSearchClient client, Settings settings, String indexN this.client = client; this.settings = settings; this.indexName = new OpenSearchRequest.IndexName(indexName); - this.routingId = new OpenSearchRequest.IndexName(routingId); + this.routingId = routingId == null ? null : new OpenSearchRequest.IndexName(routingId); } @Override diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchStorageEngine.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchStorageEngine.java index 24ea8b0febf..d4052b129c4 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchStorageEngine.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchStorageEngine.java @@ -34,7 +34,7 @@ public Table getTable(DataSourceSchemaName dataSourceSchemaName, String name, @N // TODO: handle routingId on system tables too? return new OpenSearchSystemIndex(client, name); } else { - return new OpenSearchIndex(client, settings, name, routingId == null ? "" : routingId); + return new OpenSearchIndex(client, settings, name, routingId); } } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java index a5e35ecc4cb..e9e4b9bbc52 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java +++ b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java @@ -29,7 +29,7 @@ public Collection getFunctions() { } @Override - public Table getTable(DataSourceSchemaName dataSourceSchemaName, String tableName) { + public Table getTable(DataSourceSchemaName dataSourceSchemaName, String tableName, String parition) { throw new RuntimeException("Unable to get table from storage engine."); } } diff --git a/spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageEngineTest.java b/spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageEngineTest.java index d42e123678d..1ca218b8c8d 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageEngineTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageEngineTest.java @@ -40,7 +40,7 @@ public void getFunctions() { public void getTable() { SparkStorageEngine engine = new SparkStorageEngine(client); RuntimeException exception = assertThrows(RuntimeException.class, - () -> engine.getTable(new DataSourceSchemaName("spark", "default"), "")); + () -> engine.getTable(new DataSourceSchemaName("spark", "default"), "", null)); assertEquals("Unable to get table from storage engine.", exception.getMessage()); } } From 6ca8fec159168c9bd92869dac77aae6a59d29d6e Mon Sep 17 00:00:00 2001 From: acarbonetto Date: Wed, 26 Jul 2023 16:38:56 -0700 Subject: [PATCH 05/13] Fix cross-cluster tests Signed-off-by: acarbonetto --- .../sql/multiClusterSearch/CrossClusterSearchIT.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/integ-test/src/test/java/org/opensearch/sql/multiClusterSearch/CrossClusterSearchIT.java b/integ-test/src/test/java/org/opensearch/sql/multiClusterSearch/CrossClusterSearchIT.java index 4a24f4a8c9f..f3713241b3a 100644 --- a/integ-test/src/test/java/org/opensearch/sql/multiClusterSearch/CrossClusterSearchIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/multiClusterSearch/CrossClusterSearchIT.java @@ -27,10 +27,10 @@ public class CrossClusterSearchIT extends PPLIntegTestCase { @Rule public ExpectedException exceptionRule = ExpectedException.none(); - private final static String TEST_INDEX_BANK_REMOTE = REMOTE_CLUSTER + ":" + TEST_INDEX_BANK; - private final static String TEST_INDEX_DOG_REMOTE = REMOTE_CLUSTER + ":" + TEST_INDEX_DOG; + private final static String TEST_INDEX_BANK_REMOTE = MULTI_REMOTE_CLUSTER + ":" + TEST_INDEX_BANK; + private final static String TEST_INDEX_DOG_REMOTE = MULTI_REMOTE_CLUSTER + ":" + TEST_INDEX_DOG; private final static String TEST_INDEX_DOG_MATCH_ALL_REMOTE = MATCH_ALL_REMOTE_CLUSTER + ":" + TEST_INDEX_DOG; - private final static String TEST_INDEX_ACCOUNT_REMOTE = REMOTE_CLUSTER + ":" + TEST_INDEX_ACCOUNT; + private final static String TEST_INDEX_ACCOUNT_REMOTE = MULTI_REMOTE_CLUSTER + ":" + TEST_INDEX_ACCOUNT; @Override public void init() throws IOException { From 8edcc54489b86a41e3e50eb66f4f0ccc397819cd Mon Sep 17 00:00:00 2001 From: acarbonetto Date: Wed, 26 Jul 2023 17:53:43 -0700 Subject: [PATCH 06/13] Revert query parameter 'routing' Signed-off-by: acarbonetto --- .../org/opensearch/sql/ast/tree/Relation.java | 3 ++- .../sql/legacy/plugin/RestSqlAction.java | 2 +- .../sql/legacy/request/SqlRequest.java | 8 +------- .../sql/legacy/request/SqlRequestFactory.java | 17 +---------------- .../RestSQLQueryActionCursorFallbackTest.java | 2 +- .../sql/sql/domain/SQLQueryRequest.java | 7 ++----- .../org/opensearch/sql/sql/SQLServiceTest.java | 6 +++--- .../sql/sql/domain/SQLQueryRequestTest.java | 4 +--- 8 files changed, 12 insertions(+), 37 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/ast/tree/Relation.java b/core/src/main/java/org/opensearch/sql/ast/tree/Relation.java index b55f297cb2f..cb5cc52aea1 100644 --- a/core/src/main/java/org/opensearch/sql/ast/tree/Relation.java +++ b/core/src/main/java/org/opensearch/sql/ast/tree/Relation.java @@ -105,7 +105,8 @@ public QualifiedName getTableQualifiedName() { * @return TablePartitionKeys | null */ public String getTablePartitionKeys() { - return partitionKeys == null ? null : String.join(COMMA, partitionKeys); + return partitionKeys == null ? + null : String.join(COMMA, partitionKeys); } @Override diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java index df18ec80f16..9a15cc9e215 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java @@ -148,7 +148,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli // Route request to new query engine if it's supported already SQLQueryRequest newSqlRequest = new SQLQueryRequest(sqlRequest.getJsonContent(), - sqlRequest.getSql(), request.path(), request.params(), sqlRequest.cursor(), sqlRequest.routingIds()); + sqlRequest.getSql(), request.path(), request.params(), sqlRequest.cursor()); return newSqlQueryHandler.prepareRequest(newSqlRequest, (restChannel, exception) -> { try{ diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/request/SqlRequest.java b/legacy/src/main/java/org/opensearch/sql/legacy/request/SqlRequest.java index 194c16a87e0..605ef3c958b 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/request/SqlRequest.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/request/SqlRequest.java @@ -8,7 +8,6 @@ import java.io.IOException; import java.util.Collections; -import java.util.List; import org.json.JSONException; import org.json.JSONObject; import org.opensearch.common.settings.Settings; @@ -29,7 +28,6 @@ public class SqlRequest { JSONObject jsonContent; String cursor; Integer fetchSize; - private List routingIds; public SqlRequest(final String sql, final JSONObject jsonContent) { this.sql = sql; @@ -40,12 +38,10 @@ public SqlRequest(final String cursor) { this.cursor = cursor; } - public SqlRequest(final String sql, final Integer fetchSize, final JSONObject jsonContent, - final List routingIds) { + public SqlRequest(final String sql, final Integer fetchSize, final JSONObject jsonContent) { this.sql = sql; this.fetchSize = fetchSize; this.jsonContent = jsonContent; - this.routingIds = routingIds; } private static boolean isValidJson(String json) { @@ -69,8 +65,6 @@ public Integer fetchSize() { return this.fetchSize; } - public List routingIds() { return this.routingIds; } - public JSONObject getJsonContent() { return this.jsonContent; } diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/request/SqlRequestFactory.java b/legacy/src/main/java/org/opensearch/sql/legacy/request/SqlRequestFactory.java index 2c6dbea6d47..752db234c7e 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/request/SqlRequestFactory.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/request/SqlRequestFactory.java @@ -24,7 +24,6 @@ public class SqlRequestFactory { public static final String SQL_CURSOR_FIELD_NAME = "cursor"; public static final String SQL_FETCH_FIELD_NAME = "fetch_size"; - public static final String ROUTING_FIELD_NAME = "routing"; public static SqlRequest getSqlRequest(RestRequest request) { switch (request.method()) { @@ -65,21 +64,7 @@ private static SqlRequest parseSqlRequestFromPayload(RestRequest restRequest) { return new PreparedStatementRequest(sql, validateAndGetFetchSize(jsonContent), jsonContent, parameters); } - List routingIds = List.of(); - if (jsonContent.has(ROUTING_FIELD_NAME)) { - try { - routingIds = List.of(jsonContent.getString(ROUTING_FIELD_NAME)); - } catch (JSONException ignored) { - try { - JSONArray routingIdArray = jsonContent.getJSONArray(ROUTING_FIELD_NAME); - routingIds = parseRoutingIds(routingIdArray); - } catch (JSONException jsonException) { - throw new IllegalArgumentException(ROUTING_FIELD_NAME + " parameter must be defined as a string or array value", jsonException); - } - } - } - - return new SqlRequest(sql, validateAndGetFetchSize(jsonContent), jsonContent, routingIds); + return new SqlRequest(sql, validateAndGetFetchSize(jsonContent), jsonContent); } diff --git a/legacy/src/test/java/org/opensearch/sql/legacy/plugin/RestSQLQueryActionCursorFallbackTest.java b/legacy/src/test/java/org/opensearch/sql/legacy/plugin/RestSQLQueryActionCursorFallbackTest.java index 989299fc38d..9d13b3a791c 100644 --- a/legacy/src/test/java/org/opensearch/sql/legacy/plugin/RestSQLQueryActionCursorFallbackTest.java +++ b/legacy/src/test/java/org/opensearch/sql/legacy/plugin/RestSQLQueryActionCursorFallbackTest.java @@ -101,7 +101,7 @@ private static SQLQueryRequest createSqlQueryRequest(String query, Optional SUPPORTED_FIELDS = Set.of( - "query", "fetch_size", "parameters", QUERY_FIELD_CURSOR, QUERY_FIELD_ROUTING); + "query", "fetch_size", "parameters", QUERY_FIELD_CURSOR); private static final String QUERY_PARAMS_FORMAT = "format"; private static final String QUERY_PARAMS_SANITIZE = "sanitize"; @@ -67,13 +66,12 @@ public class SQLQueryRequest { private String cursor; - private List routingIds; /** * Constructor of SQLQueryRequest that passes request params. */ public SQLQueryRequest(JSONObject jsonContent, String query, String path, - Map params, String cursor, List routingIds) { + Map params, String cursor) { this.jsonContent = jsonContent; this.query = query; this.path = path; @@ -81,7 +79,6 @@ public SQLQueryRequest(JSONObject jsonContent, String query, String path, this.format = getFormat(params); this.sanitize = shouldSanitize(params); this.cursor = cursor; - this.routingIds = routingIds; } /** diff --git a/sql/src/test/java/org/opensearch/sql/sql/SQLServiceTest.java b/sql/src/test/java/org/opensearch/sql/sql/SQLServiceTest.java index 175d4d82c53..7512f19cfb4 100644 --- a/sql/src/test/java/org/opensearch/sql/sql/SQLServiceTest.java +++ b/sql/src/test/java/org/opensearch/sql/sql/SQLServiceTest.java @@ -81,7 +81,7 @@ public void onFailure(Exception e) { @Test public void can_execute_cursor_query() { sqlService.execute( - new SQLQueryRequest(new JSONObject(), null, QUERY, Map.of("format", "jdbc"), "n:cursor", List.of()), + new SQLQueryRequest(new JSONObject(), null, QUERY, Map.of("format", "jdbc"), "n:cursor"), new ResponseListener<>() { @Override public void onResponse(QueryResponse response) { @@ -99,7 +99,7 @@ public void onFailure(Exception e) { public void can_execute_close_cursor_query() { sqlService.execute( new SQLQueryRequest(new JSONObject(), null, QUERY + "/close", - Map.of("format", "jdbc"), "n:cursor", List.of()), + Map.of("format", "jdbc"), "n:cursor"), new ResponseListener<>() { @Override public void onResponse(QueryResponse response) { @@ -155,7 +155,7 @@ public void onFailure(Exception e) { @Test public void cannot_explain_cursor_query() { sqlService.explain(new SQLQueryRequest(new JSONObject(), null, EXPLAIN, - Map.of("format", "jdbc"), "n:cursor", List.of()), + Map.of("format", "jdbc"), "n:cursor"), new ResponseListener() { @Override public void onResponse(ExplainResponse response) { diff --git a/sql/src/test/java/org/opensearch/sql/sql/domain/SQLQueryRequestTest.java b/sql/src/test/java/org/opensearch/sql/sql/domain/SQLQueryRequestTest.java index 7a492131985..1ffa4f0fa8f 100644 --- a/sql/src/test/java/org/opensearch/sql/sql/domain/SQLQueryRequestTest.java +++ b/sql/src/test/java/org/opensearch/sql/sql/domain/SQLQueryRequestTest.java @@ -14,7 +14,6 @@ import com.google.common.collect.ImmutableMap; import java.util.HashMap; -import java.util.List; import java.util.Map; import org.json.JSONObject; import org.junit.jupiter.api.DisplayNameGeneration; @@ -290,7 +289,6 @@ private static class SQLQueryRequestBuilder { private String format; private String cursor; private Map params = new HashMap<>(); - private List routingIds = List.of(); static SQLQueryRequestBuilder request(String query) { SQLQueryRequestBuilder builder = new SQLQueryRequestBuilder(); @@ -328,7 +326,7 @@ SQLQueryRequest build() { params.put("format", format); } return new SQLQueryRequest(jsonContent == null ? null : new JSONObject(jsonContent), - query, path, params, cursor, routingIds); + query, path, params, cursor); } } From 97379bb7a9db5f36d0ccaef3ad66a2a3b21b0392 Mon Sep 17 00:00:00 2001 From: acarbonetto Date: Thu, 27 Jul 2023 09:44:52 -0700 Subject: [PATCH 07/13] Fix unit tests with partition add Signed-off-by: acarbonetto --- .../sql/analysis/AnalyzerTestBase.java | 5 +++-- .../org/opensearch/sql/config/TestConfig.java | 3 ++- .../org/opensearch/sql/planner/PlannerTest.java | 4 ++-- .../sql/storage/StorageEngineTest.java | 2 +- .../client/OpenSearchNodeClientTest.java | 13 +++++++++---- .../client/OpenSearchRestClientTest.java | 17 +++++++++++------ .../executor/OpenSearchExecutionEngineTest.java | 2 +- .../OpenSearchExecutionProtectorTest.java | 2 +- .../request/OpenSearchQueryRequestTest.java | 2 +- .../response/OpenSearchResponseTest.java | 12 ++++-------- .../opensearch/storage/OpenSearchIndexTest.java | 4 +++- .../scan/OpenSearchIndexScanPaginationTest.java | 8 +++++--- .../storage/scan/OpenSearchIndexScanTest.java | 15 ++++----------- 13 files changed, 47 insertions(+), 42 deletions(-) diff --git a/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java b/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java index b6e26000417..b5514c7807d 100644 --- a/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java +++ b/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java @@ -20,6 +20,7 @@ import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; +import javax.annotation.Nullable; import org.apache.commons.lang3.tuple.Pair; import org.opensearch.sql.DataSourceSchemaName; import org.opensearch.sql.analysis.symbol.Namespace; @@ -55,7 +56,7 @@ protected Map typeMapping() { } protected StorageEngine storageEngine() { - return (dataSourceSchemaName, tableName) -> table; + return (dataSourceSchemaName, tableName, partition) -> table; } protected StorageEngine prometheusStorageEngine() { @@ -85,7 +86,7 @@ public FunctionName getFunctionName() { } @Override - public Table getTable(DataSourceSchemaName dataSourceSchemaName, String tableName) { + public Table getTable(DataSourceSchemaName dataSourceSchemaName, String tableName, @Nullable String partition) { return table; } }; diff --git a/core/src/test/java/org/opensearch/sql/config/TestConfig.java b/core/src/test/java/org/opensearch/sql/config/TestConfig.java index 6179f020c29..4d86f47f119 100644 --- a/core/src/test/java/org/opensearch/sql/config/TestConfig.java +++ b/core/src/test/java/org/opensearch/sql/config/TestConfig.java @@ -8,6 +8,7 @@ import com.google.common.collect.ImmutableMap; import java.util.Map; +import javax.annotation.Nullable; import org.opensearch.sql.DataSourceSchemaName; import org.opensearch.sql.analysis.symbol.Namespace; import org.opensearch.sql.analysis.symbol.Symbol; @@ -66,7 +67,7 @@ public class TestConfig { protected StorageEngine storageEngine() { return new StorageEngine() { @Override - public Table getTable(DataSourceSchemaName dataSourceSchemaName, String name) { + public Table getTable(DataSourceSchemaName dataSourceSchemaName, String name, @Nullable String partition) { return new Table() { @Override public boolean exists() { diff --git a/core/src/test/java/org/opensearch/sql/planner/PlannerTest.java b/core/src/test/java/org/opensearch/sql/planner/PlannerTest.java index 64498f76cd8..5beaede6ed5 100644 --- a/core/src/test/java/org/opensearch/sql/planner/PlannerTest.java +++ b/core/src/test/java/org/opensearch/sql/planner/PlannerTest.java @@ -58,7 +58,7 @@ public class PlannerTest extends PhysicalPlanTestBase { @BeforeEach public void setUp() { - when(storageEngine.getTable(any(), any())).thenReturn(new MockTable()); + when(storageEngine.getTable(any(), any(), any())).thenReturn(new MockTable()); } @Test @@ -82,7 +82,7 @@ public void planner_test() { LogicalPlanDSL.relation("schema", storageEngine.getTable( new DataSourceSchemaName(DEFAULT_DATASOURCE_NAME, "default"), - "schema")), + "schema", "partition")), DSL.equal(DSL.ref("response", INTEGER), DSL.literal(10)) ), ImmutableList.of(DSL.named("avg(response)", DSL.avg(DSL.ref("response", INTEGER)))), diff --git a/core/src/test/java/org/opensearch/sql/storage/StorageEngineTest.java b/core/src/test/java/org/opensearch/sql/storage/StorageEngineTest.java index 67014b76bdc..98176310d01 100644 --- a/core/src/test/java/org/opensearch/sql/storage/StorageEngineTest.java +++ b/core/src/test/java/org/opensearch/sql/storage/StorageEngineTest.java @@ -15,7 +15,7 @@ public class StorageEngineTest { @Test void testFunctionsMethod() { - StorageEngine k = (dataSourceSchemaName, tableName) -> null; + StorageEngine k = (dataSourceSchemaName, tableName, partition) -> null; Assertions.assertEquals(Collections.emptyList(), k.getFunctions()); } } diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClientTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClientTest.java index 9417a1de1da..85925fa923e 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClientTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClientTest.java @@ -87,6 +87,11 @@ class OpenSearchNodeClientTest { private static final String TEST_MAPPING_FILE = "mappings/accounts.json"; private static final String TEST_MAPPING_SETTINGS_FILE = "mappings/accounts2.json"; + private static final OpenSearchRequest.IndexName INDEX_NAME = + new OpenSearchRequest.IndexName("test"); + private static final OpenSearchRequest.IndexName ROUTING_ID = + new OpenSearchRequest.IndexName("shard"); + @Mock(answer = RETURNS_DEEP_STUBS) private NodeClient nodeClient; @@ -322,7 +327,7 @@ void search() { // Verify response for first scroll request OpenSearchScrollRequest request = new OpenSearchScrollRequest( - new OpenSearchRequest.IndexName("test"), TimeValue.timeValueMinutes(1), + INDEX_NAME, ROUTING_ID, TimeValue.timeValueMinutes(1), new SearchSourceBuilder(), factory); OpenSearchResponse response1 = client.search(request); assertFalse(response1.isEmpty()); @@ -357,7 +362,7 @@ void cleanup() { when(requestBuilder.get()).thenReturn(null); OpenSearchScrollRequest request = new OpenSearchScrollRequest( - new OpenSearchRequest.IndexName("test"), TimeValue.timeValueMinutes(1), + INDEX_NAME, ROUTING_ID, TimeValue.timeValueMinutes(1), new SearchSourceBuilder(), factory); request.setScrollId("scroll123"); // Enforce cleaning by setting a private field. @@ -374,7 +379,7 @@ void cleanup() { @Test void cleanup_without_scrollId() { OpenSearchScrollRequest request = new OpenSearchScrollRequest( - new OpenSearchRequest.IndexName("test"), TimeValue.timeValueMinutes(1), + INDEX_NAME, ROUTING_ID, TimeValue.timeValueMinutes(1), new SearchSourceBuilder(), factory); client.cleanup(request); verify(nodeClient, never()).prepareClearScroll(); @@ -386,7 +391,7 @@ void cleanup_rethrows_exception() { when(nodeClient.prepareClearScroll()).thenThrow(new RuntimeException()); OpenSearchScrollRequest request = new OpenSearchScrollRequest( - new OpenSearchRequest.IndexName("test"), TimeValue.timeValueMinutes(1), + INDEX_NAME, ROUTING_ID, TimeValue.timeValueMinutes(1), new SearchSourceBuilder(), factory); request.setScrollId("scroll123"); // Enforce cleaning by setting a private field. diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchRestClientTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchRestClientTest.java index cceb6de995e..75a1c3d5652 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchRestClientTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchRestClientTest.java @@ -81,6 +81,11 @@ class OpenSearchRestClientTest { private static final String TEST_MAPPING_FILE = "mappings/accounts.json"; + + private static final OpenSearchRequest.IndexName INDEX_NAME = + new OpenSearchRequest.IndexName("test"); + private static final OpenSearchRequest.IndexName ROUTING_ID = + new OpenSearchRequest.IndexName("shard"); @Mock(answer = RETURNS_DEEP_STUBS) private RestHighLevelClient restClient; @@ -313,7 +318,7 @@ void search() throws IOException { // Verify response for first scroll request OpenSearchScrollRequest request = new OpenSearchScrollRequest( - new OpenSearchRequest.IndexName("test"), TimeValue.timeValueMinutes(1), + INDEX_NAME, ROUTING_ID, TimeValue.timeValueMinutes(1), new SearchSourceBuilder(), factory); OpenSearchResponse response1 = client.search(request); assertFalse(response1.isEmpty()); @@ -335,7 +340,7 @@ void search_with_IOException() throws IOException { assertThrows( IllegalStateException.class, () -> client.search(new OpenSearchScrollRequest( - new OpenSearchRequest.IndexName("test"), TimeValue.timeValueMinutes(1), + INDEX_NAME, ROUTING_ID, TimeValue.timeValueMinutes(1), new SearchSourceBuilder(), factory))); } @@ -357,7 +362,7 @@ void scroll_with_IOException() throws IOException { // First request run successfully OpenSearchScrollRequest scrollRequest = new OpenSearchScrollRequest( - new OpenSearchRequest.IndexName("test"), TimeValue.timeValueMinutes(1), + INDEX_NAME, ROUTING_ID, TimeValue.timeValueMinutes(1), new SearchSourceBuilder(), factory); client.search(scrollRequest); assertThrows( @@ -376,7 +381,7 @@ void schedule() { @SneakyThrows void cleanup() { OpenSearchScrollRequest request = new OpenSearchScrollRequest( - new OpenSearchRequest.IndexName("test"), TimeValue.timeValueMinutes(1), + INDEX_NAME, ROUTING_ID, TimeValue.timeValueMinutes(1), new SearchSourceBuilder(), factory); // Enforce cleaning by setting a private field. FieldUtils.writeField(request, "needClean", true, true); @@ -389,7 +394,7 @@ void cleanup() { @Test void cleanup_without_scrollId() throws IOException { OpenSearchScrollRequest request = new OpenSearchScrollRequest( - new OpenSearchRequest.IndexName("test"), TimeValue.timeValueMinutes(1), + INDEX_NAME, ROUTING_ID, TimeValue.timeValueMinutes(1), new SearchSourceBuilder(), factory); client.cleanup(request); verify(restClient, never()).clearScroll(any(), any()); @@ -401,7 +406,7 @@ void cleanup_with_IOException() { when(restClient.clearScroll(any(), any())).thenThrow(new IOException()); OpenSearchScrollRequest request = new OpenSearchScrollRequest( - new OpenSearchRequest.IndexName("test"), TimeValue.timeValueMinutes(1), + INDEX_NAME, ROUTING_ID, TimeValue.timeValueMinutes(1), new SearchSourceBuilder(), factory); // Enforce cleaning by setting a private field. FieldUtils.writeField(request, "needClean", true, true); diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngineTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngineTest.java index 330793a5d65..476138d6490 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngineTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngineTest.java @@ -183,7 +183,7 @@ void explain_successfully() { final int maxResultWindow = 10000; final var requestBuilder = new OpenSearchRequestBuilder(defaultQuerySize, exprValueFactory); PhysicalPlan plan = new OpenSearchIndexScan(mock(OpenSearchClient.class), - maxResultWindow, requestBuilder.build(name, maxResultWindow, + maxResultWindow, requestBuilder.build(name, null, maxResultWindow, settings.getSettingValue(SQL_CURSOR_KEEP_ALIVE))); AtomicReference result = new AtomicReference<>(); diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java index fd5e747b5f2..5b538dcc691 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java @@ -119,7 +119,7 @@ void test_protect_indexScan() { final var name = new OpenSearchRequest.IndexName(indexName); final var request = new OpenSearchRequestBuilder(querySizeLimit, exprValueFactory) - .build(name, maxResultWindow, + .build(name, null, maxResultWindow, settings.getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE)); assertEquals( PhysicalPlanDSL.project( diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequestTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequestTest.java index 1d60ee8b641..86941fd8cb5 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequestTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequestTest.java @@ -169,7 +169,7 @@ void searchCrossClusterRequest() { assertSearchRequest( new SearchRequest() .indices("ccs:test") - .routing("key") + .routing("ccs:key") .source(new SearchSourceBuilder() .timeout(DEFAULT_QUERY_TIMEOUT) .from(0) diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/response/OpenSearchResponseTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/response/OpenSearchResponseTest.java index e77819a4538..665f8b6e07c 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/response/OpenSearchResponseTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/response/OpenSearchResponseTest.java @@ -15,8 +15,6 @@ import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableMap; @@ -30,12 +28,11 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.action.search.SearchResponse; +import org.opensearch.common.document.DocumentField; import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.text.Text; -import org.opensearch.core.index.shard.ShardId; import org.opensearch.search.SearchHit; import org.opensearch.search.SearchHits; -import org.opensearch.search.SearchShardTarget; import org.opensearch.search.aggregations.Aggregations; import org.opensearch.search.fetch.subphase.highlight.HighlightField; import org.opensearch.sql.data.model.ExprFloatValue; @@ -150,13 +147,12 @@ void iterator_metafields() { new TotalHits(1L, TotalHits.Relation.EQUAL_TO), 3.75F)); - ShardId shardId = new ShardId("index", "indexUUID", 42); - SearchShardTarget shardTarget = new SearchShardTarget("node", shardId, null, null); + DocumentField routingField = new DocumentField("_routing", List.of("testShard")); when(searchHit1.getSourceAsString()).thenReturn("{\"id1\", 1}"); when(searchHit1.getId()).thenReturn("testId"); when(searchHit1.getIndex()).thenReturn("testIndex"); - when(searchHit1.getShard()).thenReturn(shardTarget); + when(searchHit1.getFields()).thenReturn(Map.of("_routing", routingField)); when(searchHit1.getScore()).thenReturn(3.75F); when(searchHit1.getSeqNo()).thenReturn(123456L); @@ -166,7 +162,7 @@ void iterator_metafields() { "id1", new ExprIntegerValue(1), "_index", new ExprStringValue("testIndex"), "_id", new ExprStringValue("testId"), - "_routing", new ExprStringValue(shardTarget.toString()), + "_routing", new ExprStringValue("testShard"), "_sort", new ExprLongValue(123456L), "_score", new ExprFloatValue(3.75F), "_maxscore", new ExprFloatValue(3.75F) diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java index 1a11f9bf357..35d5ab8eec8 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java @@ -85,7 +85,9 @@ class OpenSearchIndexTest { @BeforeEach void setUp() { - this.index = new OpenSearchIndex(client, settings, "test", "routing"); + this.index = new OpenSearchIndex(client, settings, + INDEX_NAME.getIndexNames()[0], + PARTITION_KEY.getIndexNames()[0]); } @Test diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanPaginationTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanPaginationTest.java index 67f0869d6eb..9afcdcc8f57 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanPaginationTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanPaginationTest.java @@ -46,6 +46,8 @@ public class OpenSearchIndexScanPaginationTest { public static final OpenSearchRequest.IndexName INDEX_NAME = new OpenSearchRequest.IndexName("test"); + public static final OpenSearchRequest.IndexName ROUTING_ID + = new OpenSearchRequest.IndexName("shard"); public static final int MAX_RESULT_WINDOW = 3; public static final TimeValue SCROLL_TIMEOUT = TimeValue.timeValueMinutes(4); @Mock @@ -71,7 +73,7 @@ void query_empty_result() { mockResponse(client); var builder = new OpenSearchRequestBuilder(QUERY_SIZE, exprValueFactory); try (var indexScan = new OpenSearchIndexScan(client, MAX_RESULT_WINDOW, - builder.build(INDEX_NAME, MAX_RESULT_WINDOW, SCROLL_TIMEOUT))) { + builder.build(INDEX_NAME, ROUTING_ID, MAX_RESULT_WINDOW, SCROLL_TIMEOUT))) { indexScan.open(); assertFalse(indexScan.hasNext()); } @@ -90,11 +92,11 @@ void dont_serialize_if_no_cursor() { OpenSearchRequestBuilder builder = mock(); OpenSearchRequest request = mock(); OpenSearchResponse response = mock(); - when(builder.build(any(), anyInt(), any())).thenReturn(request); + when(builder.build(any(), any(), anyInt(), any())).thenReturn(request); when(client.search(any())).thenReturn(response); try (var indexScan = new OpenSearchIndexScan(client, MAX_RESULT_WINDOW, - builder.build(INDEX_NAME, MAX_RESULT_WINDOW, SCROLL_TIMEOUT))) { + builder.build(INDEX_NAME, ROUTING_ID, MAX_RESULT_WINDOW, SCROLL_TIMEOUT))) { indexScan.open(); when(request.hasAnotherBatch()).thenReturn(false); diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java index 1fcbb3e804c..af3f760c027 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java @@ -12,7 +12,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -184,12 +183,6 @@ void query_all_results_with_query() { verify(client).cleanup(any()); } - static final OpenSearchRequest.IndexName EMPLOYEES_INDEX - = new OpenSearchRequest.IndexName("employees"); - - static final OpenSearchRequest.IndexName EMPLOYEES_PARTITION_KEY - = new OpenSearchRequest.IndexName("employkeys"); - @Test void query_all_results_with_scroll() { mockResponse(client, @@ -387,11 +380,11 @@ PushDownAssertion shouldQueryHighlight(QueryBuilder query, HighlightBuilder high .highlighter(highlight) .sort(DOC_FIELD_NAME, ASC); OpenSearchRequest request = - new OpenSearchQueryRequest(EMPLOYEES_INDEX, EMPLOYEES_PARTITION_KEY, sourceBuilder, factory); + new OpenSearchQueryRequest(INDEX_NAME, ROUTING_ID, sourceBuilder, factory); when(client.search(request)).thenReturn(response); var indexScan = new OpenSearchIndexScan(client, - QUERY_SIZE, requestBuilder.build(EMPLOYEES_INDEX, ROUTING_ID, 10000, CURSOR_KEEP_ALIVE)); + QUERY_SIZE, requestBuilder.build(INDEX_NAME, ROUTING_ID, 10000, CURSOR_KEEP_ALIVE)); indexScan.open(); return this; } @@ -403,10 +396,10 @@ PushDownAssertion shouldQuery(QueryBuilder expected) { .size(QUERY_SIZE) .timeout(CURSOR_KEEP_ALIVE) .sort(DOC_FIELD_NAME, ASC); - OpenSearchRequest request = new OpenSearchQueryRequest(EMPLOYEES_INDEX, EMPLOYEES_PARTITION_KEY, builder, factory); + OpenSearchRequest request = new OpenSearchQueryRequest(INDEX_NAME, ROUTING_ID, builder, factory); when(client.search(request)).thenReturn(response); var indexScan = new OpenSearchIndexScan(client, - 10000, requestBuilder.build(EMPLOYEES_INDEX, ROUTING_ID, 10000, CURSOR_KEEP_ALIVE)); + 10000, requestBuilder.build(INDEX_NAME, ROUTING_ID, 10000, CURSOR_KEEP_ALIVE)); indexScan.open(); return this; } From 5de089761e0c91164c376e246a194f3fd2df63c5 Mon Sep 17 00:00:00 2001 From: acarbonetto Date: Thu, 27 Jul 2023 13:26:57 -0700 Subject: [PATCH 08/13] clean up checkstyle and add test coverage Signed-off-by: acarbonetto --- .../org/opensearch/sql/ast/dsl/AstDSL.java | 4 +++ .../org/opensearch/sql/ast/tree/Relation.java | 13 +++++-- .../opensearch/sql/storage/StorageEngine.java | 5 ++- .../sql/analysis/AnalyzerTestBase.java | 4 ++- .../org/opensearch/sql/config/TestConfig.java | 4 ++- .../request/OpenSearchQueryRequest.java | 2 +- .../response/OpenSearchResponse.java | 1 + .../opensearch/storage/OpenSearchIndex.java | 5 ++- .../storage/OpenSearchStorageEngine.java | 4 ++- .../request/OpenSearchQueryRequestTest.java | 4 +-- .../request/OpenSearchScrollRequestTest.java | 2 +- .../response/OpenSearchResponseTest.java | 5 +-- .../storage/OpenSearchIndexTest.java | 2 +- .../storage/scan/OpenSearchIndexScanTest.java | 15 ++++---- .../storage/PrometheusStorageEngine.java | 4 ++- .../storage/PrometheusStorageEngineTest.java | 10 ++++-- .../sql/spark/storage/SparkStorageEngine.java | 5 ++- .../opensearch/sql/sql/parser/AstBuilder.java | 2 +- .../sql/sql/parser/AstBuilderTest.java | 34 +++++++++++++++++++ 19 files changed, 99 insertions(+), 26 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java b/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java index d5f10fcfd4b..cc8a9af83df 100644 --- a/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java +++ b/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java @@ -90,6 +90,10 @@ public UnresolvedPlan relation(String tableName, String alias) { return new Relation(qualifiedName(tableName), alias); } + public UnresolvedPlan relation(String tableName, String alias, List partitionKeys) { + return new Relation(qualifiedName(tableName), alias, partitionKeys); + } + public UnresolvedPlan tableFunction(List functionName, UnresolvedExpression... args) { return new TableFunction(new QualifiedName(functionName), Arrays.asList(args)); } diff --git a/core/src/main/java/org/opensearch/sql/ast/tree/Relation.java b/core/src/main/java/org/opensearch/sql/ast/tree/Relation.java index cb5cc52aea1..8f464d3c9aa 100644 --- a/core/src/main/java/org/opensearch/sql/ast/tree/Relation.java +++ b/core/src/main/java/org/opensearch/sql/ast/tree/Relation.java @@ -38,6 +38,13 @@ public Relation(UnresolvedExpression tableName, String alias) { this(tableName, alias, null); } + /** + * Constructor with Partition Keys for the relation. + * + * @param tableName - name of the name relation + * @param alias - alias name for the relation + * @param partitionKeys - partition or routing keys for the relation shard + */ public Relation(UnresolvedExpression tableName, String alias, List partitionKeys) { this.tableName = Arrays.asList(tableName); this.alias = alias; @@ -100,13 +107,13 @@ public QualifiedName getTableQualifiedName() { } /** - * Retrieve the partition keys associated with the table/relation + * Retrieve the partition keys associated with the table/relation. * * @return TablePartitionKeys | null */ public String getTablePartitionKeys() { - return partitionKeys == null ? - null : String.join(COMMA, partitionKeys); + return partitionKeys == null + ? null : String.join(COMMA, partitionKeys); } @Override diff --git a/core/src/main/java/org/opensearch/sql/storage/StorageEngine.java b/core/src/main/java/org/opensearch/sql/storage/StorageEngine.java index 3b1b000e778..077442e2ae5 100644 --- a/core/src/main/java/org/opensearch/sql/storage/StorageEngine.java +++ b/core/src/main/java/org/opensearch/sql/storage/StorageEngine.java @@ -20,7 +20,10 @@ public interface StorageEngine { /** * Get {@link Table} from storage engine. */ - Table getTable(DataSourceSchemaName dataSourceSchemaName, String tableName, @Nullable String partition); + Table getTable( + DataSourceSchemaName dataSourceSchemaName, + String tableName, + @Nullable String partition); /** * Get list of datasource related functions. diff --git a/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java b/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java index b5514c7807d..7f5c0de1811 100644 --- a/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java +++ b/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java @@ -86,7 +86,9 @@ public FunctionName getFunctionName() { } @Override - public Table getTable(DataSourceSchemaName dataSourceSchemaName, String tableName, @Nullable String partition) { + public Table getTable(DataSourceSchemaName dataSourceSchemaName, + String tableName, + @Nullable String partition) { return table; } }; diff --git a/core/src/test/java/org/opensearch/sql/config/TestConfig.java b/core/src/test/java/org/opensearch/sql/config/TestConfig.java index 4d86f47f119..3ccc8c9e483 100644 --- a/core/src/test/java/org/opensearch/sql/config/TestConfig.java +++ b/core/src/test/java/org/opensearch/sql/config/TestConfig.java @@ -67,7 +67,9 @@ public class TestConfig { protected StorageEngine storageEngine() { return new StorageEngine() { @Override - public Table getTable(DataSourceSchemaName dataSourceSchemaName, String name, @Nullable String partition) { + public Table getTable(DataSourceSchemaName dataSourceSchemaName, + String name, + @Nullable String partition) { return new Table() { @Override public boolean exists() { diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java index 44538d493a7..09289e0b6f2 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java @@ -58,7 +58,7 @@ public class OpenSearchQueryRequest implements OpenSearchRequest { private boolean searchDone = false; /** - * + * Sharding or Routing ID for the OpenSearch request. */ private final IndexName routingId; diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/response/OpenSearchResponse.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/response/OpenSearchResponse.java index 4bef38642ce..3fc42824b77 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/response/OpenSearchResponse.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/response/OpenSearchResponse.java @@ -29,6 +29,7 @@ import org.opensearch.search.aggregations.Aggregations; import org.opensearch.sql.data.model.ExprFloatValue; import org.opensearch.sql.data.model.ExprLongValue; +import org.opensearch.sql.data.model.ExprNullValue; import org.opensearch.sql.data.model.ExprStringValue; import org.opensearch.sql.data.model.ExprTupleValue; import org.opensearch.sql.data.model.ExprValue; diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java index c1ad26c8eff..e2ebe26205d 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java @@ -90,7 +90,10 @@ public class OpenSearchIndex implements Table { /** * Constructor. */ - public OpenSearchIndex(OpenSearchClient client, Settings settings, String indexName, String routingId) { + public OpenSearchIndex(OpenSearchClient client, + Settings settings, + String indexName, + String routingId) { this.client = client; this.settings = settings; this.indexName = new OpenSearchRequest.IndexName(indexName); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchStorageEngine.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchStorageEngine.java index d4052b129c4..d054afe86d8 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchStorageEngine.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchStorageEngine.java @@ -29,7 +29,9 @@ public class OpenSearchStorageEngine implements StorageEngine { private final Settings settings; @Override - public Table getTable(DataSourceSchemaName dataSourceSchemaName, String name, @Nullable String routingId) { + public Table getTable(DataSourceSchemaName dataSourceSchemaName, + String name, + @Nullable String routingId) { if (isSystemIndex(name)) { // TODO: handle routingId on system tables too? return new OpenSearchSystemIndex(client, name); diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequestTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequestTest.java index 86941fd8cb5..8ca71cb0f59 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequestTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequestTest.java @@ -100,7 +100,7 @@ void search() { void search_withoutContext() { OpenSearchQueryRequest request = new OpenSearchQueryRequest( new OpenSearchRequest.IndexName("test"), - new OpenSearchRequest.IndexName("key"), + null, sourceBuilder, factory ); @@ -124,7 +124,7 @@ void search_withIncludes() { factory ); - String[] includes = {"_id", "_index"}; + String[] includes = {"_id", "_index", "_routing"}; when(sourceBuilder.fetchSource()).thenReturn(fetchSourceContext); when(fetchSourceContext.includes()).thenReturn(includes); when(searchAction.apply(any())).thenReturn(searchResponse); diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequestTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequestTest.java index 2ee77c73615..b6369dc0e99 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequestTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequestTest.java @@ -89,7 +89,7 @@ void constructor() { @Test void constructor2() { searchSourceBuilder.fetchSource(new String[]{"test"}, null); - var request = new OpenSearchScrollRequest(INDEX_NAME, ROUTING_ID, SCROLL_TIMEOUT, searchSourceBuilder, + var request = new OpenSearchScrollRequest(INDEX_NAME, null, SCROLL_TIMEOUT, searchSourceBuilder, factory); assertNotEquals(List.of(), request.getIncludes()); } diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/response/OpenSearchResponseTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/response/OpenSearchResponseTest.java index 665f8b6e07c..3bf7bb1bfe7 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/response/OpenSearchResponseTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/response/OpenSearchResponseTest.java @@ -38,6 +38,7 @@ import org.opensearch.sql.data.model.ExprFloatValue; import org.opensearch.sql.data.model.ExprIntegerValue; import org.opensearch.sql.data.model.ExprLongValue; +import org.opensearch.sql.data.model.ExprNullValue; import org.opensearch.sql.data.model.ExprStringValue; import org.opensearch.sql.data.model.ExprTupleValue; import org.opensearch.sql.data.model.ExprValue; @@ -213,7 +214,7 @@ void iterator_metafields_withoutIncludes() { } @Test - void iterator_metafields_scoreNaN() { + void iterator_metafields_nullNaN() { ExprTupleValue exprTupleHit = ExprTupleValue.fromExprValueMap(ImmutableMap.of( "id1", new ExprIntegerValue(1) @@ -234,7 +235,7 @@ void iterator_metafields_scoreNaN() { when(factory.construct(any(), anyBoolean())).thenReturn(exprTupleHit); - List includes = List.of("id1", "_index", "_id", "_sort", "_score", "_maxscore"); + List includes = List.of("id1", "_index", "_id", "_routing", "_sort", "_score", "_maxscore"); ExprTupleValue exprTupleResponse = ExprTupleValue.fromExprValueMap(ImmutableMap.of( "id1", new ExprIntegerValue(1), "_index", new ExprStringValue("testIndex"), diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java index 35d5ab8eec8..4474401bbcc 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java @@ -167,7 +167,7 @@ void checkCacheUsedForFieldMappings() { when(client.getIndexMappings("test")).thenReturn( ImmutableMap.of("test", mapping)); - OpenSearchIndex index = new OpenSearchIndex(client, settings, "test", "routing"); + OpenSearchIndex index = new OpenSearchIndex(client, settings, "test", null); assertThat(index.getFieldTypes(), allOf( aMapWithSize(1), hasEntry("name", STRING))); diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java index af3f760c027..979385ee045 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java @@ -280,8 +280,9 @@ void query_results_limited_by_query_size() { final int defaultQuerySize = 2; final var requestBuilder = new OpenSearchRequestBuilder(defaultQuerySize, exprValueFactory); - try (OpenSearchIndexScan indexScan = new OpenSearchIndexScan(client, - defaultQuerySize, requestBuilder.build(INDEX_NAME, ROUTING_ID, QUERY_SIZE, CURSOR_KEEP_ALIVE))) { + try (OpenSearchIndexScan indexScan = + new OpenSearchIndexScan(client, defaultQuerySize, + requestBuilder.build(INDEX_NAME, ROUTING_ID, QUERY_SIZE, CURSOR_KEEP_ALIVE))) { indexScan.open(); assertAll( @@ -383,8 +384,8 @@ PushDownAssertion shouldQueryHighlight(QueryBuilder query, HighlightBuilder high new OpenSearchQueryRequest(INDEX_NAME, ROUTING_ID, sourceBuilder, factory); when(client.search(request)).thenReturn(response); - var indexScan = new OpenSearchIndexScan(client, - QUERY_SIZE, requestBuilder.build(INDEX_NAME, ROUTING_ID, 10000, CURSOR_KEEP_ALIVE)); + var indexScan = new OpenSearchIndexScan(client, QUERY_SIZE, + requestBuilder.build(INDEX_NAME, ROUTING_ID, 10000, CURSOR_KEEP_ALIVE)); indexScan.open(); return this; } @@ -396,10 +397,12 @@ PushDownAssertion shouldQuery(QueryBuilder expected) { .size(QUERY_SIZE) .timeout(CURSOR_KEEP_ALIVE) .sort(DOC_FIELD_NAME, ASC); - OpenSearchRequest request = new OpenSearchQueryRequest(INDEX_NAME, ROUTING_ID, builder, factory); + OpenSearchRequest request = new OpenSearchQueryRequest( + INDEX_NAME, ROUTING_ID, builder, factory); when(client.search(request)).thenReturn(response); var indexScan = new OpenSearchIndexScan(client, - 10000, requestBuilder.build(INDEX_NAME, ROUTING_ID, 10000, CURSOR_KEEP_ALIVE)); + 10000, + requestBuilder.build(INDEX_NAME, ROUTING_ID, 10000, CURSOR_KEEP_ALIVE)); indexScan.open(); return this; } diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageEngine.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageEngine.java index ba7e2d255dd..aa721bcaa37 100644 --- a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageEngine.java +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageEngine.java @@ -44,7 +44,9 @@ public Collection getFunctions() { } @Override - public Table getTable(DataSourceSchemaName dataSourceSchemaName, String tableName, @Nullable String ignored) { + public Table getTable(DataSourceSchemaName dataSourceSchemaName, + String tableName, + @Nullable String ignored) { if (isSystemIndex(tableName)) { return new PrometheusSystemTable(prometheusClient, dataSourceSchemaName, tableName); } else if (INFORMATION_SCHEMA_NAME.equals(dataSourceSchemaName.getSchemaName())) { diff --git a/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusStorageEngineTest.java b/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusStorageEngineTest.java index 03858ef31f0..95161700337 100644 --- a/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusStorageEngineTest.java +++ b/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusStorageEngineTest.java @@ -57,7 +57,10 @@ public void getFunctions() { @Test public void getSystemTable() { PrometheusStorageEngine engine = new PrometheusStorageEngine(client); - Table table = engine.getTable(new DataSourceSchemaName("prometheus", "default"), TABLE_INFO, "ignored"); + Table table = engine.getTable( + new DataSourceSchemaName("prometheus", "default"), + TABLE_INFO, + "ignored"); assertNotNull(table); assertTrue(table instanceof PrometheusSystemTable); } @@ -66,7 +69,10 @@ public void getSystemTable() { public void getSystemTableForAllTablesInfo() { PrometheusStorageEngine engine = new PrometheusStorageEngine(client); Table table - = engine.getTable(new DataSourceSchemaName("prometheus", "information_schema"), "tables", "ignored"); + = engine.getTable( + new DataSourceSchemaName("prometheus", "information_schema"), + "tables", + "ignored"); assertNotNull(table); assertTrue(table instanceof PrometheusSystemTable); } diff --git a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java index e9e4b9bbc52..07ca96f4733 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java +++ b/spark/src/main/java/org/opensearch/sql/spark/storage/SparkStorageEngine.java @@ -7,6 +7,7 @@ import java.util.Collection; import java.util.Collections; +import javax.annotation.Nullable; import lombok.RequiredArgsConstructor; import org.opensearch.sql.DataSourceSchemaName; import org.opensearch.sql.expression.function.FunctionResolver; @@ -29,7 +30,9 @@ public Collection getFunctions() { } @Override - public Table getTable(DataSourceSchemaName dataSourceSchemaName, String tableName, String parition) { + public Table getTable(DataSourceSchemaName dataSourceSchemaName, + String tableName, + @Nullable String ignored) { throw new RuntimeException("Unable to get table from storage engine."); } } diff --git a/sql/src/main/java/org/opensearch/sql/sql/parser/AstBuilder.java b/sql/src/main/java/org/opensearch/sql/sql/parser/AstBuilder.java index d197bf22fc2..5675621a816 100644 --- a/sql/src/main/java/org/opensearch/sql/sql/parser/AstBuilder.java +++ b/sql/src/main/java/org/opensearch/sql/sql/parser/AstBuilder.java @@ -191,7 +191,7 @@ private void verifySupportsCondition(UnresolvedExpression func) { public UnresolvedPlan visitTableAsRelation(TableAsRelationContext ctx) { String tableAlias = (ctx.alias() == null) ? null : StringUtils.unquoteIdentifier(ctx.alias().getText()); - if(ctx.partitionRelationClause() == null) { + if (ctx.partitionRelationClause() == null) { return new Relation(visitAstExpression(ctx.tableName()), tableAlias); } return new Relation( diff --git a/sql/src/test/java/org/opensearch/sql/sql/parser/AstBuilderTest.java b/sql/src/test/java/org/opensearch/sql/sql/parser/AstBuilderTest.java index 3e56a897541..287fadc8250 100644 --- a/sql/src/test/java/org/opensearch/sql/sql/parser/AstBuilderTest.java +++ b/sql/src/test/java/org/opensearch/sql/sql/parser/AstBuilderTest.java @@ -33,6 +33,7 @@ import com.google.common.collect.ImmutableList; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.junit.jupiter.api.Test; import org.opensearch.sql.ast.dsl.AstDSL; @@ -124,6 +125,39 @@ public void can_build_select_fields_from_index() { ); } + @Test + public void can_build_select_fields_from_index_with_partition() { + assertEquals( + project( + relation("test", null, List.of("key")), + alias("age", qualifiedName("age")) + ), + buildAST("SELECT age FROM test PARTITION(key)") + ); + } + + @Test + public void can_build_select_fields_from_index_with_multiple_partitions() { + assertEquals( + project( + relation("test", null, List.of("key1", "key2", "key3")), + alias("age", qualifiedName("age")) + ), + buildAST("SELECT age FROM test PARTITION(key1, key2, key3)") + ); + } + + @Test + public void can_build_select_fields_from_index_with_quoted_partitions() { + assertEquals( + project( + relation("test", null, List.of("'key1'", "\"key2\"", "`key3`")), + alias("age", qualifiedName("age")) + ), + buildAST("SELECT age FROM test PARTITION('key1', \"key2\", `key3`)") + ); + } + @Test public void can_build_select_fields_with_alias() { assertEquals( From d543849f7fbfbf3ff3b209a50019d30e0b5724f1 Mon Sep 17 00:00:00 2001 From: acarbonetto Date: Thu, 27 Jul 2023 14:07:14 -0700 Subject: [PATCH 09/13] Fix IT tests Signed-off-by: acarbonetto --- docs/user/dql/basics.rst | 11 +++++++++++ .../opensearch/request/OpenSearchScrollRequest.java | 4 +++- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/docs/user/dql/basics.rst b/docs/user/dql/basics.rst index a03ac4db700..00c3c29578f 100644 --- a/docs/user/dql/basics.rst +++ b/docs/user/dql/basics.rst @@ -363,6 +363,17 @@ SQL query:: { "query" : "SELECT account_number FROM accounts/account" } +Example 4: Selecting From Index using Partition Shard +----------------------------------------------------------- + +You can also specify a specific shard or partition to target using a routing hash key in ``PARTITION``. You can target multiple shards by providing a list separated by commas. + +SQL query:: + + POST /_plugins/_sql + { + "query" : "SELECT account_number FROM account PARTITION(shard1, shard2)" + } WHERE ===== diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequest.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequest.java index 8588469246d..5098db91457 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequest.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequest.java @@ -179,7 +179,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(scrollId); out.writeStringCollection(includes); indexName.writeTo(out); - routingId.writeTo(out); + if (routingId != null) { + routingId.writeTo(out); + } } /** From 283428c4664997fe462ccb72adcb98483da544ad Mon Sep 17 00:00:00 2001 From: acarbonetto Date: Thu, 27 Jul 2023 16:26:42 -0700 Subject: [PATCH 10/13] Add multi-cluster search tests to build Signed-off-by: acarbonetto --- .github/workflows/sql-test-workflow.yml | 3 +- integ-test/build.gradle | 3 ++ .../org/opensearch/sql/sql/IdentifierIT.java | 47 +------------------ .../ppl/explain_filter_agg_push.json | 2 +- .../ppl/explain_filter_push.json | 2 +- .../expectedOutput/ppl/explain_output.json | 2 +- .../expectedOutput/ppl/explain_sort_push.json | 2 +- 7 files changed, 11 insertions(+), 50 deletions(-) diff --git a/.github/workflows/sql-test-workflow.yml b/.github/workflows/sql-test-workflow.yml index cdc08c74803..573192e0a3a 100644 --- a/.github/workflows/sql-test-workflow.yml +++ b/.github/workflows/sql-test-workflow.yml @@ -64,8 +64,9 @@ jobs: ./gradlew :core:jacocoTestCoverageVerification || echo "* Jacoco failed for core" >> report.log ./gradlew :protocol:jacocoTestCoverageVerification || echo "* Jacoco failed for protocol" >> report.log ./gradlew :opensearch-sql-plugin:jacocoTestCoverageVerification || echo "* Jacoco failed for plugin" >> report.log - # Misc tests + # Misc Integration tests ./gradlew :integ-test:integTest || echo "* Integration test failed" >> report.log + ./gradlew :integ-test:multiClusterSearch || echo "* Multi-Cluster Search tests failed" >> report.log ./gradlew :doctest:doctest || echo "* Doctest failed" >> report.log ./scripts/bwctest.sh || echo "* Backward compatibility test failed" >> report.log diff --git a/integ-test/build.gradle b/integ-test/build.gradle index 06d3682efe1..237130cb99d 100644 --- a/integ-test/build.gradle +++ b/integ-test/build.gradle @@ -285,6 +285,9 @@ integTest { exclude 'org/opensearch/sql/doctest/**/*IT.class' exclude 'org/opensearch/sql/correctness/**' + // Skip to run these IT tests on a different cluster + exclude 'org/opensearch/sql/multiClusterSearch/**' + // Explain IT is dependent on internal implementation of old engine so it's not necessary // to run these with new engine and not necessary to make this consistent with old engine. exclude 'org/opensearch/sql/legacy/ExplainIT.class' diff --git a/integ-test/src/test/java/org/opensearch/sql/sql/IdentifierIT.java b/integ-test/src/test/java/org/opensearch/sql/sql/IdentifierIT.java index 1789927e5eb..1f76af11241 100644 --- a/integ-test/src/test/java/org/opensearch/sql/sql/IdentifierIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/sql/IdentifierIT.java @@ -140,48 +140,7 @@ public void testMetafieldIdentifierRoutingSelectTest() throws IOException { for (int i = 0; i < 6; i++) { assertEquals("test" + i, datarows.getJSONArray(i).getString(1)); assertEquals(index, datarows.getJSONArray(i).getString(2)); - assertTrue(datarows.getJSONArray(i).getString(3).contains("[" + index + "]")); - } - } - - @Test - public void testMetafieldIdentifierRoutingPartitionTest() throws IOException { - // create an index with routing required - String index = "test.routing_partition"; - final String mapping = "{ " + - "\"mappings\" : {" + - " \"_routing\": { \"required\": true }," + - " \"properties\" : { " + - " \"age\" : { \"type\" : \"long\" } } } }"; - new Index(index, mapping) - .addDocWithShardId("{\"age\": 31}", "test0", "test0") - .addDocWithShardId("{\"age\": 31}", "test1", "test1") - .addDocWithShardId("{\"age\": 32}", "test2", "test2") - .addDocWithShardId("{\"age\": 33}", "test3", "test3") - .addDocWithShardId("{\"age\": 34}", "test4", "test4") - .addDocWithShardId("{\"age\": 35}", "test5", "test5"); - - // Execute using field metadata values filtering on the routing shard hash id - String query = "SELECT age, _id, _index, _routing " - + "FROM " + index + " PARTITION(test4,test5)"; - final JSONObject result = new JSONObject(executeQuery(query, "jdbc")); - - // Verify that the metadata values are returned when requested - verifySchema(result, - schema("age", null, "long"), - schema("_id", null, "keyword"), - schema("_index", null, "keyword"), - schema("_routing", null, "keyword")); - assertTrue(result.getJSONArray("schema").length() == 4); - - var datarows = result.getJSONArray("datarows"); - assertEquals(2, datarows.length()); - - // note that _routing in the SELECT clause returns the shard - for (int i = 0; i < 2; i++) { - assertEquals("test" + i, datarows.getJSONArray(i).getString(1)); - assertEquals(index, datarows.getJSONArray(i).getString(2)); - assertTrue(datarows.getJSONArray(i).getString(3).contains("[" + index + "]")); + assertEquals("test" + i, datarows.getJSONArray(i).getString(3)); } } @@ -220,9 +179,7 @@ public void testMetafieldIdentifierRoutingFilterTest() throws IOException { assertEquals(1, datarows.length()); assertEquals("test4", datarows.getJSONArray(0).getString(0)); - // note that _routing in the SELECT clause returns the shard, not the routing hash id - assertTrue(datarows.getJSONArray(0).getString(2).contains("[" + index + "]")); - + assertEquals("test4", datarows.getJSONArray(0).getString(2)); } @Test diff --git a/integ-test/src/test/resources/expectedOutput/ppl/explain_filter_agg_push.json b/integ-test/src/test/resources/expectedOutput/ppl/explain_filter_agg_push.json index 568b397f07b..9c747abfc9d 100644 --- a/integ-test/src/test/resources/expectedOutput/ppl/explain_filter_agg_push.json +++ b/integ-test/src/test/resources/expectedOutput/ppl/explain_filter_agg_push.json @@ -8,7 +8,7 @@ { "name": "OpenSearchIndexScan", "description": { - "request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}},{\"city\":{\"terms\":{\"field\":\"city.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg_age\":{\"avg\":{\"field\":\"age\"}}}}}}, searchDone\u003dfalse)" + "request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}},{\"city\":{\"terms\":{\"field\":\"city.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg_age\":{\"avg\":{\"field\":\"age\"}}}}}}, searchDone\u003dfalse, routingId\u003dnull)" }, "children": [] } diff --git a/integ-test/src/test/resources/expectedOutput/ppl/explain_filter_push.json b/integ-test/src/test/resources/expectedOutput/ppl/explain_filter_push.json index 0e7087aa1ff..f644dc73dc1 100644 --- a/integ-test/src/test/resources/expectedOutput/ppl/explain_filter_push.json +++ b/integ-test/src/test/resources/expectedOutput/ppl/explain_filter_push.json @@ -8,7 +8,7 @@ { "name": "OpenSearchIndexScan", "description": { - "request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"bool\":{\"filter\":[{\"bool\":{\"filter\":[{\"range\":{\"balance\":{\"from\":10000,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},{\"range\":{\"age\":{\"from\":null,\"to\":40,\"include_lower\":true,\"include_upper\":false,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, searchDone\u003dfalse)" + "request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"bool\":{\"filter\":[{\"bool\":{\"filter\":[{\"range\":{\"balance\":{\"from\":10000,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},{\"range\":{\"age\":{\"from\":null,\"to\":40,\"include_lower\":true,\"include_upper\":false,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, searchDone\u003dfalse, routingId\u003dnull)" }, "children": [] } diff --git a/integ-test/src/test/resources/expectedOutput/ppl/explain_output.json b/integ-test/src/test/resources/expectedOutput/ppl/explain_output.json index 8d45714283d..01729d8b034 100644 --- a/integ-test/src/test/resources/expectedOutput/ppl/explain_output.json +++ b/integ-test/src/test/resources/expectedOutput/ppl/explain_output.json @@ -31,7 +31,7 @@ { "name": "OpenSearchIndexScan", "description": { - "request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}},{\"city\":{\"terms\":{\"field\":\"city.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg_age\":{\"avg\":{\"field\":\"age\"}}}}}}, searchDone\u003dfalse)" + "request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}},{\"city\":{\"terms\":{\"field\":\"city.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg_age\":{\"avg\":{\"field\":\"age\"}}}}}}, searchDone\u003dfalse, routingId\u003dnull)" }, "children": [] } diff --git a/integ-test/src/test/resources/expectedOutput/ppl/explain_sort_push.json b/integ-test/src/test/resources/expectedOutput/ppl/explain_sort_push.json index af2a57e5363..f6c367b7e06 100644 --- a/integ-test/src/test/resources/expectedOutput/ppl/explain_sort_push.json +++ b/integ-test/src/test/resources/expectedOutput/ppl/explain_sort_push.json @@ -8,7 +8,7 @@ { "name": "OpenSearchIndexScan", "description": { - "request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_first\"}}]}, searchDone\u003dfalse)" + "request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_first\"}}]}, searchDone\u003dfalse, routingId\u003dnull)" }, "children": [] } From b1928aed2c812b592cc6192f3da6a93980c1bbdc Mon Sep 17 00:00:00 2001 From: acarbonetto Date: Fri, 28 Jul 2023 08:26:15 -0700 Subject: [PATCH 11/13] Updates for comments Signed-off-by: acarbonetto --- .../opensearch/sql/legacy/request/SqlRequestFactory.java | 9 --------- .../plugin/RestSQLQueryActionCursorFallbackTest.java | 1 - 2 files changed, 10 deletions(-) diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/request/SqlRequestFactory.java b/legacy/src/main/java/org/opensearch/sql/legacy/request/SqlRequestFactory.java index 752db234c7e..4c5d207be85 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/request/SqlRequestFactory.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/request/SqlRequestFactory.java @@ -63,7 +63,6 @@ private static SqlRequest parseSqlRequestFromPayload(RestRequest restRequest) { List parameters = parseParameters(paramArray); return new PreparedStatementRequest(sql, validateAndGetFetchSize(jsonContent), jsonContent, parameters); } - return new SqlRequest(sql, validateAndGetFetchSize(jsonContent), jsonContent); } @@ -83,14 +82,6 @@ private static Integer validateAndGetFetchSize(JSONObject jsonContent) { return fetchSize.orElse(0); } - private static List parseRoutingIds(JSONArray array) { - List routingIds = List.of(); - for (int i = 0; i < array.length(); i++) { - routingIds.add(array.getString(i)); - } - return routingIds; - } - private static List parseParameters( JSONArray paramsJsonArray) { List parameters = new ArrayList<>(); diff --git a/legacy/src/test/java/org/opensearch/sql/legacy/plugin/RestSQLQueryActionCursorFallbackTest.java b/legacy/src/test/java/org/opensearch/sql/legacy/plugin/RestSQLQueryActionCursorFallbackTest.java index 9d13b3a791c..235705ae745 100644 --- a/legacy/src/test/java/org/opensearch/sql/legacy/plugin/RestSQLQueryActionCursorFallbackTest.java +++ b/legacy/src/test/java/org/opensearch/sql/legacy/plugin/RestSQLQueryActionCursorFallbackTest.java @@ -99,7 +99,6 @@ private static SQLQueryRequest createSqlQueryRequest(String query, Optional Date: Fri, 28 Jul 2023 09:47:26 -0700 Subject: [PATCH 12/13] Update comment Signed-off-by: acarbonetto --- .github/workflows/sql-test-workflow.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/sql-test-workflow.yml b/.github/workflows/sql-test-workflow.yml index 573192e0a3a..abadaf2ace5 100644 --- a/.github/workflows/sql-test-workflow.yml +++ b/.github/workflows/sql-test-workflow.yml @@ -64,7 +64,7 @@ jobs: ./gradlew :core:jacocoTestCoverageVerification || echo "* Jacoco failed for core" >> report.log ./gradlew :protocol:jacocoTestCoverageVerification || echo "* Jacoco failed for protocol" >> report.log ./gradlew :opensearch-sql-plugin:jacocoTestCoverageVerification || echo "* Jacoco failed for plugin" >> report.log - # Misc Integration tests + # Misc/Additional Integration tests ./gradlew :integ-test:integTest || echo "* Integration test failed" >> report.log ./gradlew :integ-test:multiClusterSearch || echo "* Multi-Cluster Search tests failed" >> report.log ./gradlew :doctest:doctest || echo "* Doctest failed" >> report.log From 087a5b317532ab24c92035781f17301119f73011 Mon Sep 17 00:00:00 2001 From: acarbonetto Date: Mon, 31 Jul 2023 09:46:19 -0700 Subject: [PATCH 13/13] Fix doctests Signed-off-by: acarbonetto --- docs/user/optimization/optimization.rst | 22 +++++++++++----------- docs/user/ppl/interfaces/endpoint.rst | 2 +- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/docs/user/optimization/optimization.rst b/docs/user/optimization/optimization.rst index 8ab998309d6..6f3b64896cd 100644 --- a/docs/user/optimization/optimization.rst +++ b/docs/user/optimization/optimization.rst @@ -44,7 +44,7 @@ The consecutive Filter operator will be merged as one Filter operator:: { "name": "OpenSearchIndexScan", "description": { - "request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"query\":{\"bool\":{\"filter\":[{\"range\":{\"age\":{\"from\":null,\"to\":20,\"include_lower\":true,\"include_upper\":false,\"boost\":1.0}}},{\"range\":{\"age\":{\"from\":10,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, searchDone=false)" + "request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"query\":{\"bool\":{\"filter\":[{\"range\":{\"age\":{\"from\":null,\"to\":20,\"include_lower\":true,\"include_upper\":false,\"boost\":1.0}}},{\"range\":{\"age\":{\"from\":10,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, searchDone=false, routingId=null)" }, "children": [] } @@ -71,7 +71,7 @@ The Filter operator should be push down under Sort operator:: { "name": "OpenSearchIndexScan", "description": { - "request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":null,\"to\":20,\"include_lower\":true,\"include_upper\":false,\"boost\":1.0}}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_first\"}}]}, searchDone=false)" + "request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":null,\"to\":20,\"include_lower\":true,\"include_upper\":false,\"boost\":1.0}}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_first\"}}]}, searchDone=false, routingId=null)" }, "children": [] } @@ -102,7 +102,7 @@ The Project list will push down to Query DSL to `filter the source