Skip to content

Commit 0bc58d2

Browse files
committed
Add a nested pipeline, too
1 parent c33063e commit 0bc58d2

File tree

1 file changed

+35
-3
lines changed

1 file changed

+35
-3
lines changed

server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.elasticsearch.script.ScriptModule;
5757
import org.elasticsearch.script.ScriptService;
5858
import org.elasticsearch.script.ScriptType;
59+
import org.elasticsearch.script.TemplateScript;
5960
import org.elasticsearch.test.ESTestCase;
6061
import org.elasticsearch.test.MockLogAppender;
6162
import org.elasticsearch.threadpool.ThreadPool;
@@ -1403,7 +1404,24 @@ public void testIngestAndPipelineStats() throws Exception {
14031404
return null;
14041405
}).when(processor).execute(any(IngestDocument.class), any());
14051406

1406-
IngestService ingestService = createWithProcessors(Map.of("mock", (factories, tag, description, config) -> processor));
1407+
// mock up an ingest service for returning a pipeline, this is used by the pipeline processor
1408+
final Pipeline[] pipelineToReturn = new Pipeline[1];
1409+
final IngestService pipelineIngestService = mock(IngestService.class);
1410+
when(pipelineIngestService.getPipeline(anyString())).thenAnswer(inv -> pipelineToReturn[0]);
1411+
1412+
IngestService ingestService = createWithProcessors(
1413+
Map.of(
1414+
"pipeline",
1415+
(factories, tag, description, config) -> new PipelineProcessor(tag, description, (params) -> new TemplateScript(params) {
1416+
@Override
1417+
public String execute() {
1418+
return "_id3";
1419+
} // this pipeline processor will always execute the '_id3' processor
1420+
}, false, pipelineIngestService),
1421+
"mock",
1422+
(factories, tag, description, config) -> processor
1423+
)
1424+
);
14071425

14081426
{
14091427
// all zeroes since nothing has executed
@@ -1418,29 +1436,41 @@ public void testIngestAndPipelineStats() throws Exception {
14181436
new BytesArray("{\"processors\": [{\"mock\" : {}}]}"),
14191437
XContentType.JSON
14201438
);
1439+
// n.b. this 'pipeline' processor will always run the '_id3' pipeline, see the mocking/plumbing above and below
14211440
PutPipelineRequest putRequest2 = new PutPipelineRequest(
14221441
"_id2",
1442+
new BytesArray("{\"processors\": [{\"pipeline\" : {}}]}"),
1443+
XContentType.JSON
1444+
);
1445+
PutPipelineRequest putRequest3 = new PutPipelineRequest(
1446+
"_id3",
14231447
new BytesArray("{\"processors\": [{\"mock\" : {}}]}"),
14241448
XContentType.JSON
14251449
);
14261450
ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty
14271451
ClusterState previousClusterState = clusterState;
14281452
clusterState = executePut(putRequest1, clusterState);
14291453
clusterState = executePut(putRequest2, clusterState);
1454+
clusterState = executePut(putRequest3, clusterState);
14301455
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
14311456

1457+
// hook up the mock ingest service to return pipeline3 when asked by the pipeline processor
1458+
pipelineToReturn[0] = ingestService.getPipeline("_id3");
1459+
14321460
{
14331461
final IngestStats ingestStats = ingestService.stats();
1434-
assertThat(ingestStats.getPipelineStats().size(), equalTo(2));
1462+
assertThat(ingestStats.getPipelineStats().size(), equalTo(3));
14351463

14361464
// total
14371465
assertStats(ingestStats.getTotalStats(), 0, 0, 0);
14381466
// pipeline
14391467
assertPipelineStats(ingestStats.getPipelineStats(), "_id1", 0, 0, 0);
14401468
assertPipelineStats(ingestStats.getPipelineStats(), "_id2", 0, 0, 0);
1469+
assertPipelineStats(ingestStats.getPipelineStats(), "_id3", 0, 0, 0);
14411470
// processor
14421471
assertProcessorStats(0, ingestStats, "_id1", 0, 0, 0);
14431472
assertProcessorStats(0, ingestStats, "_id2", 0, 0, 0);
1473+
assertProcessorStats(0, ingestStats, "_id3", 0, 0, 0);
14441474
}
14451475

14461476
// put a single document through ingest processing
@@ -1451,17 +1481,19 @@ public void testIngestAndPipelineStats() throws Exception {
14511481

14521482
{
14531483
final IngestStats ingestStats = ingestService.stats();
1454-
assertThat(ingestStats.getPipelineStats().size(), equalTo(2));
1484+
assertThat(ingestStats.getPipelineStats().size(), equalTo(3));
14551485

14561486
// total
14571487
// see https://github.com/elastic/elasticsearch/issues/92843 -- this should be 1, but it's actually 2
14581488
// assertStats(ingestStats.getTotalStats(), 1, 0, 0);
14591489
// pipeline
14601490
assertPipelineStats(ingestStats.getPipelineStats(), "_id1", 1, 0, 0);
14611491
assertPipelineStats(ingestStats.getPipelineStats(), "_id2", 1, 0, 0);
1492+
assertPipelineStats(ingestStats.getPipelineStats(), "_id3", 1, 0, 0);
14621493
// processor
14631494
assertProcessorStats(0, ingestStats, "_id1", 1, 0, 0);
14641495
assertProcessorStats(0, ingestStats, "_id2", 1, 0, 0);
1496+
assertProcessorStats(0, ingestStats, "_id3", 1, 0, 0);
14651497
}
14661498
}
14671499

0 commit comments

Comments
 (0)