Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,7 @@ private void processBulkIndexIngestRequest(
ingestService.executeBulkRequest(
original.numberOfActions(),
() -> bulkRequestModifier,
bulkRequestModifier::markItemAsDropped,
bulkRequestModifier::markItemAsFailed,
(originalThread, exception) -> {
if (exception != null) {
Expand Down Expand Up @@ -823,7 +824,6 @@ public boolean isForceExecution() {
}
}
},
bulkRequestModifier::markItemAsDropped,
executorName
);
}
Expand Down
23 changes: 12 additions & 11 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -668,13 +668,14 @@ void validatePipeline(Map<DiscoveryNode, IngestInfo> ingestInfos, String pipelin
}

public void executeBulkRequest(
int numberOfActionRequests,
Iterable<DocWriteRequest<?>> actionRequests,
BiConsumer<Integer, Exception> onFailure,
BiConsumer<Thread, Exception> onCompletion,
IntConsumer onDropped,
String executorName
final int numberOfActionRequests,
final Iterable<DocWriteRequest<?>> actionRequests,
final IntConsumer onDropped,
final BiConsumer<Integer, Exception> onFailure,
final BiConsumer<Thread, Exception> onCompletion,
final String executorName
) {
assert numberOfActionRequests > 0 : "numberOfActionRequests must be greater than 0 but was [" + numberOfActionRequests + "]";

threadPool.executor(executorName).execute(new AbstractRunnable() {

Expand Down Expand Up @@ -886,11 +887,11 @@ static String getProcessorName(Processor processor) {
}

private void innerExecute(
int slot,
IndexRequest indexRequest,
Pipeline pipeline,
IntConsumer itemDroppedHandler,
Consumer<Exception> handler
final int slot,
final IndexRequest indexRequest,
final Pipeline pipeline,
final IntConsumer itemDroppedHandler,
final Consumer<Exception> handler
) {
if (pipeline.getProcessors().isEmpty()) {
handler.accept(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,9 +290,9 @@ public void testIngestLocal() throws Exception {
verify(ingestService).executeBulkRequest(
eq(bulkRequest.numberOfActions()),
bulkDocsItr.capture(),
any(),
failureHandler.capture(),
completionHandler.capture(),
any(),
eq(Names.WRITE)
);
completionHandler.getValue().accept(null, exception);
Expand Down Expand Up @@ -332,9 +332,9 @@ public void testSingleItemBulkActionIngestLocal() throws Exception {
verify(ingestService).executeBulkRequest(
eq(1),
bulkDocsItr.capture(),
any(),
failureHandler.capture(),
completionHandler.capture(),
any(),
eq(Names.WRITE)
);
completionHandler.getValue().accept(null, exception);
Expand Down Expand Up @@ -378,9 +378,9 @@ public void testIngestSystemLocal() throws Exception {
verify(ingestService).executeBulkRequest(
eq(bulkRequest.numberOfActions()),
bulkDocsItr.capture(),
any(),
failureHandler.capture(),
completionHandler.capture(),
any(),
eq(Names.SYSTEM_WRITE)
);
completionHandler.getValue().accept(null, exception);
Expand Down Expand Up @@ -535,9 +535,9 @@ private void validatePipelineWithBulkUpsert(@Nullable String indexRequestIndexNa
verify(ingestService).executeBulkRequest(
eq(bulkRequest.numberOfActions()),
bulkDocsItr.capture(),
any(),
failureHandler.capture(),
completionHandler.capture(),
any(),
eq(Names.WRITE)
);
assertEquals(indexRequest1.getPipeline(), "default_pipeline");
Expand Down Expand Up @@ -583,9 +583,9 @@ public void testDoExecuteCalledTwiceCorrectly() throws Exception {
verify(ingestService).executeBulkRequest(
eq(1),
bulkDocsItr.capture(),
any(),
failureHandler.capture(),
completionHandler.capture(),
any(),
eq(Names.WRITE)
);
completionHandler.getValue().accept(null, exception);
Expand Down Expand Up @@ -677,9 +677,9 @@ public void testFindDefaultPipelineFromTemplateMatch() {
verify(ingestService).executeBulkRequest(
eq(1),
bulkDocsItr.capture(),
any(),
failureHandler.capture(),
completionHandler.capture(),
any(),
eq(Names.WRITE)
);
}
Expand Down Expand Up @@ -721,9 +721,9 @@ public void testFindDefaultPipelineFromV2TemplateMatch() {
verify(ingestService).executeBulkRequest(
eq(1),
bulkDocsItr.capture(),
any(),
failureHandler.capture(),
completionHandler.capture(),
any(),
eq(Names.WRITE)
);
}
Expand Down Expand Up @@ -751,9 +751,9 @@ public void testIngestCallbackExceptionHandled() throws Exception {
verify(ingestService).executeBulkRequest(
eq(bulkRequest.numberOfActions()),
bulkDocsItr.capture(),
any(),
failureHandler.capture(),
completionHandler.capture(),
any(),
eq(Names.WRITE)
);
indexRequest1.autoGenerateId();
Expand Down Expand Up @@ -788,9 +788,9 @@ private void validateDefaultPipeline(IndexRequest indexRequest) {
verify(ingestService).executeBulkRequest(
eq(1),
bulkDocsItr.capture(),
any(),
failureHandler.capture(),
completionHandler.capture(),
any(),
eq(Names.WRITE)
);
assertEquals(indexRequest.getPipeline(), "default_pipeline");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,9 @@ public void testExecuteIndexPipelineDoesNotExist() {
ingestService.executeBulkRequest(
1,
Collections.singletonList(indexRequest),
indexReq -> {},
failureHandler,
completionHandler,
indexReq -> {},
Names.WRITE
);

Expand Down Expand Up @@ -963,9 +963,9 @@ public String getType() {
ingestService.executeBulkRequest(
bulkRequest.numberOfActions(),
bulkRequest.requests(),
indexReq -> {},
failureHandler,
completionHandler,
indexReq -> {},
Names.WRITE
);

Expand Down Expand Up @@ -1009,9 +1009,9 @@ public void testExecuteBulkPipelineDoesNotExist() {
ingestService.executeBulkRequest(
bulkRequest.numberOfActions(),
bulkRequest.requests(),
indexReq -> {},
failureHandler,
completionHandler,
indexReq -> {},
Names.WRITE
);
verify(failureHandler, times(1)).accept(
Expand Down Expand Up @@ -1045,9 +1045,9 @@ public void testExecuteSuccess() {
ingestService.executeBulkRequest(
1,
Collections.singletonList(indexRequest),
indexReq -> {},
failureHandler,
completionHandler,
indexReq -> {},
Names.WRITE
);
verify(failureHandler, never()).accept(any(), any());
Expand Down Expand Up @@ -1085,9 +1085,9 @@ public void testDynamicTemplates() throws Exception {
ingestService.executeBulkRequest(
1,
Collections.singletonList(indexRequest),
indexReq -> {},
failureHandler,
completionHandler,
indexReq -> {},
Names.WRITE
);
latch.await();
Expand All @@ -1113,9 +1113,9 @@ public void testExecuteEmptyPipeline() throws Exception {
ingestService.executeBulkRequest(
1,
Collections.singletonList(indexRequest),
indexReq -> {},
failureHandler,
completionHandler,
indexReq -> {},
Names.WRITE
);
verify(failureHandler, never()).accept(any(), any());
Expand Down Expand Up @@ -1176,9 +1176,9 @@ public void testExecutePropagateAllMetadataUpdates() throws Exception {
ingestService.executeBulkRequest(
1,
Collections.singletonList(indexRequest),
indexReq -> {},
failureHandler,
completionHandler,
indexReq -> {},
Names.WRITE
);
verify(processor).execute(any(), any());
Expand Down Expand Up @@ -1220,9 +1220,9 @@ public void testExecuteFailure() throws Exception {
ingestService.executeBulkRequest(
1,
Collections.singletonList(indexRequest),
indexReq -> {},
failureHandler,
completionHandler,
indexReq -> {},
Names.WRITE
);
verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any());
Expand Down Expand Up @@ -1278,9 +1278,9 @@ public void testExecuteSuccessWithOnFailure() throws Exception {
ingestService.executeBulkRequest(
1,
Collections.singletonList(indexRequest),
indexReq -> {},
failureHandler,
completionHandler,
indexReq -> {},
Names.WRITE
);
verify(failureHandler, never()).accept(eq(0), any(IngestProcessorException.class));
Expand Down Expand Up @@ -1330,9 +1330,9 @@ public void testExecuteFailureWithNestedOnFailure() throws Exception {
ingestService.executeBulkRequest(
1,
Collections.singletonList(indexRequest),
indexReq -> {},
failureHandler,
completionHandler,
indexReq -> {},
Names.WRITE
);
verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any());
Expand Down Expand Up @@ -1393,9 +1393,9 @@ public void testBulkRequestExecutionWithFailures() throws Exception {
ingestService.executeBulkRequest(
numRequest,
bulkRequest.requests(),
indexReq -> {},
requestItemErrorHandler,
completionHandler,
indexReq -> {},
Names.WRITE
);

Expand Down Expand Up @@ -1447,9 +1447,9 @@ public void testBulkRequestExecution() throws Exception {
ingestService.executeBulkRequest(
numRequest,
bulkRequest.requests(),
indexReq -> {},
requestItemErrorHandler,
completionHandler,
indexReq -> {},
Names.WRITE
);

Expand Down Expand Up @@ -1517,9 +1517,9 @@ public void testStats() throws Exception {
ingestService.executeBulkRequest(
1,
Collections.singletonList(indexRequest),
indexReq -> {},
failureHandler,
completionHandler,
indexReq -> {},
Names.WRITE
);
final IngestStats afterFirstRequestStats = ingestService.stats();
Expand All @@ -1541,9 +1541,9 @@ public void testStats() throws Exception {
ingestService.executeBulkRequest(
1,
Collections.singletonList(indexRequest),
indexReq -> {},
failureHandler,
completionHandler,
indexReq -> {},
Names.WRITE
);
final IngestStats afterSecondRequestStats = ingestService.stats();
Expand All @@ -1570,9 +1570,9 @@ public void testStats() throws Exception {
ingestService.executeBulkRequest(
1,
Collections.singletonList(indexRequest),
indexReq -> {},
failureHandler,
completionHandler,
indexReq -> {},
Names.WRITE
);
final IngestStats afterThirdRequestStats = ingestService.stats();
Expand Down Expand Up @@ -1600,9 +1600,9 @@ public void testStats() throws Exception {
ingestService.executeBulkRequest(
1,
Collections.singletonList(indexRequest),
indexReq -> {},
failureHandler,
completionHandler,
indexReq -> {},
Names.WRITE
);
final IngestStats afterForthRequestStats = ingestService.stats();
Expand Down Expand Up @@ -1698,9 +1698,9 @@ public String getDescription() {
ingestService.executeBulkRequest(
bulkRequest.numberOfActions(),
bulkRequest.requests(),
dropHandler,
failureHandler,
completionHandler,
dropHandler,
Names.WRITE
);
verify(failureHandler, never()).accept(any(), any());
Expand Down Expand Up @@ -1784,9 +1784,9 @@ public void testCBORParsing() throws Exception {
ingestService.executeBulkRequest(
1,
Collections.singletonList(indexRequest),
indexReq -> {},
(integer, e) -> {},
(thread, e) -> {},
indexReq -> {},
Names.WRITE
);
}
Expand Down Expand Up @@ -1817,7 +1817,7 @@ public void testPostIngest() {
bulkRequest.add(indexRequest1);
bulkRequest.add(indexRequest2);

ingestService.executeBulkRequest(2, bulkRequest.requests(), (integer, e) -> {}, (thread, e) -> {}, indexReq -> {}, Names.WRITE);
ingestService.executeBulkRequest(2, bulkRequest.requests(), indexReq -> {}, (integer, e) -> {}, (thread, e) -> {}, Names.WRITE);

assertThat(indexRequest1.getRawTimestamp(), equalTo(10));
assertThat(indexRequest2.getRawTimestamp(), nullValue());
Expand Down