Skip to content

Commit 5126f6d

Browse files
authored
Internal refactoring of some IngestService methods (#92203)
1 parent 7acf909 commit 5126f6d

File tree

4 files changed

+41
-40
lines changed

4 files changed

+41
-40
lines changed

server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -780,6 +780,7 @@ private void processBulkIndexIngestRequest(
780780
ingestService.executeBulkRequest(
781781
original.numberOfActions(),
782782
() -> bulkRequestModifier,
783+
bulkRequestModifier::markItemAsDropped,
783784
bulkRequestModifier::markItemAsFailed,
784785
(originalThread, exception) -> {
785786
if (exception != null) {
@@ -823,7 +824,6 @@ public boolean isForceExecution() {
823824
}
824825
}
825826
},
826-
bulkRequestModifier::markItemAsDropped,
827827
executorName
828828
);
829829
}

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -668,13 +668,14 @@ void validatePipeline(Map<DiscoveryNode, IngestInfo> ingestInfos, String pipelin
668668
}
669669

670670
public void executeBulkRequest(
671-
int numberOfActionRequests,
672-
Iterable<DocWriteRequest<?>> actionRequests,
673-
BiConsumer<Integer, Exception> onFailure,
674-
BiConsumer<Thread, Exception> onCompletion,
675-
IntConsumer onDropped,
676-
String executorName
671+
final int numberOfActionRequests,
672+
final Iterable<DocWriteRequest<?>> actionRequests,
673+
final IntConsumer onDropped,
674+
final BiConsumer<Integer, Exception> onFailure,
675+
final BiConsumer<Thread, Exception> onCompletion,
676+
final String executorName
677677
) {
678+
assert numberOfActionRequests > 0 : "numberOfActionRequests must be greater than 0 but was [" + numberOfActionRequests + "]";
678679

679680
threadPool.executor(executorName).execute(new AbstractRunnable() {
680681

@@ -886,11 +887,11 @@ static String getProcessorName(Processor processor) {
886887
}
887888

888889
private void innerExecute(
889-
int slot,
890-
IndexRequest indexRequest,
891-
Pipeline pipeline,
892-
IntConsumer itemDroppedHandler,
893-
Consumer<Exception> handler
890+
final int slot,
891+
final IndexRequest indexRequest,
892+
final Pipeline pipeline,
893+
final IntConsumer itemDroppedHandler,
894+
final Consumer<Exception> handler
894895
) {
895896
if (pipeline.getProcessors().isEmpty()) {
896897
handler.accept(null);

server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -290,9 +290,9 @@ public void testIngestLocal() throws Exception {
290290
verify(ingestService).executeBulkRequest(
291291
eq(bulkRequest.numberOfActions()),
292292
bulkDocsItr.capture(),
293+
any(),
293294
failureHandler.capture(),
294295
completionHandler.capture(),
295-
any(),
296296
eq(Names.WRITE)
297297
);
298298
completionHandler.getValue().accept(null, exception);
@@ -332,9 +332,9 @@ public void testSingleItemBulkActionIngestLocal() throws Exception {
332332
verify(ingestService).executeBulkRequest(
333333
eq(1),
334334
bulkDocsItr.capture(),
335+
any(),
335336
failureHandler.capture(),
336337
completionHandler.capture(),
337-
any(),
338338
eq(Names.WRITE)
339339
);
340340
completionHandler.getValue().accept(null, exception);
@@ -378,9 +378,9 @@ public void testIngestSystemLocal() throws Exception {
378378
verify(ingestService).executeBulkRequest(
379379
eq(bulkRequest.numberOfActions()),
380380
bulkDocsItr.capture(),
381+
any(),
381382
failureHandler.capture(),
382383
completionHandler.capture(),
383-
any(),
384384
eq(Names.SYSTEM_WRITE)
385385
);
386386
completionHandler.getValue().accept(null, exception);
@@ -535,9 +535,9 @@ private void validatePipelineWithBulkUpsert(@Nullable String indexRequestIndexNa
535535
verify(ingestService).executeBulkRequest(
536536
eq(bulkRequest.numberOfActions()),
537537
bulkDocsItr.capture(),
538+
any(),
538539
failureHandler.capture(),
539540
completionHandler.capture(),
540-
any(),
541541
eq(Names.WRITE)
542542
);
543543
assertEquals(indexRequest1.getPipeline(), "default_pipeline");
@@ -583,9 +583,9 @@ public void testDoExecuteCalledTwiceCorrectly() throws Exception {
583583
verify(ingestService).executeBulkRequest(
584584
eq(1),
585585
bulkDocsItr.capture(),
586+
any(),
586587
failureHandler.capture(),
587588
completionHandler.capture(),
588-
any(),
589589
eq(Names.WRITE)
590590
);
591591
completionHandler.getValue().accept(null, exception);
@@ -677,9 +677,9 @@ public void testFindDefaultPipelineFromTemplateMatch() {
677677
verify(ingestService).executeBulkRequest(
678678
eq(1),
679679
bulkDocsItr.capture(),
680+
any(),
680681
failureHandler.capture(),
681682
completionHandler.capture(),
682-
any(),
683683
eq(Names.WRITE)
684684
);
685685
}
@@ -721,9 +721,9 @@ public void testFindDefaultPipelineFromV2TemplateMatch() {
721721
verify(ingestService).executeBulkRequest(
722722
eq(1),
723723
bulkDocsItr.capture(),
724+
any(),
724725
failureHandler.capture(),
725726
completionHandler.capture(),
726-
any(),
727727
eq(Names.WRITE)
728728
);
729729
}
@@ -751,9 +751,9 @@ public void testIngestCallbackExceptionHandled() throws Exception {
751751
verify(ingestService).executeBulkRequest(
752752
eq(bulkRequest.numberOfActions()),
753753
bulkDocsItr.capture(),
754+
any(),
754755
failureHandler.capture(),
755756
completionHandler.capture(),
756-
any(),
757757
eq(Names.WRITE)
758758
);
759759
indexRequest1.autoGenerateId();
@@ -788,9 +788,9 @@ private void validateDefaultPipeline(IndexRequest indexRequest) {
788788
verify(ingestService).executeBulkRequest(
789789
eq(1),
790790
bulkDocsItr.capture(),
791+
any(),
791792
failureHandler.capture(),
792793
completionHandler.capture(),
793-
any(),
794794
eq(Names.WRITE)
795795
);
796796
assertEquals(indexRequest.getPipeline(), "default_pipeline");

server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -195,9 +195,9 @@ public void testExecuteIndexPipelineDoesNotExist() {
195195
ingestService.executeBulkRequest(
196196
1,
197197
Collections.singletonList(indexRequest),
198+
indexReq -> {},
198199
failureHandler,
199200
completionHandler,
200-
indexReq -> {},
201201
Names.WRITE
202202
);
203203

@@ -963,9 +963,9 @@ public String getType() {
963963
ingestService.executeBulkRequest(
964964
bulkRequest.numberOfActions(),
965965
bulkRequest.requests(),
966+
indexReq -> {},
966967
failureHandler,
967968
completionHandler,
968-
indexReq -> {},
969969
Names.WRITE
970970
);
971971

@@ -1009,9 +1009,9 @@ public void testExecuteBulkPipelineDoesNotExist() {
10091009
ingestService.executeBulkRequest(
10101010
bulkRequest.numberOfActions(),
10111011
bulkRequest.requests(),
1012+
indexReq -> {},
10121013
failureHandler,
10131014
completionHandler,
1014-
indexReq -> {},
10151015
Names.WRITE
10161016
);
10171017
verify(failureHandler, times(1)).accept(
@@ -1045,9 +1045,9 @@ public void testExecuteSuccess() {
10451045
ingestService.executeBulkRequest(
10461046
1,
10471047
Collections.singletonList(indexRequest),
1048+
indexReq -> {},
10481049
failureHandler,
10491050
completionHandler,
1050-
indexReq -> {},
10511051
Names.WRITE
10521052
);
10531053
verify(failureHandler, never()).accept(any(), any());
@@ -1085,9 +1085,9 @@ public void testDynamicTemplates() throws Exception {
10851085
ingestService.executeBulkRequest(
10861086
1,
10871087
Collections.singletonList(indexRequest),
1088+
indexReq -> {},
10881089
failureHandler,
10891090
completionHandler,
1090-
indexReq -> {},
10911091
Names.WRITE
10921092
);
10931093
latch.await();
@@ -1113,9 +1113,9 @@ public void testExecuteEmptyPipeline() throws Exception {
11131113
ingestService.executeBulkRequest(
11141114
1,
11151115
Collections.singletonList(indexRequest),
1116+
indexReq -> {},
11161117
failureHandler,
11171118
completionHandler,
1118-
indexReq -> {},
11191119
Names.WRITE
11201120
);
11211121
verify(failureHandler, never()).accept(any(), any());
@@ -1176,9 +1176,9 @@ public void testExecutePropagateAllMetadataUpdates() throws Exception {
11761176
ingestService.executeBulkRequest(
11771177
1,
11781178
Collections.singletonList(indexRequest),
1179+
indexReq -> {},
11791180
failureHandler,
11801181
completionHandler,
1181-
indexReq -> {},
11821182
Names.WRITE
11831183
);
11841184
verify(processor).execute(any(), any());
@@ -1220,9 +1220,9 @@ public void testExecuteFailure() throws Exception {
12201220
ingestService.executeBulkRequest(
12211221
1,
12221222
Collections.singletonList(indexRequest),
1223+
indexReq -> {},
12231224
failureHandler,
12241225
completionHandler,
1225-
indexReq -> {},
12261226
Names.WRITE
12271227
);
12281228
verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any());
@@ -1278,9 +1278,9 @@ public void testExecuteSuccessWithOnFailure() throws Exception {
12781278
ingestService.executeBulkRequest(
12791279
1,
12801280
Collections.singletonList(indexRequest),
1281+
indexReq -> {},
12811282
failureHandler,
12821283
completionHandler,
1283-
indexReq -> {},
12841284
Names.WRITE
12851285
);
12861286
verify(failureHandler, never()).accept(eq(0), any(IngestProcessorException.class));
@@ -1330,9 +1330,9 @@ public void testExecuteFailureWithNestedOnFailure() throws Exception {
13301330
ingestService.executeBulkRequest(
13311331
1,
13321332
Collections.singletonList(indexRequest),
1333+
indexReq -> {},
13331334
failureHandler,
13341335
completionHandler,
1335-
indexReq -> {},
13361336
Names.WRITE
13371337
);
13381338
verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any());
@@ -1393,9 +1393,9 @@ public void testBulkRequestExecutionWithFailures() throws Exception {
13931393
ingestService.executeBulkRequest(
13941394
numRequest,
13951395
bulkRequest.requests(),
1396+
indexReq -> {},
13961397
requestItemErrorHandler,
13971398
completionHandler,
1398-
indexReq -> {},
13991399
Names.WRITE
14001400
);
14011401

@@ -1447,9 +1447,9 @@ public void testBulkRequestExecution() throws Exception {
14471447
ingestService.executeBulkRequest(
14481448
numRequest,
14491449
bulkRequest.requests(),
1450+
indexReq -> {},
14501451
requestItemErrorHandler,
14511452
completionHandler,
1452-
indexReq -> {},
14531453
Names.WRITE
14541454
);
14551455

@@ -1517,9 +1517,9 @@ public void testStats() throws Exception {
15171517
ingestService.executeBulkRequest(
15181518
1,
15191519
Collections.singletonList(indexRequest),
1520+
indexReq -> {},
15201521
failureHandler,
15211522
completionHandler,
1522-
indexReq -> {},
15231523
Names.WRITE
15241524
);
15251525
final IngestStats afterFirstRequestStats = ingestService.stats();
@@ -1541,9 +1541,9 @@ public void testStats() throws Exception {
15411541
ingestService.executeBulkRequest(
15421542
1,
15431543
Collections.singletonList(indexRequest),
1544+
indexReq -> {},
15441545
failureHandler,
15451546
completionHandler,
1546-
indexReq -> {},
15471547
Names.WRITE
15481548
);
15491549
final IngestStats afterSecondRequestStats = ingestService.stats();
@@ -1570,9 +1570,9 @@ public void testStats() throws Exception {
15701570
ingestService.executeBulkRequest(
15711571
1,
15721572
Collections.singletonList(indexRequest),
1573+
indexReq -> {},
15731574
failureHandler,
15741575
completionHandler,
1575-
indexReq -> {},
15761576
Names.WRITE
15771577
);
15781578
final IngestStats afterThirdRequestStats = ingestService.stats();
@@ -1600,9 +1600,9 @@ public void testStats() throws Exception {
16001600
ingestService.executeBulkRequest(
16011601
1,
16021602
Collections.singletonList(indexRequest),
1603+
indexReq -> {},
16031604
failureHandler,
16041605
completionHandler,
1605-
indexReq -> {},
16061606
Names.WRITE
16071607
);
16081608
final IngestStats afterForthRequestStats = ingestService.stats();
@@ -1698,9 +1698,9 @@ public String getDescription() {
16981698
ingestService.executeBulkRequest(
16991699
bulkRequest.numberOfActions(),
17001700
bulkRequest.requests(),
1701+
dropHandler,
17011702
failureHandler,
17021703
completionHandler,
1703-
dropHandler,
17041704
Names.WRITE
17051705
);
17061706
verify(failureHandler, never()).accept(any(), any());
@@ -1784,9 +1784,9 @@ public void testCBORParsing() throws Exception {
17841784
ingestService.executeBulkRequest(
17851785
1,
17861786
Collections.singletonList(indexRequest),
1787+
indexReq -> {},
17871788
(integer, e) -> {},
17881789
(thread, e) -> {},
1789-
indexReq -> {},
17901790
Names.WRITE
17911791
);
17921792
}
@@ -1817,7 +1817,7 @@ public void testPostIngest() {
18171817
bulkRequest.add(indexRequest1);
18181818
bulkRequest.add(indexRequest2);
18191819

1820-
ingestService.executeBulkRequest(2, bulkRequest.requests(), (integer, e) -> {}, (thread, e) -> {}, indexReq -> {}, Names.WRITE);
1820+
ingestService.executeBulkRequest(2, bulkRequest.requests(), indexReq -> {}, (integer, e) -> {}, (thread, e) -> {}, Names.WRITE);
18211821

18221822
assertThat(indexRequest1.getRawTimestamp(), equalTo(10));
18231823
assertThat(indexRequest2.getRawTimestamp(), nullValue());

0 commit comments

Comments
 (0)