From 6c855ce7f9c2c0cc65e47e501dc96974b635942e Mon Sep 17 00:00:00 2001 From: Harsh Garg Date: Fri, 27 Sep 2024 17:01:43 +0530 Subject: [PATCH] Refactoring and bug fixes for ShardPaginationStrategy Signed-off-by: Harsh Garg --- .../cluster/shards/CatShardsRequest.java | 9 +- .../cluster/shards/CatShardsResponse.java | 9 +- .../shards/TransportCatShardsAction.java | 37 +++--- .../rest/action/cat/RestShardsAction.java | 1 + .../pagination/ShardPaginationStrategy.java | 124 ++++++++++-------- 5 files changed, 106 insertions(+), 74 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/CatShardsRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/CatShardsRequest.java index 4ee3aa1c9a175..3524ca38f84f4 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/CatShardsRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/CatShardsRequest.java @@ -39,7 +39,9 @@ public CatShardsRequest(StreamInput in) throws IOException { if (in.getVersion().onOrAfter(Version.V_3_0_0)) { indices = in.readStringArray(); cancelAfterTimeInterval = in.readTimeValue(); - pageParams = PageParams.readPageParams(in); + if (in.readBoolean()) { + pageParams = PageParams.readPageParams(in); + } } } @@ -49,7 +51,10 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_3_0_0)) { out.writeStringArray(indices); out.writeTimeValue(cancelAfterTimeInterval); - pageParams.writePageParams(out); + out.writeBoolean(pageParams != null); + if (pageParams != null) { + pageParams.writePageParams(out); + } } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/CatShardsResponse.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/CatShardsResponse.java index 21007c4e6ce96..80fec51fd25d6 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/CatShardsResponse.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/CatShardsResponse.java @@ -42,7 +42,9 @@ public CatShardsResponse(StreamInput in) throws IOException { indicesStatsResponse = new IndicesStatsResponse(in); if (in.getVersion().onOrAfter(Version.V_3_0_0)) { responseShards = in.readList(ShardRouting::new); - pageToken = PageToken.readPageToken(in); + if (in.readBoolean()) { + pageToken = PageToken.readPageToken(in); + } } } @@ -52,7 +54,10 @@ public void writeTo(StreamOutput out) throws IOException { indicesStatsResponse.writeTo(out); if (out.getVersion().onOrAfter(Version.V_3_0_0)) { out.writeList(responseShards); - pageToken.writePageToken(out); + out.writeBoolean(pageToken != null); + if (pageToken != null) { + pageToken.writePageToken(out); + } } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java index 41a4e4936c471..5a5e178eb6a82 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java @@ -48,7 +48,7 @@ public void doExecute(Task parentTask, CatShardsRequest shardsRequest, ActionLis clusterStateRequest.setShouldCancelOnTimeout(true); clusterStateRequest.local(shardsRequest.local()); clusterStateRequest.clusterManagerNodeTimeout(shardsRequest.clusterManagerNodeTimeout()); - if (Objects.nonNull(shardsRequest.getPageParams())) { + if (Objects.isNull(shardsRequest.getPageParams())) { clusterStateRequest.clear().nodes(true).routingTable(true).indices(shardsRequest.getIndices()); } else { clusterStateRequest.clear().nodes(true).routingTable(true).indices(shardsRequest.getIndices()).metadata(true); @@ -81,23 +81,26 @@ protected void innerOnFailure(Exception e) { client.admin().cluster().state(clusterStateRequest, new ActionListener() { @Override public void onResponse(ClusterStateResponse clusterStateResponse) { - ShardPaginationStrategy paginationStrategy = getPaginationStrategy(shardsRequest.getPageParams(), clusterStateResponse); - String[] indices = Objects.isNull(paginationStrategy) - ? shardsRequest.getIndices() - : paginationStrategy.getRequestedIndices().toArray(new String[0]); - catShardsResponse.setClusterStateResponse(clusterStateResponse); - catShardsResponse.setResponseShards( - Objects.isNull(paginationStrategy) - ? clusterStateResponse.getState().routingTable().allShards() - : paginationStrategy.getRequestedEntities() - ); - catShardsResponse.setPageToken(Objects.isNull(paginationStrategy) ? null : paginationStrategy.getResponseToken()); - IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest(); - indicesStatsRequest.setShouldCancelOnTimeout(true); - indicesStatsRequest.all(); - indicesStatsRequest.indices(indices); - indicesStatsRequest.setParentTask(client.getLocalNodeId(), parentTask.getId()); try { + ShardPaginationStrategy paginationStrategy = getPaginationStrategy( + shardsRequest.getPageParams(), + clusterStateResponse + ); + String[] indices = Objects.isNull(paginationStrategy) + ? shardsRequest.getIndices() + : paginationStrategy.getRequestedIndices().toArray(new String[0]); + catShardsResponse.setClusterStateResponse(clusterStateResponse); + catShardsResponse.setResponseShards( + Objects.isNull(paginationStrategy) + ? clusterStateResponse.getState().routingTable().allShards() + : paginationStrategy.getRequestedEntities() + ); + catShardsResponse.setPageToken(Objects.isNull(paginationStrategy) ? null : paginationStrategy.getResponseToken()); + IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest(); + indicesStatsRequest.setShouldCancelOnTimeout(true); + indicesStatsRequest.all(); + indicesStatsRequest.indices(indices); + indicesStatsRequest.setParentTask(client.getLocalNodeId(), parentTask.getId()); client.admin().indices().stats(indicesStatsRequest, new ActionListener() { @Override public void onResponse(IndicesStatsResponse indicesStatsResponse) { diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java index 7d44d70b285f6..ccb5bedf03fad 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java @@ -115,6 +115,7 @@ public RestChannelConsumer doCatRequest(final RestRequest request, final NodeCli shardsRequest.clusterManagerNodeTimeout(request.paramAsTime("cluster_manager_timeout", shardsRequest.clusterManagerNodeTimeout())); shardsRequest.setCancelAfterTimeInterval(request.paramAsTime("cancel_after_time_interval", NO_TIMEOUT)); shardsRequest.setIndices(indices); + shardsRequest.setPageParams(pageParams); parseDeprecatedMasterTimeoutParameter(shardsRequest, request, deprecationLogger, getName()); return channel -> client.execute(CatShardsAction.INSTANCE, shardsRequest, new RestResponseListener(channel) { @Override diff --git a/server/src/main/java/org/opensearch/rest/pagination/ShardPaginationStrategy.java b/server/src/main/java/org/opensearch/rest/pagination/ShardPaginationStrategy.java index 18a28a56175ff..b680012484b83 100644 --- a/server/src/main/java/org/opensearch/rest/pagination/ShardPaginationStrategy.java +++ b/server/src/main/java/org/opensearch/rest/pagination/ShardPaginationStrategy.java @@ -26,7 +26,6 @@ import static org.opensearch.rest.pagination.PageParams.PARAM_ASC_SORT_VALUE; - /** * This strategy can be used by the Rest APIs wanting to paginate the responses based on Shards. * The strategy considers create timestamps of indices and shardID as the keys to iterate over pages. @@ -50,89 +49,92 @@ public class ShardPaginationStrategy implements PaginationStrategy }; private PageToken pageToken; - private List requestedShardRoutings = new ArrayList<>(); - private List requestedIndices = new ArrayList<>(); + private List pageShardRoutings = new ArrayList<>(); + private List pageIndices = new ArrayList<>(); public ShardPaginationStrategy(PageParams pageParams, ClusterState clusterState) { + ShardStrategyToken shardStrategyToken = getShardStrategyToken(pageParams.getRequestedToken()); // Get list of indices metadata sorted by their creation time and filtered by the last sent index - List sortedIndices = PaginationStrategy.getSortedIndexMetadata( + List filteredIndices = PaginationStrategy.getSortedIndexMetadata( clusterState, - getMetadataFilter(pageParams.getRequestedToken(), pageParams.getSort()), + getIndexFilter(shardStrategyToken, pageParams.getSort()), PARAM_ASC_SORT_VALUE.equals(pageParams.getSort()) ? ASC_COMPARATOR : DESC_COMPARATOR ); // Get the list of shards and indices belonging to current page. Tuple, List> tuple = getPageData( clusterState.getRoutingTable().getIndicesRouting(), - sortedIndices, - pageParams.getSize(), - pageParams.getRequestedToken() + filteredIndices, + shardStrategyToken, + pageParams.getSize() ); - this.requestedShardRoutings = tuple.v1(); - List metadataSublist = tuple.v2(); + List pageShardRoutings = tuple.v1(); + List pageIndices = tuple.v2(); + this.pageShardRoutings = pageShardRoutings; // Get list of index names from the trimmed metadataSublist - this.requestedIndices = metadataSublist.stream().map(metadata -> metadata.getIndex().getName()).collect(Collectors.toList()); + this.pageIndices = pageIndices.stream().map(metadata -> metadata.getIndex().getName()).collect(Collectors.toList()); this.pageToken = getResponseToken( - pageParams.getSize(), - sortedIndices.size(), - metadataSublist.isEmpty() ? null : metadataSublist.get(metadataSublist.size() - 1), - tuple.v1().isEmpty() ? null : tuple.v1().get(tuple.v1().size() - 1) + pageIndices.isEmpty() ? null : pageIndices.get(pageIndices.size() - 1), + filteredIndices.isEmpty() ? null : filteredIndices.get(filteredIndices.size() - 1).getIndex().getName(), + pageShardRoutings.isEmpty() ? -1 : pageShardRoutings.get(pageShardRoutings.size() - 1).id() ); } - private static Predicate getMetadataFilter(String requestedTokenStr, String sortOrder) { - boolean isAscendingSort = sortOrder.equals(PARAM_ASC_SORT_VALUE); - ShardStrategyToken requestedToken = Objects.isNull(requestedTokenStr) || requestedTokenStr.isEmpty() - ? null - : new ShardStrategyToken(requestedTokenStr); - if (Objects.isNull(requestedToken)) { + private static Predicate getIndexFilter(ShardStrategyToken token, String sortOrder) { + if (Objects.isNull(token)) { return indexMetadata -> true; } + boolean isAscendingSort = sortOrder.equals(PARAM_ASC_SORT_VALUE); return metadata -> { - if (metadata.getIndex().getName().equals(requestedToken.lastIndexName)) { + if (metadata.getIndex().getName().equals(token.lastIndexName)) { return true; - } else if (metadata.getCreationDate() == requestedToken.lastIndexCreationTime) { + } else if (metadata.getCreationDate() == token.lastIndexCreationTime) { return isAscendingSort - ? metadata.getIndex().getName().compareTo(requestedToken.lastIndexName) > 0 - : metadata.getIndex().getName().compareTo(requestedToken.lastIndexName) < 0; + ? metadata.getIndex().getName().compareTo(token.lastIndexName) > 0 + : metadata.getIndex().getName().compareTo(token.lastIndexName) < 0; } return isAscendingSort - ? metadata.getCreationDate() > requestedToken.lastIndexCreationTime - : metadata.getCreationDate() < requestedToken.lastIndexCreationTime; + ? metadata.getCreationDate() > token.lastIndexCreationTime + : metadata.getCreationDate() < token.lastIndexCreationTime; }; } + /** + * Will be used to get the list of shards and respective indices to which they belong, + * which are to be displayed in a page. + * Note: All shards for a shardID will always be present in the same page. + */ private Tuple, List> getPageData( Map indicesRouting, - List sortedIndices, - final int pageSize, - String requestedTokenStr + List filteredIndices, + final ShardStrategyToken token, + final int numShardsRequired ) { List shardRoutings = new ArrayList<>(); List indexMetadataList = new ArrayList<>(); - ShardStrategyToken requestedToken = Objects.isNull(requestedTokenStr) || requestedTokenStr.isEmpty() - ? null - : new ShardStrategyToken(requestedTokenStr); int shardCount = 0; - for (IndexMetadata indexMetadata : sortedIndices) { + + // iterate over indices until shardCount is less than numShardsRequired + for (IndexMetadata indexMetadata : filteredIndices) { + String indexName = indexMetadata.getIndex().getName(); + int startShardId = getStartShardIdForPageIndex(token, indexName); boolean indexShardsAdded = false; - Map indexShardRoutingTable = indicesRouting.get(indexMetadata.getIndex().getName()) - .getShards(); - int shardId = Objects.isNull(requestedToken) ? 0 - : indexMetadata.getIndex().getName().equals(requestedToken.lastIndexName) ? requestedToken.lastShardId + 1 - : 0; - for (; shardId < indexShardRoutingTable.size(); shardId++) { - shardCount += indexShardRoutingTable.get(shardId).size(); - if (shardCount > pageSize) { + Map indexShardRoutingTable = indicesRouting.get(indexName).getShards(); + for (; startShardId < indexShardRoutingTable.size(); startShardId++) { + if (indexShardRoutingTable.get(startShardId).size() > numShardsRequired) { + throw new IllegalArgumentException("size value should be greater than the replica count of all indices"); + } + shardCount += indexShardRoutingTable.get(startShardId).size(); + if (shardCount > numShardsRequired) { break; } - shardRoutings.addAll(indexShardRoutingTable.get(shardId).shards()); + shardRoutings.addAll(indexShardRoutingTable.get(startShardId).shards()); indexShardsAdded = true; } // Add index to the list if any of its shard was added to the count. if (indexShardsAdded) { indexMetadataList.add(indexMetadata); } - if (shardCount >= pageSize) { + if (shardCount >= numShardsRequired) { break; } } @@ -140,16 +142,32 @@ private Tuple, List> getPageData( return new Tuple<>(shardRoutings, indexMetadataList); } - private PageToken getResponseToken(final int pageSize, final int totalIndices, IndexMetadata lastIndex, ShardRouting lastShard) { - if (totalIndices <= pageSize && lastIndex.getNumberOfShards() == lastShard.getId()) { + private PageToken getResponseToken(IndexMetadata pageEndIndex, final String lastFilteredIndexName, final int pageEndShardId) { + // If all the shards of filtered indices list have been included in pageShardRoutings, then no more + // shards are remaining to be fetched, and the next_token should thus be null. + String pageEndIndexName = Objects.isNull(pageEndIndex) ? null : pageEndIndex.getIndex().getName(); + if (Objects.isNull(pageEndIndexName) + || (pageEndIndexName.equals(lastFilteredIndexName) && pageEndIndex.getNumberOfShards() == pageEndShardId + 1)) { return new PageToken(null, DEFAULT_SHARDS_PAGINATED_ENTITY); } return new PageToken( - new ShardStrategyToken(lastShard.getId(), lastIndex.getCreationDate(), lastIndex.getIndex().getName()).generateEncryptedToken(), + new ShardStrategyToken(pageEndIndexName, pageEndShardId, pageEndIndex.getCreationDate()).generateEncryptedToken(), DEFAULT_SHARDS_PAGINATED_ENTITY ); } + private ShardStrategyToken getShardStrategyToken(String requestedToken) { + return Objects.isNull(requestedToken) || requestedToken.isEmpty() ? null : new ShardStrategyToken(requestedToken); + } + + /** + * Provides the shardId to start an index which is to be included in the page. If the index is same as + * lastIndex, start from the shardId next to lastShardId, else always start from 0. + */ + private int getStartShardIdForPageIndex(ShardStrategyToken token, final String pageIndexName) { + return Objects.isNull(token) ? 0 : token.lastIndexName.equals(pageIndexName) ? token.lastShardId + 1 : 0; + } + @Override public PageToken getResponseToken() { return pageToken; @@ -157,11 +175,11 @@ public PageToken getResponseToken() { @Override public List getRequestedEntities() { - return requestedShardRoutings; + return pageShardRoutings; } public List getRequestedIndices() { - return requestedIndices; + return pageIndices; } /** @@ -203,11 +221,11 @@ public ShardStrategyToken(String requestedTokenString) { this.lastIndexName = decryptedTokenElements[INDEX_NAME_POS_IN_TOKEN]; } - public ShardStrategyToken(int lastShardId, long creationTimeOfLastRespondedIndex, String nameOfLastRespondedIndex) { + public ShardStrategyToken(String lastIndexName, int lastShardId, long lastIndexCreationTime) { + Objects.requireNonNull(lastIndexName, "index name should be provided"); + this.lastIndexName = lastIndexName; this.lastShardId = lastShardId; - Objects.requireNonNull(nameOfLastRespondedIndex, "index name should be provided"); - this.lastIndexCreationTime = creationTimeOfLastRespondedIndex; - this.lastIndexName = nameOfLastRespondedIndex; + this.lastIndexCreationTime = lastIndexCreationTime; } public String generateEncryptedToken() {