Skip to content

Commit

Permalink
Refactoring and bug fixes for ShardPaginationStrategy
Browse files Browse the repository at this point in the history
Signed-off-by: Harsh Garg <[email protected]>
  • Loading branch information
Harsh Garg committed Sep 27, 2024
1 parent 437b87f commit 6c855ce
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ public CatShardsRequest(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
indices = in.readStringArray();
cancelAfterTimeInterval = in.readTimeValue();
pageParams = PageParams.readPageParams(in);
if (in.readBoolean()) {
pageParams = PageParams.readPageParams(in);
}
}
}

Expand All @@ -49,7 +51,10 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeStringArray(indices);
out.writeTimeValue(cancelAfterTimeInterval);
pageParams.writePageParams(out);
out.writeBoolean(pageParams != null);
if (pageParams != null) {
pageParams.writePageParams(out);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ public CatShardsResponse(StreamInput in) throws IOException {
indicesStatsResponse = new IndicesStatsResponse(in);
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
responseShards = in.readList(ShardRouting::new);
pageToken = PageToken.readPageToken(in);
if (in.readBoolean()) {
pageToken = PageToken.readPageToken(in);
}
}
}

Expand All @@ -52,7 +54,10 @@ public void writeTo(StreamOutput out) throws IOException {
indicesStatsResponse.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeList(responseShards);
pageToken.writePageToken(out);
out.writeBoolean(pageToken != null);
if (pageToken != null) {
pageToken.writePageToken(out);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void doExecute(Task parentTask, CatShardsRequest shardsRequest, ActionLis
clusterStateRequest.setShouldCancelOnTimeout(true);
clusterStateRequest.local(shardsRequest.local());
clusterStateRequest.clusterManagerNodeTimeout(shardsRequest.clusterManagerNodeTimeout());
if (Objects.nonNull(shardsRequest.getPageParams())) {
if (Objects.isNull(shardsRequest.getPageParams())) {
clusterStateRequest.clear().nodes(true).routingTable(true).indices(shardsRequest.getIndices());
} else {
clusterStateRequest.clear().nodes(true).routingTable(true).indices(shardsRequest.getIndices()).metadata(true);
Expand Down Expand Up @@ -81,23 +81,26 @@ protected void innerOnFailure(Exception e) {
client.admin().cluster().state(clusterStateRequest, new ActionListener<ClusterStateResponse>() {
@Override
public void onResponse(ClusterStateResponse clusterStateResponse) {
ShardPaginationStrategy paginationStrategy = getPaginationStrategy(shardsRequest.getPageParams(), clusterStateResponse);
String[] indices = Objects.isNull(paginationStrategy)
? shardsRequest.getIndices()
: paginationStrategy.getRequestedIndices().toArray(new String[0]);
catShardsResponse.setClusterStateResponse(clusterStateResponse);
catShardsResponse.setResponseShards(
Objects.isNull(paginationStrategy)
? clusterStateResponse.getState().routingTable().allShards()
: paginationStrategy.getRequestedEntities()
);
catShardsResponse.setPageToken(Objects.isNull(paginationStrategy) ? null : paginationStrategy.getResponseToken());
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.setShouldCancelOnTimeout(true);
indicesStatsRequest.all();
indicesStatsRequest.indices(indices);
indicesStatsRequest.setParentTask(client.getLocalNodeId(), parentTask.getId());
try {
ShardPaginationStrategy paginationStrategy = getPaginationStrategy(
shardsRequest.getPageParams(),
clusterStateResponse
);
String[] indices = Objects.isNull(paginationStrategy)
? shardsRequest.getIndices()
: paginationStrategy.getRequestedIndices().toArray(new String[0]);
catShardsResponse.setClusterStateResponse(clusterStateResponse);
catShardsResponse.setResponseShards(
Objects.isNull(paginationStrategy)
? clusterStateResponse.getState().routingTable().allShards()
: paginationStrategy.getRequestedEntities()
);
catShardsResponse.setPageToken(Objects.isNull(paginationStrategy) ? null : paginationStrategy.getResponseToken());
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.setShouldCancelOnTimeout(true);
indicesStatsRequest.all();
indicesStatsRequest.indices(indices);
indicesStatsRequest.setParentTask(client.getLocalNodeId(), parentTask.getId());
client.admin().indices().stats(indicesStatsRequest, new ActionListener<IndicesStatsResponse>() {
@Override
public void onResponse(IndicesStatsResponse indicesStatsResponse) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public RestChannelConsumer doCatRequest(final RestRequest request, final NodeCli
shardsRequest.clusterManagerNodeTimeout(request.paramAsTime("cluster_manager_timeout", shardsRequest.clusterManagerNodeTimeout()));
shardsRequest.setCancelAfterTimeInterval(request.paramAsTime("cancel_after_time_interval", NO_TIMEOUT));
shardsRequest.setIndices(indices);
shardsRequest.setPageParams(pageParams);
parseDeprecatedMasterTimeoutParameter(shardsRequest, request, deprecationLogger, getName());
return channel -> client.execute(CatShardsAction.INSTANCE, shardsRequest, new RestResponseListener<CatShardsResponse>(channel) {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import static org.opensearch.rest.pagination.PageParams.PARAM_ASC_SORT_VALUE;


/**
* This strategy can be used by the Rest APIs wanting to paginate the responses based on Shards.
* The strategy considers create timestamps of indices and shardID as the keys to iterate over pages.
Expand All @@ -50,118 +49,137 @@ public class ShardPaginationStrategy implements PaginationStrategy<ShardRouting>
};

private PageToken pageToken;
private List<ShardRouting> requestedShardRoutings = new ArrayList<>();
private List<String> requestedIndices = new ArrayList<>();
private List<ShardRouting> pageShardRoutings = new ArrayList<>();
private List<String> pageIndices = new ArrayList<>();

public ShardPaginationStrategy(PageParams pageParams, ClusterState clusterState) {
ShardStrategyToken shardStrategyToken = getShardStrategyToken(pageParams.getRequestedToken());
// Get list of indices metadata sorted by their creation time and filtered by the last sent index
List<IndexMetadata> sortedIndices = PaginationStrategy.getSortedIndexMetadata(
List<IndexMetadata> filteredIndices = PaginationStrategy.getSortedIndexMetadata(
clusterState,
getMetadataFilter(pageParams.getRequestedToken(), pageParams.getSort()),
getIndexFilter(shardStrategyToken, pageParams.getSort()),
PARAM_ASC_SORT_VALUE.equals(pageParams.getSort()) ? ASC_COMPARATOR : DESC_COMPARATOR
);
// Get the list of shards and indices belonging to current page.
Tuple<List<ShardRouting>, List<IndexMetadata>> tuple = getPageData(
clusterState.getRoutingTable().getIndicesRouting(),
sortedIndices,
pageParams.getSize(),
pageParams.getRequestedToken()
filteredIndices,
shardStrategyToken,
pageParams.getSize()
);
this.requestedShardRoutings = tuple.v1();
List<IndexMetadata> metadataSublist = tuple.v2();
List<ShardRouting> pageShardRoutings = tuple.v1();
List<IndexMetadata> pageIndices = tuple.v2();
this.pageShardRoutings = pageShardRoutings;
// Get list of index names from the trimmed metadataSublist
this.requestedIndices = metadataSublist.stream().map(metadata -> metadata.getIndex().getName()).collect(Collectors.toList());
this.pageIndices = pageIndices.stream().map(metadata -> metadata.getIndex().getName()).collect(Collectors.toList());
this.pageToken = getResponseToken(
pageParams.getSize(),
sortedIndices.size(),
metadataSublist.isEmpty() ? null : metadataSublist.get(metadataSublist.size() - 1),
tuple.v1().isEmpty() ? null : tuple.v1().get(tuple.v1().size() - 1)
pageIndices.isEmpty() ? null : pageIndices.get(pageIndices.size() - 1),
filteredIndices.isEmpty() ? null : filteredIndices.get(filteredIndices.size() - 1).getIndex().getName(),
pageShardRoutings.isEmpty() ? -1 : pageShardRoutings.get(pageShardRoutings.size() - 1).id()
);
}

private static Predicate<IndexMetadata> getMetadataFilter(String requestedTokenStr, String sortOrder) {
boolean isAscendingSort = sortOrder.equals(PARAM_ASC_SORT_VALUE);
ShardStrategyToken requestedToken = Objects.isNull(requestedTokenStr) || requestedTokenStr.isEmpty()
? null
: new ShardStrategyToken(requestedTokenStr);
if (Objects.isNull(requestedToken)) {
private static Predicate<IndexMetadata> getIndexFilter(ShardStrategyToken token, String sortOrder) {
if (Objects.isNull(token)) {
return indexMetadata -> true;
}
boolean isAscendingSort = sortOrder.equals(PARAM_ASC_SORT_VALUE);
return metadata -> {
if (metadata.getIndex().getName().equals(requestedToken.lastIndexName)) {
if (metadata.getIndex().getName().equals(token.lastIndexName)) {
return true;
} else if (metadata.getCreationDate() == requestedToken.lastIndexCreationTime) {
} else if (metadata.getCreationDate() == token.lastIndexCreationTime) {
return isAscendingSort
? metadata.getIndex().getName().compareTo(requestedToken.lastIndexName) > 0
: metadata.getIndex().getName().compareTo(requestedToken.lastIndexName) < 0;
? metadata.getIndex().getName().compareTo(token.lastIndexName) > 0
: metadata.getIndex().getName().compareTo(token.lastIndexName) < 0;
}
return isAscendingSort
? metadata.getCreationDate() > requestedToken.lastIndexCreationTime
: metadata.getCreationDate() < requestedToken.lastIndexCreationTime;
? metadata.getCreationDate() > token.lastIndexCreationTime
: metadata.getCreationDate() < token.lastIndexCreationTime;
};
}

/**
* Will be used to get the list of shards and respective indices to which they belong,
* which are to be displayed in a page.
* Note: All shards for a shardID will always be present in the same page.
*/
private Tuple<List<ShardRouting>, List<IndexMetadata>> getPageData(
Map<String, IndexRoutingTable> indicesRouting,
List<IndexMetadata> sortedIndices,
final int pageSize,
String requestedTokenStr
List<IndexMetadata> filteredIndices,
final ShardStrategyToken token,
final int numShardsRequired
) {
List<ShardRouting> shardRoutings = new ArrayList<>();
List<IndexMetadata> indexMetadataList = new ArrayList<>();
ShardStrategyToken requestedToken = Objects.isNull(requestedTokenStr) || requestedTokenStr.isEmpty()
? null
: new ShardStrategyToken(requestedTokenStr);
int shardCount = 0;
for (IndexMetadata indexMetadata : sortedIndices) {

// iterate over indices until shardCount is less than numShardsRequired
for (IndexMetadata indexMetadata : filteredIndices) {
String indexName = indexMetadata.getIndex().getName();
int startShardId = getStartShardIdForPageIndex(token, indexName);
boolean indexShardsAdded = false;
Map<Integer, IndexShardRoutingTable> indexShardRoutingTable = indicesRouting.get(indexMetadata.getIndex().getName())
.getShards();
int shardId = Objects.isNull(requestedToken) ? 0
: indexMetadata.getIndex().getName().equals(requestedToken.lastIndexName) ? requestedToken.lastShardId + 1
: 0;
for (; shardId < indexShardRoutingTable.size(); shardId++) {
shardCount += indexShardRoutingTable.get(shardId).size();
if (shardCount > pageSize) {
Map<Integer, IndexShardRoutingTable> indexShardRoutingTable = indicesRouting.get(indexName).getShards();
for (; startShardId < indexShardRoutingTable.size(); startShardId++) {
if (indexShardRoutingTable.get(startShardId).size() > numShardsRequired) {
throw new IllegalArgumentException("size value should be greater than the replica count of all indices");
}
shardCount += indexShardRoutingTable.get(startShardId).size();
if (shardCount > numShardsRequired) {
break;
}
shardRoutings.addAll(indexShardRoutingTable.get(shardId).shards());
shardRoutings.addAll(indexShardRoutingTable.get(startShardId).shards());
indexShardsAdded = true;
}
// Add index to the list if any of its shard was added to the count.
if (indexShardsAdded) {
indexMetadataList.add(indexMetadata);
}
if (shardCount >= pageSize) {
if (shardCount >= numShardsRequired) {
break;
}
}

return new Tuple<>(shardRoutings, indexMetadataList);
}

private PageToken getResponseToken(final int pageSize, final int totalIndices, IndexMetadata lastIndex, ShardRouting lastShard) {
if (totalIndices <= pageSize && lastIndex.getNumberOfShards() == lastShard.getId()) {
private PageToken getResponseToken(IndexMetadata pageEndIndex, final String lastFilteredIndexName, final int pageEndShardId) {
// If all the shards of filtered indices list have been included in pageShardRoutings, then no more
// shards are remaining to be fetched, and the next_token should thus be null.
String pageEndIndexName = Objects.isNull(pageEndIndex) ? null : pageEndIndex.getIndex().getName();
if (Objects.isNull(pageEndIndexName)
|| (pageEndIndexName.equals(lastFilteredIndexName) && pageEndIndex.getNumberOfShards() == pageEndShardId + 1)) {
return new PageToken(null, DEFAULT_SHARDS_PAGINATED_ENTITY);
}
return new PageToken(
new ShardStrategyToken(lastShard.getId(), lastIndex.getCreationDate(), lastIndex.getIndex().getName()).generateEncryptedToken(),
new ShardStrategyToken(pageEndIndexName, pageEndShardId, pageEndIndex.getCreationDate()).generateEncryptedToken(),
DEFAULT_SHARDS_PAGINATED_ENTITY
);
}

private ShardStrategyToken getShardStrategyToken(String requestedToken) {
return Objects.isNull(requestedToken) || requestedToken.isEmpty() ? null : new ShardStrategyToken(requestedToken);
}

/**
* Provides the shardId to start an index which is to be included in the page. If the index is same as
* lastIndex, start from the shardId next to lastShardId, else always start from 0.
*/
private int getStartShardIdForPageIndex(ShardStrategyToken token, final String pageIndexName) {
return Objects.isNull(token) ? 0 : token.lastIndexName.equals(pageIndexName) ? token.lastShardId + 1 : 0;
}

@Override
public PageToken getResponseToken() {
return pageToken;
}

@Override
public List<ShardRouting> getRequestedEntities() {
return requestedShardRoutings;
return pageShardRoutings;
}

public List<String> getRequestedIndices() {
return requestedIndices;
return pageIndices;
}

/**
Expand Down Expand Up @@ -203,11 +221,11 @@ public ShardStrategyToken(String requestedTokenString) {
this.lastIndexName = decryptedTokenElements[INDEX_NAME_POS_IN_TOKEN];
}

public ShardStrategyToken(int lastShardId, long creationTimeOfLastRespondedIndex, String nameOfLastRespondedIndex) {
public ShardStrategyToken(String lastIndexName, int lastShardId, long lastIndexCreationTime) {
Objects.requireNonNull(lastIndexName, "index name should be provided");
this.lastIndexName = lastIndexName;
this.lastShardId = lastShardId;
Objects.requireNonNull(nameOfLastRespondedIndex, "index name should be provided");
this.lastIndexCreationTime = creationTimeOfLastRespondedIndex;
this.lastIndexName = nameOfLastRespondedIndex;
this.lastIndexCreationTime = lastIndexCreationTime;
}

public String generateEncryptedToken() {
Expand Down

0 comments on commit 6c855ce

Please sign in to comment.