Skip to content

Commit 21c8a56

Browse files
authored
Add an IngestService stats test (#93120)
1 parent 69914bf commit 21c8a56

File tree

2 files changed

+111
-5
lines changed

2 files changed

+111
-5
lines changed
Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,14 @@
55
* in compliance with, at your election, the Elastic License 2.0 or the Server
66
* Side Public License, v 1.
77
*/
8-
package org.elasticsearch.action.ingest;
8+
package org.elasticsearch.ingest;
99

1010
import org.elasticsearch.action.bulk.BulkRequest;
1111
import org.elasticsearch.action.bulk.BulkResponse;
1212
import org.elasticsearch.action.get.GetRequest;
1313
import org.elasticsearch.action.get.GetResponse;
1414
import org.elasticsearch.action.index.IndexRequest;
15+
import org.elasticsearch.action.ingest.PutPipelineRequest;
1516
import org.elasticsearch.client.internal.Client;
1617
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1718
import org.elasticsearch.cluster.routing.allocation.AllocationService;
@@ -21,9 +22,6 @@
2122
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2223
import org.elasticsearch.env.Environment;
2324
import org.elasticsearch.env.NodeEnvironment;
24-
import org.elasticsearch.ingest.AbstractProcessor;
25-
import org.elasticsearch.ingest.IngestDocument;
26-
import org.elasticsearch.ingest.Processor;
2725
import org.elasticsearch.plugins.IngestPlugin;
2826
import org.elasticsearch.plugins.Plugin;
2927
import org.elasticsearch.repositories.RepositoriesService;
@@ -51,7 +49,7 @@
5149
* executes asynchronously. The result of the operation should be the same and also the order in which the
5250
* bulk responses are returned should be the same as how the corresponding index requests were defined.
5351
*/
54-
public class AsyncIngestProcessorIT extends ESSingleNodeTestCase {
52+
public class IngestAsyncProcessorIT extends ESSingleNodeTestCase {
5553

5654
@Override
5755
protected Collection<Class<? extends Plugin>> getPlugins() {

server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.elasticsearch.script.ScriptModule;
5757
import org.elasticsearch.script.ScriptService;
5858
import org.elasticsearch.script.ScriptType;
59+
import org.elasticsearch.script.TemplateScript;
5960
import org.elasticsearch.test.ESTestCase;
6061
import org.elasticsearch.test.MockLogAppender;
6162
import org.elasticsearch.threadpool.ThreadPool;
@@ -1389,6 +1390,113 @@ public void testBulkRequestExecution() throws Exception {
13891390
}
13901391
}
13911392

1393+
public void testIngestAndPipelineStats() throws Exception {
1394+
final Processor processor = mock(Processor.class);
1395+
when(processor.getType()).thenReturn("mock");
1396+
when(processor.getTag()).thenReturn("mockTag");
1397+
when(processor.isAsync()).thenReturn(true);
1398+
1399+
// avoid returning null and dropping the document
1400+
doAnswer(args -> {
1401+
@SuppressWarnings("unchecked")
1402+
BiConsumer<IngestDocument, Exception> handler = (BiConsumer) args.getArguments()[1];
1403+
handler.accept(RandomDocumentPicks.randomIngestDocument(random()), null);
1404+
return null;
1405+
}).when(processor).execute(any(IngestDocument.class), any());
1406+
1407+
// mock up an ingest service for returning a pipeline, this is used by the pipeline processor
1408+
final Pipeline[] pipelineToReturn = new Pipeline[1];
1409+
final IngestService pipelineIngestService = mock(IngestService.class);
1410+
when(pipelineIngestService.getPipeline(anyString())).thenAnswer(inv -> pipelineToReturn[0]);
1411+
1412+
IngestService ingestService = createWithProcessors(
1413+
Map.of(
1414+
"pipeline",
1415+
(factories, tag, description, config) -> new PipelineProcessor(tag, description, (params) -> new TemplateScript(params) {
1416+
@Override
1417+
public String execute() {
1418+
return "_id3";
1419+
} // this pipeline processor will always execute the '_id3' processor
1420+
}, false, pipelineIngestService),
1421+
"mock",
1422+
(factories, tag, description, config) -> processor
1423+
)
1424+
);
1425+
1426+
{
1427+
// all zeroes since nothing has executed
1428+
final IngestStats ingestStats = ingestService.stats();
1429+
assertThat(ingestStats.getPipelineStats().size(), equalTo(0));
1430+
assertStats(ingestStats.getTotalStats(), 0, 0, 0);
1431+
}
1432+
1433+
// put some pipelines, and now there are pipeline and processor stats, too
1434+
PutPipelineRequest putRequest1 = new PutPipelineRequest(
1435+
"_id1",
1436+
new BytesArray("{\"processors\": [{\"mock\" : {}}]}"),
1437+
XContentType.JSON
1438+
);
1439+
// n.b. this 'pipeline' processor will always run the '_id3' pipeline, see the mocking/plumbing above and below
1440+
PutPipelineRequest putRequest2 = new PutPipelineRequest(
1441+
"_id2",
1442+
new BytesArray("{\"processors\": [{\"pipeline\" : {}}]}"),
1443+
XContentType.JSON
1444+
);
1445+
PutPipelineRequest putRequest3 = new PutPipelineRequest(
1446+
"_id3",
1447+
new BytesArray("{\"processors\": [{\"mock\" : {}}]}"),
1448+
XContentType.JSON
1449+
);
1450+
ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty
1451+
ClusterState previousClusterState = clusterState;
1452+
clusterState = executePut(putRequest1, clusterState);
1453+
clusterState = executePut(putRequest2, clusterState);
1454+
clusterState = executePut(putRequest3, clusterState);
1455+
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
1456+
1457+
// hook up the mock ingest service to return pipeline3 when asked by the pipeline processor
1458+
pipelineToReturn[0] = ingestService.getPipeline("_id3");
1459+
1460+
{
1461+
final IngestStats ingestStats = ingestService.stats();
1462+
assertThat(ingestStats.getPipelineStats().size(), equalTo(3));
1463+
1464+
// total
1465+
assertStats(ingestStats.getTotalStats(), 0, 0, 0);
1466+
// pipeline
1467+
assertPipelineStats(ingestStats.getPipelineStats(), "_id1", 0, 0, 0);
1468+
assertPipelineStats(ingestStats.getPipelineStats(), "_id2", 0, 0, 0);
1469+
assertPipelineStats(ingestStats.getPipelineStats(), "_id3", 0, 0, 0);
1470+
// processor
1471+
assertProcessorStats(0, ingestStats, "_id1", 0, 0, 0);
1472+
assertProcessorStats(0, ingestStats, "_id2", 0, 0, 0);
1473+
assertProcessorStats(0, ingestStats, "_id3", 0, 0, 0);
1474+
}
1475+
1476+
// put a single document through ingest processing
1477+
final IndexRequest indexRequest = new IndexRequest("_index");
1478+
indexRequest.setPipeline("_id1").setFinalPipeline("_id2");
1479+
indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10));
1480+
ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, (integer, e) -> {}, (thread, e) -> {}, Names.WRITE);
1481+
1482+
{
1483+
final IngestStats ingestStats = ingestService.stats();
1484+
assertThat(ingestStats.getPipelineStats().size(), equalTo(3));
1485+
1486+
// total
1487+
// see https://github.com/elastic/elasticsearch/issues/92843 -- this should be 1, but it's actually 2
1488+
// assertStats(ingestStats.getTotalStats(), 1, 0, 0);
1489+
// pipeline
1490+
assertPipelineStats(ingestStats.getPipelineStats(), "_id1", 1, 0, 0);
1491+
assertPipelineStats(ingestStats.getPipelineStats(), "_id2", 1, 0, 0);
1492+
assertPipelineStats(ingestStats.getPipelineStats(), "_id3", 1, 0, 0);
1493+
// processor
1494+
assertProcessorStats(0, ingestStats, "_id1", 1, 0, 0);
1495+
assertProcessorStats(0, ingestStats, "_id2", 1, 0, 0);
1496+
assertProcessorStats(0, ingestStats, "_id3", 1, 0, 0);
1497+
}
1498+
}
1499+
13921500
public void testStats() throws Exception {
13931501
final Processor processor = mock(Processor.class);
13941502
final Processor processorFailure = mock(Processor.class);

0 commit comments

Comments
 (0)