diff --git a/server/src/main/java/org/opensearch/common/Table.java b/server/src/main/java/org/opensearch/common/Table.java index da14f628efa0f..5e80b44b7fdd8 100644 --- a/server/src/main/java/org/opensearch/common/Table.java +++ b/server/src/main/java/org/opensearch/common/Table.java @@ -43,6 +43,8 @@ import java.util.Map; import java.util.concurrent.TimeUnit; +import reactor.util.annotation.NonNull; + import static java.util.Collections.emptyMap; /** @@ -59,9 +61,19 @@ public class Table { private List currentCells; private boolean inHeaders = false; private boolean withTime = false; + private PaginationMetadata paginationMetadata = new PaginationMetadata(false, null, null); + public static final String EPOCH = "epoch"; public static final String TIMESTAMP = "timestamp"; + public Table() {} + + public Table(@Nullable PaginationMetadata paginationMetadata) { + if (paginationMetadata != null) { + this.paginationMetadata = paginationMetadata; + } + } + public Table startHeaders() { inHeaders = true; currentCells = new ArrayList<>(); @@ -230,6 +242,18 @@ public Map getAliasMap() { return headerAliasMap; } + public boolean isTablePaginated() { + return paginationMetadata.isResponsePaginated; + } + + public String getNextTokenForTable() { + return paginationMetadata.nextToken; + } + + public String getPaginatedElementForTable() { + return paginationMetadata.paginatedElement; + } + /** * Cell in a table * @@ -254,4 +278,34 @@ public Cell(Object value, Map attr) { this.attr = attr; } } + + /** + * Pagination metadata for a table. + * + * @opensearch.internal + */ + public static class PaginationMetadata { + + /** + * boolean denoting whether the table is paginated or not. + */ + public final boolean isResponsePaginated; + + /** + * String denoting the element which is being paginated (for e.g. shards, indices..). + */ + public final String paginatedElement; + + /** + * String denoting the nextToken of paginated response, which will be used to fetch nextPage (if any). + */ + public final String nextToken; + + public PaginationMetadata(@NonNull boolean isResponsePaginated, @Nullable String paginatedElement, @Nullable String nextToken) { + this.isResponsePaginated = isResponsePaginated; + assert !isResponsePaginated || paginatedElement != null : "paginatedElement must be specified for a table which is paginated"; + this.paginatedElement = paginatedElement; + this.nextToken = nextToken; + } + } } diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java index 4413c8eb370be..d1efa4f6da79a 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java @@ -39,6 +39,7 @@ import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; import org.opensearch.action.admin.indices.stats.ShardStats; import org.opensearch.client.node.NodeClient; +import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.UnassignedInfo; import org.opensearch.common.Table; @@ -65,11 +66,17 @@ import org.opensearch.rest.action.RestResponseListener; import org.opensearch.search.suggest.completion.CompletionStats; +import java.nio.charset.StandardCharsets; import java.time.Instant; +import java.util.ArrayList; +import java.util.Base64; import java.util.List; import java.util.Locale; +import java.util.Map; +import java.util.Objects; import java.util.function.Function; +import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Arrays.asList; import static java.util.Collections.unmodifiableList; import static org.opensearch.rest.RestRequest.Method.GET; @@ -106,24 +113,53 @@ protected void documentation(StringBuilder sb) { @Override public RestChannelConsumer doCatRequest(final RestRequest request, final NodeClient client) { - final String[] indices = Strings.splitStringByCommaToArray(request.param("index")); + + String[] indices = new String[0]; final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); clusterStateRequest.local(request.paramAsBoolean("local", clusterStateRequest.local())); clusterStateRequest.clusterManagerNodeTimeout( request.paramAsTime("cluster_manager_timeout", clusterStateRequest.clusterManagerNodeTimeout()) ); parseDeprecatedMasterTimeoutParameter(clusterStateRequest, request, deprecationLogger, getName()); - clusterStateRequest.clear().nodes(true).routingTable(true).indices(indices); + if (request.hasParam("nextToken")) { + // ToDo: Add validation on the nextToken passed in the request + // Need to get the metadata as well + request.param("nextToken"); + clusterStateRequest.clear().nodes(true).routingTable(true).metadata(true); + } else { + // Only parse the "index" param if the request is not-paginated. + indices = Strings.splitStringByCommaToArray(request.param("index")); + clusterStateRequest.clear().nodes(true).routingTable(true).indices(indices); + } + + String[] finalIndices = indices; return channel -> client.admin().cluster().state(clusterStateRequest, new RestActionListener(channel) { @Override public void processResponse(final ClusterStateResponse clusterStateResponse) { IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest(); indicesStatsRequest.all(); - indicesStatsRequest.indices(indices); + final List shardRoutingResponseList = new ArrayList<>(); + final Table.PaginationMetadata paginationMetadata; + if (request.hasParam("nextToken")) { + List indicesToBeQueried = new ArrayList<>(); + paginationMetadata = new Table.PaginationMetadata( + true, + "shards", + getNextTokenForPaginatedResponse(request, clusterStateResponse, indicesToBeQueried, shardRoutingResponseList) + ); + indicesStatsRequest.indices(indicesToBeQueried.toArray(new String[0])); + } else { + shardRoutingResponseList.addAll(clusterStateResponse.getState().routingTable().allShards()); + indicesStatsRequest.indices(finalIndices); + paginationMetadata = null; + } client.admin().indices().stats(indicesStatsRequest, new RestResponseListener(channel) { @Override public RestResponse buildResponse(IndicesStatsResponse indicesStatsResponse) throws Exception { - return RestTable.buildResponse(buildTable(request, clusterStateResponse, indicesStatsResponse), channel); + return RestTable.buildResponse( + buildTable(request, clusterStateResponse, indicesStatsResponse, shardRoutingResponseList, paginationMetadata), + channel + ); } }); } @@ -132,7 +168,11 @@ public RestResponse buildResponse(IndicesStatsResponse indicesStatsResponse) thr @Override protected Table getTableWithHeader(final RestRequest request) { - Table table = new Table(); + return getTableWithHeader(request, null); + } + + protected Table getTableWithHeader(final RestRequest request, Table.PaginationMetadata paginationMetadata) { + Table table = new Table(paginationMetadata); table.startHeaders() .addCell("index", "default:true;alias:i,idx;desc:index name") .addCell("shard", "default:true;alias:s,sh;desc:shard name") @@ -301,10 +341,15 @@ private static Object getOrNull(S stats, Function accessor, Functio } // package private for testing - Table buildTable(RestRequest request, ClusterStateResponse state, IndicesStatsResponse stats) { - Table table = getTableWithHeader(request); - - for (ShardRouting shard : state.getState().routingTable().allShards()) { + Table buildTable( + RestRequest request, + ClusterStateResponse state, + IndicesStatsResponse stats, + List shardRoutingList, + Table.PaginationMetadata paginationMetadata + ) { + Table table = getTableWithHeader(request, paginationMetadata); + for (ShardRouting shard : shardRoutingList) { ShardStats shardStats = stats.asMap().get(shard); CommonStats commonStats = null; CommitStats commitStats = null; @@ -453,7 +498,106 @@ Table buildTable(RestRequest request, ClusterStateResponse state, IndicesStatsRe table.endRow(); } - return table; } + + private List getListOfIndicesSortedByCreateTime(final ClusterStateResponse clusterStateResponse) { + List indicesList = new ArrayList(clusterStateResponse.getState().getRoutingTable().getIndicesRouting().keySet()); + indicesList.sort((index1, index2) -> { + Long index1CreationTimeStamp = clusterStateResponse.getState().metadata().indices().get(index1).getCreationDate(); + Long index2CreationTimeStamp = clusterStateResponse.getState().metadata().indices().get(index2).getCreationDate(); + if (index1CreationTimeStamp.equals(index2CreationTimeStamp)) { + return index1.compareTo(index2); + } + return (index1CreationTimeStamp - index2CreationTimeStamp) > 0 ? 1 : -1; + }); + return indicesList; + } + + private String getNextTokenForPaginatedResponse( + final RestRequest request, + ClusterStateResponse clusterStateResponse, + List indicesToBeQueried, + List shardRoutingResponseList + ) { + final long defaultPageSize = (long) clusterStateResponse.getState().nodes().getDataNodes().size() + 1; + + // Get the nextToken provided in the request + final String nextTokenInRequest = Objects.equals(request.param("nextToken"), "null") + ? null + : new String(Base64.getDecoder().decode(request.param("nextToken")), UTF_8); + + List sortedIndicesList = getListOfIndicesSortedByCreateTime(clusterStateResponse); + + // Since all the shards for last ID would have already been sent in the last response, + // start iterating from the next shard for current page + int newPageStartShardID = nextTokenInRequest == null ? 0 : Integer.parseInt(nextTokenInRequest.split("\\$")[0]) + 1; + + // Since all the shards corresponding to the last processed index might not have been included in the last page, + // start iterating from the last index number itself + int newPageStartIndexNumber = nextTokenInRequest == null ? 0 : Integer.parseInt(nextTokenInRequest.split("\\$")[1]); + + // Get the number of shards upto the maxPageSize + long shardCountSoFar = 0L; + int lastProcessedShardNumber = -1; + int lastProcessedIndexNumber = -1; + + // ToDo: Handle case when index gets deleted. Select the first index with creationTime just greater than the last index's + // creationTime + int indexNumberInSortedList = newPageStartIndexNumber; + for (; indexNumberInSortedList < sortedIndicesList.size(); indexNumberInSortedList++) { + String index = sortedIndicesList.get(indexNumberInSortedList); + Map indexShards = clusterStateResponse.getState() + .getRoutingTable() + .getIndicesRouting() + .get(index) + .getShards(); + // If all the shards corresponding to the last index were already processed, move to the next Index + if (indexNumberInSortedList == newPageStartIndexNumber && (newPageStartShardID > indexShards.size() - 1)) { + // ToDo: Add validation that the newPageStartShardID should not be greater than the newPageStartIndexShards.size() + newPageStartShardID = 0; + continue; + } + int lastProcessedShardNumberForCurrentIndex = -1; + int shardID = (indexNumberInSortedList == newPageStartIndexNumber) ? newPageStartShardID : 0; + for (; shardID < indexShards.size(); shardID++) { + shardCountSoFar += indexShards.get(shardID).shards().size(); + if (shardCountSoFar > defaultPageSize) { + break; + } + shardRoutingResponseList.addAll(indexShards.get(shardID).shards()); + lastProcessedShardNumberForCurrentIndex = shardID; + } + + if (shardCountSoFar > defaultPageSize) { + if (lastProcessedShardNumberForCurrentIndex != -1) { + indicesToBeQueried.add(index); + lastProcessedIndexNumber = indexNumberInSortedList; + lastProcessedShardNumber = lastProcessedShardNumberForCurrentIndex; + } + break; + } + indicesToBeQueried.add(index); + lastProcessedShardNumber = lastProcessedShardNumberForCurrentIndex; + lastProcessedIndexNumber = indexNumberInSortedList; + } + + // nextToken = "lastProcessedShardNumber$LastProcessedIndexNumber$CreateTimeOfLastProcessedIndex$NameOfLastProcessedIndex" + return indexNumberInSortedList >= sortedIndicesList.size() + ? null + : Base64.getEncoder() + .encodeToString( + (lastProcessedShardNumber + + "$" + + (lastProcessedIndexNumber) + + "$" + + clusterStateResponse.getState() + .metadata() + .indices() + .get(sortedIndicesList.get(lastProcessedIndexNumber)) + .getCreationDate() + + "$" + + sortedIndicesList.get(lastProcessedIndexNumber)).getBytes(StandardCharsets.UTF_8) + ); + } } diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestTable.java b/server/src/main/java/org/opensearch/rest/action/cat/RestTable.java index 4f1090b163ee6..2cb73ff44919e 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestTable.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestTable.java @@ -88,7 +88,15 @@ public static RestResponse buildXContentBuilder(Table table, RestChannel channel XContentBuilder builder = channel.newBuilder(); List displayHeaders = buildDisplayHeaders(table, request); - builder.startArray(); + if (table.isTablePaginated()) { + assert table.getPaginatedElementForTable() != null : "Paginated element is required in-case nextToken is not null"; + builder.startObject(); + builder.field("nextToken", table.getNextTokenForTable()); + builder.startArray(table.getPaginatedElementForTable()); + } else { + builder.startArray(); + } + List rowOrder = getRowOrder(table, request); for (Integer row : rowOrder) { builder.startObject(); @@ -98,6 +106,11 @@ public static RestResponse buildXContentBuilder(Table table, RestChannel channel builder.endObject(); } builder.endArray(); + + if (table.isTablePaginated()) { + builder.endObject(); + } + return new BytesRestResponse(RestStatus.OK, builder); } @@ -136,6 +149,13 @@ public static RestResponse buildTextPlainResponse(Table table, RestChannel chann } out.append("\n"); } + + // Adding a nextToken row, post an empty line, in the response if the table is paginated. + if (table.isTablePaginated()) { + out.append("\n"); + out.append("nextToken" + " " + table.getNextTokenForTable()); + out.append("\n"); + } out.close(); return new BytesRestResponse(RestStatus.OK, BytesRestResponse.TEXT_CONTENT_TYPE, bytesOut.bytes()); } diff --git a/server/src/test/java/org/opensearch/rest/action/cat/RestShardsActionTests.java b/server/src/test/java/org/opensearch/rest/action/cat/RestShardsActionTests.java index 883df7da5d717..df32e3a3bce00 100644 --- a/server/src/test/java/org/opensearch/rest/action/cat/RestShardsActionTests.java +++ b/server/src/test/java/org/opensearch/rest/action/cat/RestShardsActionTests.java @@ -112,7 +112,7 @@ public void testBuildTable() { when(state.getState()).thenReturn(clusterState); final RestShardsAction action = new RestShardsAction(); - final Table table = action.buildTable(new FakeRestRequest(), state, stats); + final Table table = action.buildTable(new FakeRestRequest(), state, stats, state.getState().routingTable().allShards(), null); // now, verify the table is correct List headers = table.getHeaders();