Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.action.ingest;
package org.elasticsearch.ingest;

import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
Expand All @@ -21,9 +22,6 @@
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoriesService;
Expand Down Expand Up @@ -51,7 +49,7 @@
* executes asynchronously. The result of the operation should be the same and also the order in which the
* bulk responses are returned should be the same as how the corresponding index requests were defined.
*/
public class AsyncIngestProcessorIT extends ESSingleNodeTestCase {
public class IngestAsyncProcessorIT extends ESSingleNodeTestCase {

@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1389,6 +1389,83 @@ public void testBulkRequestExecution() throws Exception {
}
}

public void testIngestAndPipelineStats() throws Exception {
final Processor processor = mock(Processor.class);
final Processor processorFailure = mock(Processor.class);
when(processor.getType()).thenReturn("mock");
when(processor.getTag()).thenReturn("mockTag");
when(processor.isAsync()).thenReturn(true);

// avoid returning null and dropping the document
doAnswer(args -> {
@SuppressWarnings("unchecked")
BiConsumer<IngestDocument, Exception> handler = (BiConsumer) args.getArguments()[1];
handler.accept(RandomDocumentPicks.randomIngestDocument(random()), null);
return null;
}).when(processor).execute(any(IngestDocument.class), any());

IngestService ingestService = createWithProcessors(Map.of("mock", (factories, tag, description, config) -> processor));

{
// all zeroes since nothing has executed
final IngestStats ingestStats = ingestService.stats();
assertThat(ingestStats.getPipelineStats().size(), equalTo(0));
assertStats(ingestStats.getTotalStats(), 0, 0, 0);
}

// put some pipelines, and now there are pipeline and processor stats, too
PutPipelineRequest putRequest1 = new PutPipelineRequest(
"_id1",
new BytesArray("{\"processors\": [{\"mock\" : {}}]}"),
XContentType.JSON
);
PutPipelineRequest putRequest2 = new PutPipelineRequest(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth including a nested pipeline here? I know you and I have discussed that previously, but I'm not sure whether it actually makes the counters any more complex.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll take a time-boxed swing at it, I do think it's worth adding (assuming it doesn't blow up the size or complexity of the test extraordinarily).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added via 0bc58d2. It's a bit janky, because there's a chicken and egg problem of the mock ingest service needing the processor factory, but the factory needing an ingest service.

I'm game to take a swing at golfing it, though, if you'd like to join up for 10-15 minutes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks reasonable enough to me.

"_id2",
new BytesArray("{\"processors\": [{\"mock\" : {}}]}"),
XContentType.JSON
);
ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty
ClusterState previousClusterState = clusterState;
clusterState = executePut(putRequest1, clusterState);
clusterState = executePut(putRequest2, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));

{
final IngestStats ingestStats = ingestService.stats();
assertThat(ingestStats.getPipelineStats().size(), equalTo(2));

// total
assertStats(ingestStats.getTotalStats(), 0, 0, 0);
// pipeline
assertPipelineStats(ingestStats.getPipelineStats(), "_id1", 0, 0, 0);
assertPipelineStats(ingestStats.getPipelineStats(), "_id2", 0, 0, 0);
// processor
assertProcessorStats(0, ingestStats, "_id1", 0, 0, 0);
assertProcessorStats(0, ingestStats, "_id2", 0, 0, 0);
}

// put a single document through ingest processing
final IndexRequest indexRequest = new IndexRequest("_index");
indexRequest.setPipeline("_id1").setFinalPipeline("_id2");
indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10));
ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, (integer, e) -> {}, (thread, e) -> {}, Names.WRITE);

{
final IngestStats ingestStats = ingestService.stats();
assertThat(ingestStats.getPipelineStats().size(), equalTo(2));

// total
// see https://github.com/elastic/elasticsearch/issues/92843 -- this should be 1, but it's actually 2
// assertStats(ingestStats.getTotalStats(), 1, 0, 0);
// pipeline
assertPipelineStats(ingestStats.getPipelineStats(), "_id1", 1, 0, 0);
assertPipelineStats(ingestStats.getPipelineStats(), "_id2", 1, 0, 0);
// processor
assertProcessorStats(0, ingestStats, "_id1", 1, 0, 0);
assertProcessorStats(0, ingestStats, "_id2", 1, 0, 0);
}
}

public void testStats() throws Exception {
final Processor processor = mock(Processor.class);
final Processor processorFailure = mock(Processor.class);
Expand Down