Skip to content

Commit 94653c1

Browse files
committed
Spotless
Signed-off-by: Andy Qin <[email protected]>
1 parent f8169f3 commit 94653c1

File tree

7 files changed

+68
-54
lines changed

7 files changed

+68
-54
lines changed

server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -266,13 +266,25 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, String exec
266266
// System ingest pipelines and processors should handle these cases individually.
267267
boolean indexRequestHasPipeline = false;
268268
switch (updateRequest.getType()) {
269-
case NORMAL_UPDATE -> indexRequestHasPipeline |= ingestService.resolveSystemIngestPipeline(actionRequest, updateRequest.doc(), metadata);
269+
case NORMAL_UPDATE -> indexRequestHasPipeline |= ingestService.resolveSystemIngestPipeline(
270+
actionRequest,
271+
updateRequest.doc(),
272+
metadata
273+
);
270274
case NORMAL_UPSERT -> {
271275
indexRequestHasPipeline |= ingestService.resolveSystemIngestPipeline(actionRequest, updateRequest.doc(), metadata);
272276
indexRequestHasPipeline |= ingestService.resolvePipelines(actionRequest, updateRequest.upsertRequest(), metadata);
273277
}
274-
case UPSERT_WITH_SCRIPT -> indexRequestHasPipeline |= ingestService.resolvePipelines(actionRequest, updateRequest.upsertRequest(), metadata);
275-
case DOC_AS_UPSERT -> indexRequestHasPipeline |= ingestService.resolvePipelines(actionRequest, updateRequest.doc(), metadata);
278+
case UPSERT_WITH_SCRIPT -> indexRequestHasPipeline |= ingestService.resolvePipelines(
279+
actionRequest,
280+
updateRequest.upsertRequest(),
281+
metadata
282+
);
283+
case DOC_AS_UPSERT -> indexRequestHasPipeline |= ingestService.resolvePipelines(
284+
actionRequest,
285+
updateRequest.doc(),
286+
metadata
287+
);
276288
// Pure scripted updates have no child index requests, so nothing is resolved.
277289
}
278290
hasIndexRequestsWithPipelines |= indexRequestHasPipeline;
@@ -300,16 +312,12 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, String exec
300312
// this path is never taken.
301313
try {
302314
if (Assertions.ENABLED) {
303-
final boolean arePipelinesResolved = bulkRequest.requests()
304-
.stream()
305-
.flatMap(request -> {
306-
if (request instanceof UpdateRequest updateRequest) {
307-
return updateRequest.getChildIndexRequests().stream();
308-
}
309-
return Stream.of(getIndexWriteRequest(request));
310-
})
311-
.filter(Objects::nonNull)
312-
.allMatch(IndexRequest::isPipelineResolved);
315+
final boolean arePipelinesResolved = bulkRequest.requests().stream().flatMap(request -> {
316+
if (request instanceof UpdateRequest updateRequest) {
317+
return updateRequest.getChildIndexRequests().stream();
318+
}
319+
return Stream.of(getIndexWriteRequest(request));
320+
}).filter(Objects::nonNull).allMatch(IndexRequest::isPipelineResolved);
313321
assert arePipelinesResolved : bulkRequest;
314322
}
315323
if (clusterService.localNode().isIngestNode()) {

server/src/main/java/org/opensearch/action/update/UpdateRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1054,7 +1054,7 @@ public UpdateRequest.Type getType() {
10541054
/**
10551055
* Inner enum to classify the type of update request.
10561056
*/
1057-
@PublicApi(since="3.1.0")
1057+
@PublicApi(since = "3.1.0")
10581058
public enum Type {
10591059
NORMAL_UPDATE,
10601060
NORMAL_UPSERT,

server/src/main/java/org/opensearch/ingest/CompoundProcessor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,9 @@ void innerBatchExecute(
245245
doc.getIngestDocument(),
246246
(IngestProcessorException) doc.getException(),
247247
(result, ex) -> {
248-
handler.accept(Collections.singletonList(new IngestDocumentWrapper(doc.getSlot(), doc.getInnerSlot(), result, ex)));
248+
handler.accept(
249+
Collections.singletonList(new IngestDocumentWrapper(doc.getSlot(), doc.getInnerSlot(), result, ex))
250+
);
249251
}
250252
)
251253
);

server/src/main/java/org/opensearch/ingest/IngestService.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,11 @@ public boolean resolveSystemIngestPipeline(
274274
if (v2Template != null) {
275275
systemIngestPipelineId = getSystemIngestPipelineForTemplateV2(v2Template, indexRequest);
276276
} else {
277-
List<IndexTemplateMetadata> templates = MetadataIndexTemplateService.findV1Templates(metadata, indexRequest.index(), null);
277+
List<IndexTemplateMetadata> templates = MetadataIndexTemplateService.findV1Templates(
278+
metadata,
279+
indexRequest.index(),
280+
null
281+
);
278282
systemIngestPipelineId = getSystemIngestPipelineForTemplateV1(templates, indexRequest);
279283
}
280284
}
@@ -1412,10 +1416,7 @@ private void innerBatchExecute(
14121416
}
14131417
for (IngestDocumentWrapper ingestDocumentWrapper : succeeded) {
14141418
SlotKey slotKey = new SlotKey(ingestDocumentWrapper.getSlot(), ingestDocumentWrapper.getInnerSlot());
1415-
updateIndexRequestWithIngestDocument(
1416-
slotToindexRequestMap.get(slotKey),
1417-
ingestDocumentWrapper.getIngestDocument()
1418-
);
1419+
updateIndexRequestWithIngestDocument(slotToindexRequestMap.get(slotKey), ingestDocumentWrapper.getIngestDocument());
14191420
}
14201421
handler.accept(allResults);
14211422
}
@@ -1662,15 +1663,23 @@ private static IngestDocumentWrapper toIngestDocumentWrapper(int slot, int inner
16621663
return new IngestDocumentWrapper(slot, innerSlot, toIngestDocument(indexRequest), null);
16631664
}
16641665

1665-
private static List<IngestDocumentWrapper> toIngestDocumentWrappers(List<Integer> slots, List<Integer> innerSlots, List<IndexRequest> indexRequests) {
1666+
private static List<IngestDocumentWrapper> toIngestDocumentWrappers(
1667+
List<Integer> slots,
1668+
List<Integer> innerSlots,
1669+
List<IndexRequest> indexRequests
1670+
) {
16661671
List<IngestDocumentWrapper> ingestDocumentWrappers = new ArrayList<>();
16671672
for (int i = 0; i < slots.size(); ++i) {
16681673
ingestDocumentWrappers.add(toIngestDocumentWrapper(slots.get(i), innerSlots.get(i), indexRequests.get(i)));
16691674
}
16701675
return ingestDocumentWrappers;
16711676
}
16721677

1673-
private static Map<SlotKey, IndexRequest> createSlotIndexRequestMap(List<Integer> slots, List<Integer> innerSlots, List<IndexRequest> indexRequests) {
1678+
private static Map<SlotKey, IndexRequest> createSlotIndexRequestMap(
1679+
List<Integer> slots,
1680+
List<Integer> innerSlots,
1681+
List<IndexRequest> indexRequests
1682+
) {
16741683
Map<SlotKey, IndexRequest> slotIndexRequestMap = new HashMap<>();
16751684
for (int i = 0; i < slots.size(); ++i) {
16761685
slotIndexRequestMap.put(new SlotKey(slots.get(i), innerSlots.get(i)), indexRequests.get(i));
@@ -1684,5 +1693,6 @@ private static Map<SlotKey, IndexRequest> createSlotIndexRequestMap(List<Integer
16841693
* @param slot
16851694
* @param innerSlot
16861695
*/
1687-
private record SlotKey(int slot, int innerSlot) {}
1696+
private record SlotKey(int slot, int innerSlot) {
1697+
}
16881698
}

server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,11 @@ public void testNonExceptional() {
8484
bulkRequest.add(new IndexRequest(randomAlphaOfLength(5)));
8585
bulkRequest.add(new IndexRequest(randomAlphaOfLength(5)));
8686
bulkRequest.add(new DeleteRequest(randomAlphaOfLength(5)));
87-
bulkRequest.add(new UpdateRequest(randomAlphaOfLength(5), randomAlphaOfLength(5))
88-
.doc(new IndexRequest(randomAlphaOfLength(5))
89-
.source(Requests.INDEX_CONTENT_TYPE, "field1", "value1")));
87+
bulkRequest.add(
88+
new UpdateRequest(randomAlphaOfLength(5), randomAlphaOfLength(5)).doc(
89+
new IndexRequest(randomAlphaOfLength(5)).source(Requests.INDEX_CONTENT_TYPE, "field1", "value1")
90+
)
91+
);
9092
// Test emulating auto_create_index=false
9193
indicesThatCannotBeCreatedTestCase(emptySet(), bulkRequest, null);
9294
// Test emulating auto_create_index=true
@@ -102,9 +104,11 @@ public void testAllFail() {
102104
bulkRequest.add(new IndexRequest("no"));
103105
bulkRequest.add(new IndexRequest("can't"));
104106
bulkRequest.add(new DeleteRequest("do").version(0).versionType(VersionType.EXTERNAL));
105-
bulkRequest.add(new UpdateRequest("nothin", randomAlphaOfLength(5))
106-
.doc(new IndexRequest(randomAlphaOfLength(5))
107-
.source(Requests.INDEX_CONTENT_TYPE, "field1", "value1")));
107+
bulkRequest.add(
108+
new UpdateRequest("nothin", randomAlphaOfLength(5)).doc(
109+
new IndexRequest(randomAlphaOfLength(5)).source(Requests.INDEX_CONTENT_TYPE, "field1", "value1")
110+
)
111+
);
108112
indicesThatCannotBeCreatedTestCase(new HashSet<>(Arrays.asList("no", "can't", "do", "nothin")), bulkRequest, index -> {
109113
throw new IndexNotFoundException("Can't make it because I say so");
110114
});

server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -579,17 +579,11 @@ private void validatePipelineWithBulkUpsert(@Nullable String indexRequestIndexNa
579579
.script(mockScript("1"))
580580
.scriptedUpsert(true);
581581
UpdateRequest regularUpdate = new UpdateRequest(updateRequestIndexName, "id4").doc(indexRequestForNormalUpdate);
582-
UpdateRequest upsertWithDocAndUpsertRequest = new UpdateRequest(updateRequestIndexName, "id4")
583-
.doc(indexRequestDocForNormalUpsert)
582+
UpdateRequest upsertWithDocAndUpsertRequest = new UpdateRequest(updateRequestIndexName, "id4").doc(indexRequestDocForNormalUpsert)
584583
.upsert(indexRequestUpsertForNormalUpsert);
585584

586-
587585
// Add update requests
588-
bulkRequest.add(upsertRequest)
589-
.add(docAsUpsertRequest)
590-
.add(scriptedUpsert)
591-
.add(regularUpdate)
592-
.add(upsertWithDocAndUpsertRequest);
586+
bulkRequest.add(upsertRequest).add(docAsUpsertRequest).add(scriptedUpsert).add(regularUpdate).add(upsertWithDocAndUpsertRequest);
593587

594588
AtomicBoolean responseCalled = new AtomicBoolean(false);
595589
AtomicBoolean failureCalled = new AtomicBoolean(false);
@@ -647,7 +641,11 @@ private void validatePipelineWithBulkUpsert(@Nullable String indexRequestIndexNa
647641
verify(ingestService, times(1)).resolveSystemIngestPipeline(any(), eq(indexRequestForNormalUpdate), any());
648642

649643
verify(ingestService, never()).resolvePipelines(any(), eq(indexRequestDocForNormalUpsert), any());
650-
verify(ingestService, times(1)).resolveSystemIngestPipeline(eq(upsertWithDocAndUpsertRequest), eq(indexRequestDocForNormalUpsert), any());
644+
verify(ingestService, times(1)).resolveSystemIngestPipeline(
645+
eq(upsertWithDocAndUpsertRequest),
646+
eq(indexRequestDocForNormalUpsert),
647+
any()
648+
);
651649

652650
verify(ingestService, times(1)).resolvePipelines(eq(upsertWithDocAndUpsertRequest), eq(indexRequestUpsertForNormalUpsert), any());
653651
verify(ingestService, never()).resolveSystemIngestPipeline(any(), eq(indexRequestUpsertForNormalUpsert), any());
@@ -656,9 +654,9 @@ private void validatePipelineWithBulkUpsert(@Nullable String indexRequestIndexNa
656654
indexRequest1.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
657655
indexRequest2.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
658656
indexRequest3.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
659-
indexRequestForNormalUpdate.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
660-
indexRequestDocForNormalUpsert.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
661-
indexRequestUpsertForNormalUpsert.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
657+
indexRequestForNormalUpdate.setPipeline(IngestService.NOOP_PIPELINE_NAME);
658+
indexRequestDocForNormalUpsert.setPipeline(IngestService.NOOP_PIPELINE_NAME);
659+
indexRequestUpsertForNormalUpsert.setPipeline(IngestService.NOOP_PIPELINE_NAME);
662660

663661
completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null);
664662
assertTrue(action.isExecuted);

server/src/test/java/org/opensearch/ingest/IngestServiceTests.java

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1316,8 +1316,7 @@ public void testBulkRequestExecutionWithFailures() {
13161316
UpdateRequest updateRequest = new UpdateRequest("_index", "_id");
13171317

13181318
// We attach a child index request
1319-
IndexRequest indexRequest = new IndexRequest("_index").id("_id")
1320-
.source(Requests.INDEX_CONTENT_TYPE, "field1", "value1");
1319+
IndexRequest indexRequest = new IndexRequest("_index").id("_id").source(Requests.INDEX_CONTENT_TYPE, "field1", "value1");
13211320
updateRequest.doc(indexRequest);
13221321
bulkRequest.add(updateRequest);
13231322
}
@@ -2213,17 +2212,10 @@ public void testExecuteBulkRequestInBatchWithExceptionAndDropInCallback_requests
22132212
final Map<Integer, List<Exception>> failureHandler = new HashMap<>();
22142213
final Map<Thread, Exception> completionHandler = new HashMap<>();
22152214
final List<Integer> dropHandler = new ArrayList<>();
2216-
ingestService.executeBulkRequest(
2217-
5,
2218-
bulkRequest.requests(),
2219-
(slot, exception) -> {
2220-
// Collect exceptions into a map of slots to list to inspect later
2221-
failureHandler.computeIfAbsent(slot, i -> new ArrayList<>()).add(exception);
2222-
},
2223-
completionHandler::put,
2224-
dropHandler::add,
2225-
Names.WRITE
2226-
);
2215+
ingestService.executeBulkRequest(5, bulkRequest.requests(), (slot, exception) -> {
2216+
// Collect exceptions into a map of slots to list to inspect later
2217+
failureHandler.computeIfAbsent(slot, i -> new ArrayList<>()).add(exception);
2218+
}, completionHandler::put, dropHandler::add, Names.WRITE);
22272219

22282220
// The first and second slots should have failures
22292221
assertEquals(Set.of(1, 2), failureHandler.keySet());

0 commit comments

Comments
 (0)