diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index 9648f74e96b27..190fc5060846a 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -19,18 +19,11 @@ package org.elasticsearch.action.search; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.function.Function; -import java.util.function.IntFunction; -import java.util.function.Supplier; -import java.util.stream.Collectors; +import com.carrotsearch.hppc.IntArrayList; +import com.carrotsearch.hppc.ObjectObjectHashMap; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.index.Term; import org.apache.lucene.search.CollectionStatistics; import org.apache.lucene.search.FieldDoc; @@ -44,6 +37,8 @@ import org.apache.lucene.search.TotalHits.Relation; import org.apache.lucene.search.grouping.CollapseTopFieldDocs; import org.elasticsearch.common.collect.HppcMaps; +import org.elasticsearch.common.io.stream.DelayableWriteable; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.SearchHit; @@ -67,16 +62,28 @@ import org.elasticsearch.search.suggest.Suggest.Suggestion; import org.elasticsearch.search.suggest.completion.CompletionSuggestion; -import com.carrotsearch.hppc.IntArrayList; -import com.carrotsearch.hppc.ObjectObjectHashMap; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.function.IntFunction; +import java.util.function.Supplier; +import java.util.stream.Collectors; public final class SearchPhaseController { + private static final Logger logger = LogManager.getLogger(SearchPhaseController.class); private static final ScoreDoc[] EMPTY_DOCS = new ScoreDoc[0]; + private final NamedWriteableRegistry namedWriteableRegistry; private final Function requestToAggReduceContextBuilder; - public SearchPhaseController( + public SearchPhaseController(NamedWriteableRegistry namedWriteableRegistry, Function requestToAggReduceContextBuilder) { + this.namedWriteableRegistry = namedWriteableRegistry; this.requestToAggReduceContextBuilder = requestToAggReduceContextBuilder; } @@ -430,7 +437,8 @@ public ReducedQueryPhase reducedQueryPhase(Collection queryResults, - List> bufferedAggs, List bufferedTopDocs, + List> bufferedAggs, + List bufferedTopDocs, TopDocsStats topDocsStats, int numReducePhases, boolean isScrollRequest, InternalAggregation.ReduceContextBuilder aggReduceContextBuilder, boolean performFinalReduce) { @@ -522,7 +530,7 @@ private ReducedQueryPhase reducedQueryPhase(Collection> aggregationsList + List> aggregationsList ) { /* * Parse the aggregations, clearing the list as we go so bits backing @@ -617,8 +625,9 @@ public InternalSearchResponse buildResponse(SearchHits hits) { * iff the buffer is exhausted. */ static final class QueryPhaseResultConsumer extends ArraySearchPhaseResults { + private final NamedWriteableRegistry namedWriteableRegistry; private final SearchShardTarget[] processedShards; - private final Supplier[] aggsBuffer; + private final DelayableWriteable.Serialized[] aggsBuffer; private final TopDocs[] topDocsBuffer; private final boolean hasAggs; private final boolean hasTopDocs; @@ -631,6 +640,8 @@ static final class QueryPhaseResultConsumer extends ArraySearchPhaseResults= 2 if there is more than one expected result"); } @@ -661,7 +674,7 @@ private QueryPhaseResultConsumer(SearchProgressListener progressListener, Search this.processedShards = new SearchShardTarget[expectedResultSize]; // no need to buffer anything if we have less expected results. in this case we don't consume any results ahead of time. @SuppressWarnings("unchecked") - Supplier[] aggsBuffer = new Supplier[hasAggs ? bufferSize : 0]; + DelayableWriteable.Serialized[] aggsBuffer = new DelayableWriteable.Serialized[hasAggs ? bufferSize : 0]; this.aggsBuffer = aggsBuffer; this.topDocsBuffer = new TopDocs[hasTopDocs ? bufferSize : 0]; this.hasTopDocs = hasTopDocs; @@ -684,15 +697,21 @@ public void consumeResult(SearchPhaseResult result) { private synchronized void consumeInternal(QuerySearchResult querySearchResult) { if (querySearchResult.isNull() == false) { if (index == bufferSize) { + InternalAggregations reducedAggs = null; if (hasAggs) { List aggs = new ArrayList<>(aggsBuffer.length); for (int i = 0; i < aggsBuffer.length; i++) { aggs.add(aggsBuffer[i].get()); aggsBuffer[i] = null; // null the buffer so it can be GCed now. } - InternalAggregations reducedAggs = InternalAggregations.topLevelReduce( - aggs, aggReduceContextBuilder.forPartialReduction()); - aggsBuffer[0] = () -> reducedAggs; + reducedAggs = InternalAggregations.topLevelReduce(aggs, aggReduceContextBuilder.forPartialReduction()); + aggsBuffer[0] = DelayableWriteable.referencing(reducedAggs) + .asSerialized(InternalAggregations::new, namedWriteableRegistry); + long previousBufferSize = aggsCurrentBufferSize; + aggsMaxBufferSize = Math.max(aggsMaxBufferSize, aggsCurrentBufferSize); + aggsCurrentBufferSize = aggsBuffer[0].ramBytesUsed(); + logger.trace("aggs partial reduction [{}->{}] max [{}]", + previousBufferSize, aggsCurrentBufferSize, aggsMaxBufferSize); } if (hasTopDocs) { TopDocs reducedTopDocs = mergeTopDocs(Arrays.asList(topDocsBuffer), @@ -705,12 +724,13 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) { index = 1; if (hasAggs || hasTopDocs) { progressListener.notifyPartialReduce(SearchProgressListener.buildSearchShards(processedShards), - topDocsStats.getTotalHits(), hasAggs ? aggsBuffer[0].get() : null, numReducePhases); + topDocsStats.getTotalHits(), reducedAggs, numReducePhases); } } final int i = index++; if (hasAggs) { - aggsBuffer[i] = querySearchResult.consumeAggs(); + aggsBuffer[i] = querySearchResult.consumeAggs().asSerialized(InternalAggregations::new, namedWriteableRegistry); + aggsCurrentBufferSize += aggsBuffer[i].ramBytesUsed(); } if (hasTopDocs) { final TopDocsAndMaxScore topDocs = querySearchResult.consumeTopDocs(); // can't be null @@ -723,7 +743,7 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) { } private synchronized List> getRemainingAggs() { - return hasAggs ? Arrays.asList(aggsBuffer).subList(0, index) : null; + return hasAggs ? Arrays.asList((Supplier[]) aggsBuffer).subList(0, index) : null; } private synchronized List getRemainingTopDocs() { @@ -732,6 +752,8 @@ private synchronized List getRemainingTopDocs() { @Override public ReducedQueryPhase reduce() { + aggsMaxBufferSize = Math.max(aggsMaxBufferSize, aggsCurrentBufferSize); + logger.trace("aggs final reduction [{}] max [{}]", aggsCurrentBufferSize, aggsMaxBufferSize); ReducedQueryPhase reducePhase = controller.reducedQueryPhase(results.asList(), getRemainingAggs(), getRemainingTopDocs(), topDocsStats, numReducePhases, false, aggReduceContextBuilder, performFinalReduce); progressListener.notifyFinalReduce(SearchProgressListener.buildSearchShards(results.asList()), @@ -766,8 +788,8 @@ ArraySearchPhaseResults newSearchPhaseResults(SearchProgressL if (request.getBatchedReduceSize() < numShards) { int topNSize = getTopDocsSize(request); // only use this if there are aggs and if there are more shards than we should reduce at once - return new QueryPhaseResultConsumer(listener, this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs, - trackTotalHitsUpTo, topNSize, aggReduceContextBuilder, request.isFinalReduce()); + return new QueryPhaseResultConsumer(namedWriteableRegistry, listener, this, numShards, request.getBatchedReduceSize(), + hasTopDocs, hasAggs, trackTotalHitsUpTo, topNSize, aggReduceContextBuilder, request.isFinalReduce()); } } return new ArraySearchPhaseResults(numShards) { diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java b/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java index b27010017e5ee..201c7edead6de 100644 --- a/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java +++ b/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java @@ -19,12 +19,14 @@ package org.elasticsearch.common.io.stream; -import java.io.IOException; -import java.util.function.Supplier; - +import org.apache.lucene.util.Accountable; +import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.Version; import org.elasticsearch.common.bytes.BytesReference; +import java.io.IOException; +import java.util.function.Supplier; + /** * A holder for {@link Writeable}s that can delays reading the underlying * {@linkplain Writeable} when it is read from a remote node. @@ -43,12 +45,22 @@ public static DelayableWriteable referencing(T referenc * when {@link Supplier#get()} is called. */ public static DelayableWriteable delayed(Writeable.Reader reader, StreamInput in) throws IOException { - return new Delayed<>(reader, in); + return new Serialized<>(reader, in.getVersion(), in.namedWriteableRegistry(), in.readBytesReference()); } private DelayableWriteable() {} - public abstract boolean isDelayed(); + /** + * Returns a {@linkplain DelayableWriteable} that stores its contents + * in serialized form. + */ + public abstract Serialized asSerialized(Writeable.Reader reader, NamedWriteableRegistry registry); + + /** + * {@code true} if the {@linkplain Writeable} is being stored in + * serialized form, {@code false} otherwise. + */ + abstract boolean isSerialized(); private static class Referencing extends DelayableWriteable { private T reference; @@ -59,11 +71,7 @@ private static class Referencing extends DelayableWriteable @Override public void writeTo(StreamOutput out) throws IOException { - try (BytesStreamOutput buffer = new BytesStreamOutput()) { - buffer.setVersion(out.getVersion()); - reference.writeTo(buffer); - out.writeBytesReference(buffer.bytes()); - } + out.writeBytesReference(writeToBuffer(out.getVersion()).bytes()); } @Override @@ -72,27 +80,48 @@ public T get() { } @Override - public boolean isDelayed() { + public Serialized asSerialized(Reader reader, NamedWriteableRegistry registry) { + try { + return new Serialized(reader, Version.CURRENT, registry, writeToBuffer(Version.CURRENT).bytes()); + } catch (IOException e) { + throw new RuntimeException("unexpected error expanding aggregations", e); + } + } + + @Override + boolean isSerialized() { return false; } + + private BytesStreamOutput writeToBuffer(Version version) throws IOException { + try (BytesStreamOutput buffer = new BytesStreamOutput()) { + buffer.setVersion(version); + reference.writeTo(buffer); + return buffer; + } + } } - private static class Delayed extends DelayableWriteable { + /** + * A {@link Writeable} stored in serialized form. + */ + public static class Serialized extends DelayableWriteable implements Accountable { private final Writeable.Reader reader; - private final Version remoteVersion; - private final BytesReference serialized; + private final Version serializedAtVersion; private final NamedWriteableRegistry registry; + private final BytesReference serialized; - Delayed(Writeable.Reader reader, StreamInput in) throws IOException { + Serialized(Writeable.Reader reader, Version serializedAtVersion, + NamedWriteableRegistry registry, BytesReference serialized) throws IOException { this.reader = reader; - remoteVersion = in.getVersion(); - serialized = in.readBytesReference(); - registry = in.namedWriteableRegistry(); + this.serializedAtVersion = serializedAtVersion; + this.registry = registry; + this.serialized = serialized; } @Override public void writeTo(StreamOutput out) throws IOException { - if (out.getVersion() == remoteVersion) { + if (out.getVersion() == serializedAtVersion) { /* * If the version *does* line up we can just copy the bytes * which is good because this is how shard request caching @@ -116,7 +145,7 @@ public T get() { try { try (StreamInput in = registry == null ? serialized.streamInput() : new NamedWriteableAwareStreamInput(serialized.streamInput(), registry)) { - in.setVersion(remoteVersion); + in.setVersion(serializedAtVersion); return reader.read(in); } } catch (IOException e) { @@ -125,8 +154,18 @@ public T get() { } @Override - public boolean isDelayed() { + public Serialized asSerialized(Reader reader, NamedWriteableRegistry registry) { + return this; // We're already serialized + } + + @Override + boolean isSerialized() { return true; } + + @Override + public long ramBytesUsed() { + return serialized.ramBytesUsed() + RamUsageEstimator.NUM_BYTES_OBJECT_REF * 3 + RamUsageEstimator.NUM_BYTES_OBJECT_HEADER; + } } } diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index ec0e86505bf1e..afb278df9b498 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -570,7 +570,8 @@ protected Node(final Environment initialEnvironment, b.bind(MetadataCreateIndexService.class).toInstance(metadataCreateIndexService); b.bind(SearchService.class).toInstance(searchService); b.bind(SearchTransportService.class).toInstance(searchTransportService); - b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(searchService::aggReduceContextBuilder)); + b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController( + namedWriteableRegistry, searchService::aggReduceContextBuilder)); b.bind(Transport.class).toInstance(transport); b.bind(TransportService.class).toInstance(transportService); b.bind(NetworkService.class).toInstance(networkService); diff --git a/server/src/test/java/org/elasticsearch/action/search/DfsQueryPhaseTests.java b/server/src/test/java/org/elasticsearch/action/search/DfsQueryPhaseTests.java index a09a81aa54349..aab6f0b32baef 100644 --- a/server/src/test/java/org/elasticsearch/action/search/DfsQueryPhaseTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/DfsQueryPhaseTests.java @@ -213,6 +213,6 @@ public void run() throws IOException { } private SearchPhaseController searchPhaseController() { - return new SearchPhaseController(request -> InternalAggregationTestCase.emptyReduceContextBuilder()); + return new SearchPhaseController(writableRegistry(), request -> InternalAggregationTestCase.emptyReduceContextBuilder()); } } diff --git a/server/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java b/server/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java index df9d810302ff1..0cb81b91266d6 100644 --- a/server/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java @@ -49,7 +49,8 @@ public class FetchSearchPhaseTests extends ESTestCase { public void testShortcutQueryAndFetchOptimization() { - SearchPhaseController controller = new SearchPhaseController(s -> InternalAggregationTestCase.emptyReduceContextBuilder()); + SearchPhaseController controller = new SearchPhaseController( + writableRegistry(), s -> InternalAggregationTestCase.emptyReduceContextBuilder()); MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(1); ArraySearchPhaseResults results = controller.newSearchPhaseResults(NOOP, mockSearchPhaseContext.getRequest(), 1); boolean hasHits = randomBoolean(); @@ -92,7 +93,8 @@ public void run() { public void testFetchTwoDocument() { MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2); - SearchPhaseController controller = new SearchPhaseController(s -> InternalAggregationTestCase.emptyReduceContextBuilder()); + SearchPhaseController controller = new SearchPhaseController( + writableRegistry(), s -> InternalAggregationTestCase.emptyReduceContextBuilder()); ArraySearchPhaseResults results = controller.newSearchPhaseResults(NOOP, mockSearchPhaseContext.getRequest(), 2); int resultSetSize = randomIntBetween(2, 10); final SearchContextId ctx1 = new SearchContextId(UUIDs.randomBase64UUID(), 123); @@ -151,7 +153,8 @@ public void run() { public void testFailFetchOneDoc() { MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2); - SearchPhaseController controller = new SearchPhaseController(s -> InternalAggregationTestCase.emptyReduceContextBuilder()); + SearchPhaseController controller = new SearchPhaseController( + writableRegistry(), s -> InternalAggregationTestCase.emptyReduceContextBuilder()); ArraySearchPhaseResults results = controller.newSearchPhaseResults(NOOP, mockSearchPhaseContext.getRequest(), 2); int resultSetSize = randomIntBetween(2, 10); @@ -214,7 +217,8 @@ public void testFetchDocsConcurrently() throws InterruptedException { int resultSetSize = randomIntBetween(0, 100); // we use at least 2 hits otherwise this is subject to single shard optimization and we trip an assert... int numHits = randomIntBetween(2, 100); // also numshards --> 1 hit per shard - SearchPhaseController controller = new SearchPhaseController(s -> InternalAggregationTestCase.emptyReduceContextBuilder()); + SearchPhaseController controller = new SearchPhaseController( + writableRegistry(), s -> InternalAggregationTestCase.emptyReduceContextBuilder()); MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(numHits); ArraySearchPhaseResults results = controller.newSearchPhaseResults(NOOP, mockSearchPhaseContext.getRequest(), numHits); @@ -271,7 +275,8 @@ public void run() { public void testExceptionFailsPhase() { MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2); - SearchPhaseController controller = new SearchPhaseController(s -> InternalAggregationTestCase.emptyReduceContextBuilder()); + SearchPhaseController controller = new SearchPhaseController( + writableRegistry(), s -> InternalAggregationTestCase.emptyReduceContextBuilder()); ArraySearchPhaseResults results = controller.newSearchPhaseResults(NOOP, mockSearchPhaseContext.getRequest(), 2); int resultSetSize = randomIntBetween(2, 10); @@ -327,7 +332,8 @@ public void run() { public void testCleanupIrrelevantContexts() { // contexts that are not fetched should be cleaned up MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2); - SearchPhaseController controller = new SearchPhaseController(s -> InternalAggregationTestCase.emptyReduceContextBuilder()); + SearchPhaseController controller = new SearchPhaseController( + writableRegistry(), s -> InternalAggregationTestCase.emptyReduceContextBuilder()); ArraySearchPhaseResults results = controller.newSearchPhaseResults(NOOP, mockSearchPhaseContext.getRequest(), 2); int resultSetSize = 1; diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java index 4fed48cc4c0f8..094b6365ab812 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java @@ -33,8 +33,10 @@ import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.text.Text; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AtomicArray; @@ -42,6 +44,7 @@ import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.SearchModule; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.aggregations.AggregationBuilders; @@ -78,6 +81,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonList; import static org.elasticsearch.action.search.SearchProgressListener.NOOP; @@ -92,10 +96,15 @@ public class SearchPhaseControllerTests extends ESTestCase { private SearchPhaseController searchPhaseController; private List reductions; + @Override + protected NamedWriteableRegistry writableRegistry() { + return new NamedWriteableRegistry(new SearchModule(Settings.EMPTY, emptyList()).getNamedWriteables()); + } + @Before public void setup() { reductions = new CopyOnWriteArrayList<>(); - searchPhaseController = new SearchPhaseController(s -> new InternalAggregation.ReduceContextBuilder() { + searchPhaseController = new SearchPhaseController(writableRegistry(), s -> new InternalAggregation.ReduceContextBuilder() { @Override public ReduceContext forPartialReduction() { reductions.add(false); diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java index f8b2d9ccd0a4d..2f0e91b630e32 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java @@ -123,7 +123,8 @@ public void sendExecuteQuery(Transport.Connection connection, ShardSearchRequest searchRequest.source().trackTotalHitsUpTo(2); } searchRequest.allowPartialSearchResults(false); - SearchPhaseController controller = new SearchPhaseController(r -> InternalAggregationTestCase.emptyReduceContextBuilder()); + SearchPhaseController controller = new SearchPhaseController( + writableRegistry(), r -> InternalAggregationTestCase.emptyReduceContextBuilder()); SearchTask task = new SearchTask(0, "n/a", "n/a", "test", null, Collections.emptyMap()); SearchQueryThenFetchAsyncAction action = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, (clusterAlias, node) -> lookup.get(node), diff --git a/server/src/test/java/org/elasticsearch/common/io/stream/DelayableWriteableTests.java b/server/src/test/java/org/elasticsearch/common/io/stream/DelayableWriteableTests.java index d12a15c1a7207..ed5a7b8d6fb4f 100644 --- a/server/src/test/java/org/elasticsearch/common/io/stream/DelayableWriteableTests.java +++ b/server/src/test/java/org/elasticsearch/common/io/stream/DelayableWriteableTests.java @@ -117,42 +117,42 @@ public void writeTo(StreamOutput out) throws IOException { public void testRoundTripFromReferencing() throws IOException { Example e = new Example(randomAlphaOfLength(5)); DelayableWriteable original = DelayableWriteable.referencing(e); - assertFalse(original.isDelayed()); + assertFalse(original.isSerialized()); roundTripTestCase(original, Example::new); } public void testRoundTripFromReferencingWithNamedWriteable() throws IOException { NamedHolder n = new NamedHolder(new Example(randomAlphaOfLength(5))); DelayableWriteable original = DelayableWriteable.referencing(n); - assertFalse(original.isDelayed()); + assertFalse(original.isSerialized()); roundTripTestCase(original, NamedHolder::new); } public void testRoundTripFromDelayed() throws IOException { Example e = new Example(randomAlphaOfLength(5)); - DelayableWriteable original = roundTrip(DelayableWriteable.referencing(e), Example::new, Version.CURRENT); - assertTrue(original.isDelayed()); + DelayableWriteable original = DelayableWriteable.referencing(e).asSerialized(Example::new, writableRegistry()); + assertTrue(original.isSerialized()); roundTripTestCase(original, Example::new); } public void testRoundTripFromDelayedWithNamedWriteable() throws IOException { NamedHolder n = new NamedHolder(new Example(randomAlphaOfLength(5))); - DelayableWriteable original = roundTrip(DelayableWriteable.referencing(n), NamedHolder::new, Version.CURRENT); - assertTrue(original.isDelayed()); + DelayableWriteable original = DelayableWriteable.referencing(n).asSerialized(NamedHolder::new, writableRegistry()); + assertTrue(original.isSerialized()); roundTripTestCase(original, NamedHolder::new); } public void testRoundTripFromDelayedFromOldVersion() throws IOException { Example e = new Example(randomAlphaOfLength(5)); DelayableWriteable original = roundTrip(DelayableWriteable.referencing(e), Example::new, randomOldVersion()); - assertTrue(original.isDelayed()); + assertTrue(original.isSerialized()); roundTripTestCase(original, Example::new); } public void testRoundTripFromDelayedFromOldVersionWithNamedWriteable() throws IOException { NamedHolder n = new NamedHolder(new Example(randomAlphaOfLength(5))); DelayableWriteable original = roundTrip(DelayableWriteable.referencing(n), NamedHolder::new, randomOldVersion()); - assertTrue(original.isDelayed()); + assertTrue(original.isSerialized()); roundTripTestCase(original, NamedHolder::new); } @@ -162,9 +162,16 @@ public void testSerializesWithRemoteVersion() throws IOException { assertThat(roundTrip(original, SneakOtherSideVersionOnWire::new, remoteVersion).get().version, equalTo(remoteVersion)); } + public void testAsSerializedIsNoopOnSerialized() throws IOException { + Example e = new Example(randomAlphaOfLength(5)); + DelayableWriteable d = DelayableWriteable.referencing(e).asSerialized(Example::new, writableRegistry()); + assertTrue(d.isSerialized()); + assertSame(d, d.asSerialized(Example::new, writableRegistry())); + } + private void roundTripTestCase(DelayableWriteable original, Writeable.Reader reader) throws IOException { DelayableWriteable roundTripped = roundTrip(original, reader, Version.CURRENT); - assertTrue(roundTripped.isDelayed()); + assertTrue(roundTripped.isSerialized()); assertThat(roundTripped.get(), equalTo(original.get())); } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index b1f985d9d7583..96f3119292932 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -1372,9 +1372,11 @@ clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedActi SearchExecutionStatsCollector.makeWrapper(responseCollectorService)); final SearchService searchService = new SearchService(clusterService, indicesService, threadPool, scriptService, bigArrays, new FetchPhase(Collections.emptyList()), responseCollectorService, new NoneCircuitBreakerService()); + SearchPhaseController searchPhaseController = new SearchPhaseController( + writableRegistry(), searchService::aggReduceContextBuilder); actions.put(SearchAction.INSTANCE, new TransportSearchAction(threadPool, transportService, searchService, - searchTransportService, new SearchPhaseController(searchService::aggReduceContextBuilder), clusterService, + searchTransportService, searchPhaseController, clusterService, actionFilters, indexNameExpressionResolver)); actions.put(RestoreSnapshotAction.INSTANCE, new TransportRestoreSnapshotAction(transportService, clusterService, threadPool, restoreService, actionFilters,