Skip to content

Commit 76cb3e3

Browse files
author
Hendrik Muhs
authored
[7.15][Transform][Rollup] remove unnecessary list indirection (#75459) (#75829)
Remove an unnecessary indirection and refactor progress tracking. Both rollup and transform process documents as a stream; however, the AsyncTwoPhaseIndexer takes a List of index requests. This change removes the unnecessary temporary container and makes upcoming transform enhancements easier.
1 parent 106aff2 commit 76cb3e3

File tree

16 files changed

+148
-143
lines changed

16 files changed

+148
-143
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,12 @@
1212
import org.elasticsearch.action.ActionListener;
1313
import org.elasticsearch.action.bulk.BulkRequest;
1414
import org.elasticsearch.action.bulk.BulkResponse;
15-
import org.elasticsearch.action.index.IndexRequest;
1615
import org.elasticsearch.action.search.SearchResponse;
17-
import org.elasticsearch.core.TimeValue;
1816
import org.elasticsearch.common.util.concurrent.RunOnce;
17+
import org.elasticsearch.core.TimeValue;
1918
import org.elasticsearch.threadpool.Scheduler;
2019
import org.elasticsearch.threadpool.ThreadPool;
2120

22-
import java.util.List;
2321
import java.util.concurrent.TimeUnit;
2422
import java.util.concurrent.atomic.AtomicBoolean;
2523
import java.util.concurrent.atomic.AtomicReference;
@@ -487,13 +485,12 @@ private void onSearchResponse(SearchResponse searchResponse) {
487485
return;
488486
}
489487

490-
final List<IndexRequest> docs = iterationResult.getToIndex();
488+
final BulkRequest bulkRequest = new BulkRequest();
489+
iterationResult.getToIndex().forEach(bulkRequest::add);
490+
stats.markEndProcessing();
491491

492492
// an iteration result might return an empty set of documents to be indexed
493-
if (docs.isEmpty() == false) {
494-
final BulkRequest bulkRequest = new BulkRequest();
495-
docs.forEach(bulkRequest::add);
496-
stats.markEndProcessing();
493+
if (bulkRequest.numberOfActions() > 0) {
497494
stats.markStartIndexing();
498495
doNextBulk(bulkRequest, ActionListener.wrap(bulkResponse -> {
499496
// TODO we should check items in the response and move after accordingly to
@@ -512,7 +509,6 @@ private void onSearchResponse(SearchResponse searchResponse) {
512509
onBulkResponse(bulkResponse, newPosition);
513510
}, this::finishWithIndexingFailure));
514511
} else {
515-
stats.markEndProcessing();
516512
// no documents need to be indexed, continue with search
517513
try {
518514
JobPosition newPosition = iterationResult.getPosition();

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/IterationResult.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
import org.elasticsearch.action.index.IndexRequest;
1111

12-
import java.util.List;
12+
import java.util.stream.Stream;
1313

1414
/**
1515
* Result object to hold the result of 1 iteration of iterative indexing.
@@ -19,18 +19,18 @@ public class IterationResult<JobPosition> {
1919

2020
private final boolean isDone;
2121
private final JobPosition position;
22-
private final List<IndexRequest> toIndex;
22+
private final Stream<IndexRequest> toIndex;
2323

2424
/**
2525
* Constructor for the result of 1 iteration.
2626
*
27-
* @param toIndex the list of requests to be indexed
27+
* @param toIndex the stream of requests to be indexed
2828
* @param position the extracted, persistable position of the job required for the search phase
2929
* @param isDone true if source is exhausted and job should go to sleep
3030
*
3131
* Note: toIndex.empty() != isDone due to possible filtering in the specific implementation
3232
*/
33-
public IterationResult(List<IndexRequest> toIndex, JobPosition position, boolean isDone) {
33+
public IterationResult(Stream<IndexRequest> toIndex, JobPosition position, boolean isDone) {
3434
this.toIndex = toIndex;
3535
this.position = position;
3636
this.isDone = isDone;
@@ -53,11 +53,11 @@ public JobPosition getPosition() {
5353
}
5454

5555
/**
56-
* List of requests to be passed to bulk indexing.
56+
* Stream of requests to be passed to bulk indexing.
5757
*
58-
* @return List of index requests.
58+
* @return Stream of index requests.
5959
*/
60-
public List<IndexRequest> getToIndex() {
60+
public Stream<IndexRequest> getToIndex() {
6161
return toIndex;
6262
}
6363
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
import org.elasticsearch.action.search.SearchResponseSections;
1818
import org.elasticsearch.action.search.ShardSearchFailure;
1919
import org.elasticsearch.common.settings.Settings;
20-
import org.elasticsearch.core.TimeValue;
2120
import org.elasticsearch.common.xcontent.XContentBuilder;
21+
import org.elasticsearch.core.TimeValue;
2222
import org.elasticsearch.search.SearchHit;
2323
import org.elasticsearch.search.SearchHits;
2424
import org.elasticsearch.test.ESTestCase;
@@ -36,6 +36,7 @@
3636
import java.util.concurrent.TimeUnit;
3737
import java.util.concurrent.atomic.AtomicBoolean;
3838
import java.util.concurrent.atomic.AtomicReference;
39+
import java.util.stream.Stream;
3940

4041
import static org.hamcrest.Matchers.equalTo;
4142

@@ -74,7 +75,7 @@ protected IterationResult<Integer> doProcess(SearchResponse searchResponse) {
7475
assertFalse("should not be called as stoppedBeforeFinished is false", stoppedBeforeFinished);
7576
assertThat(step, equalTo(2));
7677
++step;
77-
return new IterationResult<>(Collections.emptyList(), 3, true);
78+
return new IterationResult<>(Stream.empty(), 3, true);
7879
}
7980

8081
private void awaitForLatch() {
@@ -191,13 +192,13 @@ protected IterationResult<Integer> doProcess(SearchResponse searchResponse) {
191192

192193
++processOps;
193194
if (processOps == 5) {
194-
return new IterationResult<>(Collections.singletonList(new IndexRequest()), processOps, true);
195+
return new IterationResult<>(Stream.of(new IndexRequest()), processOps, true);
195196
}
196197
else if (processOps % 2 == 0) {
197-
return new IterationResult<>(Collections.emptyList(), processOps, false);
198+
return new IterationResult<>(Stream.empty(), processOps, false);
198199
}
199200

200-
return new IterationResult<>(Collections.singletonList(new IndexRequest()), processOps, false);
201+
return new IterationResult<>(Stream.of(new IndexRequest()), processOps, false);
201202
}
202203

203204
@Override

x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/IndexerUtils.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import java.util.List;
2828
import java.util.Map;
2929
import java.util.TreeMap;
30-
import java.util.stream.Collectors;
30+
import java.util.stream.Stream;
3131

3232
/**
3333
* These utilities are used to convert agg responses into a set of rollup documents.
@@ -46,10 +46,10 @@ class IndexerUtils {
4646
* @param groupConfig The grouping configuration for the job
4747
* @param jobId The ID for the job
4848
* @param isUpgradedDocID `true` if this job is using the new ID scheme
49-
* @return A list of rolled documents derived from the response
49+
* @return A stream of rolled documents derived from the response
5050
*/
51-
static List<IndexRequest> processBuckets(CompositeAggregation agg, String rollupIndex, RollupIndexerJobStats stats,
52-
GroupConfig groupConfig, String jobId, boolean isUpgradedDocID) {
51+
static Stream<IndexRequest> processBuckets(CompositeAggregation agg, String rollupIndex, RollupIndexerJobStats stats,
52+
GroupConfig groupConfig, String jobId, boolean isUpgradedDocID) {
5353

5454
logger.debug("Buckets: [" + agg.getBuckets().size() + "][" + jobId + "]");
5555
return agg.getBuckets().stream().map(b ->{
@@ -79,7 +79,7 @@ static List<IndexRequest> processBuckets(CompositeAggregation agg, String rollup
7979
IndexRequest request = new IndexRequest(rollupIndex, RollupField.TYPE_NAME, idGenerator.getID());
8080
request.source(doc);
8181
return request;
82-
}).collect(Collectors.toList());
82+
});
8383
}
8484

8585
private static void processKeys(Map<String, Object> keys, Map<String, Object> doc,

x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import java.util.Map;
5353
import java.util.concurrent.atomic.AtomicBoolean;
5454
import java.util.concurrent.atomic.AtomicReference;
55+
import java.util.stream.Stream;
5556

5657
import static org.elasticsearch.xpack.core.rollup.RollupField.formatFieldName;
5758

@@ -142,7 +143,7 @@ protected IterationResult<Map<String, Object>> doProcess(SearchResponse searchRe
142143

143144
if (response.getBuckets().isEmpty()) {
144145
// do not reset the position as we want to continue from where we stopped
145-
return new IterationResult<>(Collections.emptyList(), getPosition(), true);
146+
return new IterationResult<>(Stream.empty(), getPosition(), true);
146147
}
147148

148149
return new IterationResult<>(

x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/IndexerUtilsTests.java

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@
3333
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
3434
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
3535
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
36-
import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation;
3736
import org.elasticsearch.search.aggregations.metrics.AvgAggregationBuilder;
37+
import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation;
3838
import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
3939
import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
4040
import org.elasticsearch.xpack.core.rollup.RollupField;
@@ -55,6 +55,7 @@
5555
import java.util.LinkedHashMap;
5656
import java.util.List;
5757
import java.util.Map;
58+
import java.util.stream.Collectors;
5859

5960
import static java.util.Collections.singletonList;
6061
import static org.elasticsearch.xpack.core.rollup.ConfigTestHelpers.randomDateHistogramGroupConfig;
@@ -113,7 +114,8 @@ public void testMissingFields() throws IOException {
113114
directory.close();
114115

115116
final GroupConfig groupConfig = randomGroupConfig(random());
116-
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo", randomBoolean());
117+
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo", randomBoolean())
118+
.collect(Collectors.toList());
117119

118120
assertThat(docs.size(), equalTo(numDocs));
119121
for (IndexRequest doc : docs) {
@@ -174,7 +176,8 @@ public void testCorrectFields() throws IOException {
174176
directory.close();
175177

176178
final GroupConfig groupConfig = randomGroupConfig(random());
177-
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo", randomBoolean());
179+
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo", randomBoolean())
180+
.collect(Collectors.toList());
178181

179182
assertThat(docs.size(), equalTo(numDocs));
180183
for (IndexRequest doc : docs) {
@@ -227,7 +230,8 @@ public void testNumericTerms() throws IOException {
227230
directory.close();
228231

229232
final GroupConfig groupConfig = randomGroupConfig(random());
230-
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo", randomBoolean());
233+
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo", randomBoolean())
234+
.collect(Collectors.toList());
231235

232236
assertThat(docs.size(), equalTo(numDocs));
233237
for (IndexRequest doc : docs) {
@@ -287,7 +291,8 @@ public void testEmptyCounts() throws IOException {
287291
directory.close();
288292

289293
final GroupConfig groupConfig = randomGroupConfig(random());
290-
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo", randomBoolean());
294+
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo", randomBoolean())
295+
.collect(Collectors.toList());
291296

292297
assertThat(docs.size(), equalTo(numDocs));
293298
for (IndexRequest doc : docs) {
@@ -339,7 +344,8 @@ public void testKeyOrderingOldID() {
339344
// The content of the config don't actually matter for this test
340345
// because the test is just looking at agg keys
341346
GroupConfig groupConfig = new GroupConfig(randomDateHistogramGroupConfig(random()), new HistogramGroupConfig(123L, "abc"), null);
342-
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, "foo", new RollupIndexerJobStats(), groupConfig, "foo", false);
347+
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, "foo", new RollupIndexerJobStats(), groupConfig, "foo", false)
348+
.collect(Collectors.toList());
343349
assertThat(docs.size(), equalTo(1));
344350
assertThat(docs.get(0).id(), equalTo("1237859798"));
345351
}
@@ -383,7 +389,8 @@ public void testKeyOrderingNewID() {
383389
});
384390

385391
GroupConfig groupConfig = new GroupConfig(randomDateHistogramGroupConfig(random()), new HistogramGroupConfig(1L, "abc"), null);
386-
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, "foo", new RollupIndexerJobStats(), groupConfig, "foo", true);
392+
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, "foo", new RollupIndexerJobStats(), groupConfig, "foo", true)
393+
.collect(Collectors.toList());
387394
assertThat(docs.size(), equalTo(1));
388395
assertThat(docs.get(0).id(), equalTo("foo$c9LcrFqeFW92uN_Z7sv1hA"));
389396
}
@@ -433,7 +440,8 @@ public void testKeyOrderingNewIDLong() {
433440
});
434441

435442
GroupConfig groupConfig = new GroupConfig(randomDateHistogramGroupConfig(random()), new HistogramGroupConfig(1, "abc"), null);
436-
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, "foo", new RollupIndexerJobStats(), groupConfig, "foo", true);
443+
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, "foo", new RollupIndexerJobStats(), groupConfig, "foo", true)
444+
.collect(Collectors.toList());
437445
assertThat(docs.size(), equalTo(1));
438446
assertThat(docs.get(0).id(), equalTo("foo$VAFKZpyaEqYRPLyic57_qw"));
439447
}
@@ -461,7 +469,7 @@ public void testNullKeys() {
461469

462470
GroupConfig groupConfig = new GroupConfig(randomDateHistogramGroupConfig(random()), randomHistogramGroupConfig(random()), null);
463471
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, "foo", new RollupIndexerJobStats(),
464-
groupConfig, "foo", randomBoolean());
472+
groupConfig, "foo", randomBoolean()).collect(Collectors.toList());
465473
assertThat(docs.size(), equalTo(1));
466474
assertFalse(Strings.isNullOrEmpty(docs.get(0).id()));
467475
}
@@ -518,7 +526,8 @@ public void testMissingBuckets() throws IOException {
518526
directory.close();
519527

520528
final GroupConfig groupConfig = randomGroupConfig(random());
521-
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo", randomBoolean());
529+
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo", randomBoolean())
530+
.collect(Collectors.toList());
522531

523532
assertThat(docs.size(), equalTo(6));
524533
for (IndexRequest doc : docs) {
@@ -589,7 +598,8 @@ public void testTimezone() throws IOException {
589598
directory.close();
590599

591600
final GroupConfig groupConfig = randomGroupConfig(random());
592-
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo", randomBoolean());
601+
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo", randomBoolean())
602+
.collect(Collectors.toList());
593603

594604
assertThat(docs.size(), equalTo(2));
595605

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/Function.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,13 +209,15 @@ void preview(
209209
* @param destinationPipeline the destination pipeline
210210
* @param fieldMappings field mappings for the destination
211211
* @param stats a stats object to record/collect stats
212+
* @param progress a progress object to record/collect progress information
212213
* @return a tuple with the stream of index requests and the cursor
213214
*/
214215
Tuple<Stream<IndexRequest>, Map<String, Object>> processSearchResponse(
215216
SearchResponse searchResponse,
216217
String destinationIndex,
217218
String destinationPipeline,
218219
Map<String, String> fieldMappings,
219-
TransformIndexerStats stats
220+
TransformIndexerStats stats,
221+
TransformProgress progress
220222
);
221223
}

0 commit comments

Comments
 (0)