diff --git a/CHANGELOG.md b/CHANGELOG.md index 5f813fecf66cf..1033728db92c6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -60,6 +60,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Indexed IP field supports `terms_query` with more than 1025 IP masks [#16391](https://github.com/opensearch-project/OpenSearch/pull/16391) - Make entries for dependencies from server/build.gradle to gradle version catalog ([#16707](https://github.com/opensearch-project/OpenSearch/pull/16707)) - Allow extended plugins to be optional ([#16909](https://github.com/opensearch-project/OpenSearch/pull/16909)) +- Delete useless loop in `TransportBulkAction$BulkOperation.doRun` q([#16950](https://github.com/opensearch-project/OpenSearch/pull/16950)) ### Deprecated - Performing update operation with default pipeline or final pipeline is deprecated ([#16712](https://github.com/opensearch-project/OpenSearch/pull/16712)) diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java index 19ffb12859183..db509afb68da9 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java @@ -532,6 +532,8 @@ protected void doRun() { } final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver); Metadata metadata = clusterState.metadata(); + // go over all the requests and create a ShardId -> Operations mapping + Map> requestsByShard = new HashMap<>(); for (int i = 0; i < bulkRequest.requests.size(); i++) { DocWriteRequest docWriteRequest = bulkRequest.requests.get(i); // the request can only be null because we set it to null in the previous step, so it gets ignored @@ -587,6 +589,12 @@ protected void doRun() { default: throw new AssertionError("request type not supported: [" + docWriteRequest.opType() + "]"); } + + ShardId shardId = clusterService.operationRouting() + .indexShards(clusterState, concreteIndex.getName(), docWriteRequest.id(), docWriteRequest.routing()) + .shardId(); + List shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>()); + shardRequests.add(new BulkItemRequest(i, docWriteRequest)); } catch (OpenSearchParseException | IllegalArgumentException | RoutingMissingException e) { BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), docWriteRequest.id(), e); BulkItemResponse bulkItemResponse = new BulkItemResponse(i, docWriteRequest.opType(), failure); @@ -596,21 +604,6 @@ protected void doRun() { } } - // first, go over all the requests and create a ShardId -> Operations mapping - Map> requestsByShard = new HashMap<>(); - for (int i = 0; i < bulkRequest.requests.size(); i++) { - DocWriteRequest request = bulkRequest.requests.get(i); - if (request == null) { - continue; - } - String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName(); - ShardId shardId = clusterService.operationRouting() - .indexShards(clusterState, concreteIndex, request.id(), request.routing()) - .shardId(); - List shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>()); - shardRequests.add(new BulkItemRequest(i, request)); - } - if (requestsByShard.isEmpty()) { BulkItemResponse[] response = responses.toArray(new BulkItemResponse[responses.length()]); long tookMillis = buildTookInMillis(startTimeNanos);