diff --git a/core/src/main/java/org/elasticsearch/action/ActionModule.java b/core/src/main/java/org/elasticsearch/action/ActionModule.java index c613f617774c1..bc21b2e74f162 100644 --- a/core/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/core/src/main/java/org/elasticsearch/action/ActionModule.java @@ -176,6 +176,8 @@ import org.elasticsearch.action.search.type.TransportSearchScrollQueryThenFetchAction; import org.elasticsearch.action.search.type.TransportSearchScrollScanAction; import org.elasticsearch.action.suggest.SuggestAction; +import org.elasticsearch.action.suggest.TransportSuggestAndFetchAction; +import org.elasticsearch.action.suggest.TransportSuggestThenFetchAction; import org.elasticsearch.action.suggest.TransportSuggestAction; import org.elasticsearch.action.support.ActionFilter; import org.elasticsearch.action.support.ActionFilters; @@ -315,7 +317,7 @@ protected void configure() { TransportShardMultiTermsVectorAction.class); registerAction(DeleteAction.INSTANCE, TransportDeleteAction.class); registerAction(ExistsAction.INSTANCE, TransportExistsAction.class); - registerAction(SuggestAction.INSTANCE, TransportSuggestAction.class); + registerAction(SuggestAction.INSTANCE, TransportSuggestAction.class, TransportSuggestThenFetchAction.class, TransportSuggestAndFetchAction.class); registerAction(UpdateAction.INSTANCE, TransportUpdateAction.class); registerAction(MultiGetAction.INSTANCE, TransportMultiGetAction.class, TransportShardMultiGetAction.class); diff --git a/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java b/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java index de3032eb887a1..ae141987e1f7b 100644 --- a/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java @@ -23,10 +23,7 @@ import 
org.apache.lucene.search.ScoreDoc; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; -import org.elasticsearch.action.search.ReduceSearchPhaseException; -import org.elasticsearch.action.search.SearchPhaseExecutionException; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.*; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -167,7 +164,7 @@ void innerExecuteFetchPhase() throws Exception { for (final AtomicArray.Entry entry : docIdsToLoad.asList()) { QuerySearchResult queryResult = queryResults.get(entry.index); DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId()); - ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard); + ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, null, lastEmittedDocPerShard); executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); } } diff --git a/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java b/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java index edd5cf63d2273..9112e17c12258 100644 --- a/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java @@ -102,7 +102,7 @@ protected void moveToSecondPhase() throws Exception { for (AtomicArray.Entry entry : docIdsToLoad.asList()) { QuerySearchResultProvider queryResult = firstResults.get(entry.index); DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId()); - ShardFetchSearchRequest fetchSearchRequest = 
createFetchRequest(queryResult.queryResult(), entry, lastEmittedDocPerShard); + ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult(), entry, null, lastEmittedDocPerShard); executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); } } diff --git a/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java b/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java index fa5776387ddd7..c58c43bb25696 100644 --- a/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java @@ -347,12 +347,12 @@ protected void sendReleaseSearchContext(long contextId, DiscoveryNode node) { } } - protected ShardFetchSearchRequest createFetchRequest(QuerySearchResult queryResult, AtomicArray.Entry entry, ScoreDoc[] lastEmittedDocPerShard) { + protected ShardFetchSearchRequest createFetchRequest(QuerySearchResult queryResult, AtomicArray.Entry entry, Map namedDocIds, ScoreDoc[] lastEmittedDocPerShard) { if (lastEmittedDocPerShard != null) { ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index]; - return new ShardFetchSearchRequest(request, queryResult.id(), entry.value, lastEmittedDoc); + return new ShardFetchSearchRequest(request, queryResult.id(), entry.value, namedDocIds, lastEmittedDoc); } else { - return new ShardFetchSearchRequest(request, queryResult.id(), entry.value); + return new ShardFetchSearchRequest(request, queryResult.id(), entry.value, namedDocIds); } } diff --git a/core/src/main/java/org/elasticsearch/action/suggest/ShardSuggestRequest.java b/core/src/main/java/org/elasticsearch/action/suggest/ShardSuggestRequest.java deleted file mode 100644 index 794dd9badf76d..0000000000000 --- a/core/src/main/java/org/elasticsearch/action/suggest/ShardSuggestRequest.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to Elasticsearch 
under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.suggest; - -import org.elasticsearch.action.support.broadcast.BroadcastShardRequest; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.index.shard.ShardId; - -import java.io.IOException; - -/** - * Internal suggest request executed directly against a specific index shard. 
- */ -final class ShardSuggestRequest extends BroadcastShardRequest { - - private BytesReference suggestSource; - - ShardSuggestRequest() { - } - - ShardSuggestRequest(ShardId shardId, SuggestRequest request) { - super(shardId, request); - this.suggestSource = request.suggest(); - } - - public BytesReference suggest() { - return suggestSource; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - suggestSource = in.readBytesReference(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeBytesReference(suggestSource); - } -} diff --git a/core/src/main/java/org/elasticsearch/action/suggest/ShardSuggestResponse.java b/core/src/main/java/org/elasticsearch/action/suggest/ShardSuggestResponse.java deleted file mode 100644 index bca29800bd111..0000000000000 --- a/core/src/main/java/org/elasticsearch/action/suggest/ShardSuggestResponse.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.elasticsearch.action.suggest; - -import org.elasticsearch.action.support.broadcast.BroadcastShardResponse; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.search.suggest.Suggest; - -import java.io.IOException; - -/** - * Internal suggest response of a shard suggest request executed directly against a specific shard. - */ -class ShardSuggestResponse extends BroadcastShardResponse { - - private final Suggest suggest; - - ShardSuggestResponse() { - this.suggest = new Suggest(); - } - - ShardSuggestResponse(ShardId shardId, Suggest suggest) { - super(shardId); - this.suggest = suggest; - } - - public Suggest getSuggest() { - return this.suggest; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - suggest.readFrom(in); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - suggest.writeTo(out); - } -} diff --git a/core/src/main/java/org/elasticsearch/action/suggest/SuggestRequest.java b/core/src/main/java/org/elasticsearch/action/suggest/SuggestRequest.java index c75e262bac499..ba126201b9e95 100644 --- a/core/src/main/java/org/elasticsearch/action/suggest/SuggestRequest.java +++ b/core/src/main/java/org/elasticsearch/action/suggest/SuggestRequest.java @@ -19,17 +19,13 @@ package org.elasticsearch.action.suggest; -import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.support.broadcast.BroadcastRequest; -import org.elasticsearch.client.Requests; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import 
org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.search.suggest.SuggestBuilder; import java.io.IOException; import java.util.Arrays; @@ -39,8 +35,7 @@ * {@link org.elasticsearch.client.Requests#suggestRequest(String...)}. *

*

The request requires the suggest query source to be set either using - * {@link #suggest(org.elasticsearch.common.bytes.BytesReference)} / {@link #suggest(org.elasticsearch.common.bytes.BytesReference)} - * or by using {@link #suggest(org.elasticsearch.search.suggest.SuggestBuilder)} + * {@link #suggest(org.elasticsearch.common.bytes.BytesReference)} * (Best created using the {link @org.elasticsearch.search.suggest.SuggestBuilders)}). * * @see SuggestResponse @@ -69,12 +64,6 @@ public SuggestRequest(String... indices) { super(indices); } - @Override - public ActionRequestValidationException validate() { - ActionRequestValidationException validationException = super.validate(); - return validationException; - } - /** * The Phrase to get correction suggestions for */ @@ -90,26 +79,6 @@ public SuggestRequest suggest(BytesReference suggestSource) { return this; } - /** - * set a new source using a {@link org.elasticsearch.search.suggest.SuggestBuilder} - * for phrase and term suggestion lookup - */ - public SuggestRequest suggest(SuggestBuilder suggestBuilder) { - return suggest(suggestBuilder.buildAsBytes(Requests.CONTENT_TYPE)); - } - - /** - * set a new source using a {@link org.elasticsearch.search.suggest.SuggestBuilder.SuggestionBuilder} - * for completion suggestion lookup - */ - public SuggestRequest suggest(SuggestBuilder.SuggestionBuilder suggestionBuilder) { - return suggest(suggestionBuilder.buildAsBytes(Requests.CONTENT_TYPE)); - } - - public SuggestRequest suggest(String source) { - return suggest(new BytesArray(source)); - } - /** * A comma separated list of routing values to control the shards the search will be executed on. 
*/ @@ -147,7 +116,7 @@ public void readFrom(StreamInput in) throws IOException { super.readFrom(in); routing = in.readOptionalString(); preference = in.readOptionalString(); - suggest(in.readBytesReference()); + suggestSource = in.readBytesReference(); } @Override diff --git a/core/src/main/java/org/elasticsearch/action/suggest/SuggestRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/suggest/SuggestRequestBuilder.java index 954b86c6ff53e..07e62b9ee8044 100644 --- a/core/src/main/java/org/elasticsearch/action/suggest/SuggestRequestBuilder.java +++ b/core/src/main/java/org/elasticsearch/action/suggest/SuggestRequestBuilder.java @@ -23,9 +23,12 @@ import org.elasticsearch.action.support.broadcast.BroadcastOperationRequestBuilder; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.Requests; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.fetch.source.FetchSourceContext; import org.elasticsearch.search.suggest.SuggestBuilder; import org.elasticsearch.search.suggest.SuggestBuilder.SuggestionBuilder; @@ -36,7 +39,7 @@ */ public class SuggestRequestBuilder extends BroadcastOperationRequestBuilder { - final SuggestBuilder suggest = new SuggestBuilder(); + final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); public SuggestRequestBuilder(ElasticsearchClient client, SuggestAction action) { super(client, action, new SuggestRequest()); @@ -46,7 +49,7 @@ public SuggestRequestBuilder(ElasticsearchClient client, SuggestAction action) { * Add a definition for suggestions to the request */ public SuggestRequestBuilder addSuggestion(SuggestionBuilder suggestion) { - suggest.addSuggestion(suggestion); + searchSourceBuilder.suggest().addSuggestion(suggestion); 
return this; } @@ -59,7 +62,7 @@ public SuggestRequestBuilder setRouting(String routing) { } public SuggestRequestBuilder setSuggestText(String globalText) { - this.suggest.setText(globalText); + searchSourceBuilder.suggest().setText(globalText); return this; } @@ -82,11 +85,62 @@ public SuggestRequestBuilder setRouting(String... routing) { return this; } + /** + * Indicates whether the response should contain the stored _source for + * every hit + */ + public SuggestRequestBuilder fetchSource(boolean fetch) { + searchSourceBuilder.fetchSource(fetch); + return this; + } + + /** + * Indicate that _source should be returned with every hit, with an + * "include" and/or "exclude" set which can include simple wildcard + * elements. + * + * @param include + * An optional include (optionally wildcarded) pattern to filter + * the returned _source + * @param exclude + * An optional exclude (optionally wildcarded) pattern to filter + * the returned _source + */ + public SuggestRequestBuilder fetchSource(@Nullable String include, @Nullable String exclude) { + searchSourceBuilder.fetchSource(include, exclude); + return this; + } + + /** + * Indicate that _source should be returned with every hit, with an + * "include" and/or "exclude" set which can include simple wildcard + * elements. + * + * @param includes + * An optional list of include (optionally wildcarded) pattern to + * filter the returned _source + * @param excludes + * An optional list of exclude (optionally wildcarded) pattern to + * filter the returned _source + */ + public SuggestRequestBuilder fetchSource(@Nullable String[] includes, @Nullable String[] excludes) { + searchSourceBuilder.fetchSource(includes, excludes); + return this; + } + + /** + * Indicate how the _source should be fetched. 
+ */ + public SuggestRequestBuilder fetchSource(@Nullable FetchSourceContext fetchSourceContext) { + searchSourceBuilder.fetchSource(fetchSourceContext); + return this; + } + @Override protected SuggestRequest beforeExecute(SuggestRequest request) { try { XContentBuilder builder = XContentFactory.contentBuilder(Requests.CONTENT_TYPE); - suggest.toXContent(builder, ToXContent.EMPTY_PARAMS); + searchSourceBuilder.toXContent(builder, ToXContent.EMPTY_PARAMS); request.suggest(builder.bytes()); } catch (IOException e) { throw new ElasticsearchException("Unable to build suggestion request", e); diff --git a/core/src/main/java/org/elasticsearch/action/suggest/TransportSuggestAction.java b/core/src/main/java/org/elasticsearch/action/suggest/TransportSuggestAction.java index c584c8856a5b5..8cf8f6d863c03 100644 --- a/core/src/main/java/org/elasticsearch/action/suggest/TransportSuggestAction.java +++ b/core/src/main/java/org/elasticsearch/action/suggest/TransportSuggestAction.java @@ -19,142 +19,90 @@ package org.elasticsearch.action.suggest; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.ShardOperationFailedException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.DefaultShardOperationFailedException; -import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException; -import org.elasticsearch.action.support.broadcast.TransportBroadcastAction; +import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.block.ClusterBlockException; -import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import 
org.elasticsearch.cluster.routing.GroupShardsIterator; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.index.IndexService; -import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.index.suggest.stats.ShardSuggestMetric; -import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.indices.IndexClosedException; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.suggest.Suggest; -import org.elasticsearch.search.suggest.SuggestPhase; -import org.elasticsearch.search.suggest.SuggestionSearchContext; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.*; /** - * Defines the transport of a suggestion request across the cluster + * Defines the transport of a suggestion request across the cluster. 
+ * Delegates to {@link TransportSuggestThenFetchAction} in case of multiple shards + * and {@link TransportSuggestAndFetchAction} for single shard */ -public class TransportSuggestAction extends TransportBroadcastAction { +public class TransportSuggestAction extends HandledTransportAction { - private final IndicesService indicesService; - private final SuggestPhase suggestPhase; + private final TransportSuggestThenFetchAction suggestThenFetchAction; + private final TransportSuggestAndFetchAction suggestAndFetchAction; + private final ClusterService clusterService; @Inject - public TransportSuggestAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, - IndicesService indicesService, SuggestPhase suggestPhase, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver) { - super(settings, SuggestAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, - SuggestRequest.class, ShardSuggestRequest.class, ThreadPool.Names.SUGGEST); - this.indicesService = indicesService; - this.suggestPhase = suggestPhase; + public TransportSuggestAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, ClusterService clusterService, + TransportSuggestThenFetchAction suggestThenFetchAction, TransportSuggestAndFetchAction suggestAndFetchAction) { + super(settings, SuggestAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, SuggestRequest.class); + this.clusterService = clusterService; + this.suggestThenFetchAction = suggestThenFetchAction; + this.suggestAndFetchAction = suggestAndFetchAction; } @Override - protected ShardSuggestRequest newShardRequest(int numShards, ShardRouting shard, SuggestRequest request) { - return new ShardSuggestRequest(shard.shardId(), request); - } - - @Override - protected 
ShardSuggestResponse newShardResponse() { - return new ShardSuggestResponse(); - } - - @Override - protected GroupShardsIterator shards(ClusterState clusterState, SuggestRequest request, String[] concreteIndices) { - Map> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, request.routing(), request.indices()); - return clusterService.operationRouting().searchShards(clusterState, concreteIndices, routingMap, request.preference()); - } - - @Override - protected ClusterBlockException checkGlobalBlock(ClusterState state, SuggestRequest request) { - return state.blocks().globalBlockedException(ClusterBlockLevel.READ); - } - - @Override - protected ClusterBlockException checkRequestBlock(ClusterState state, SuggestRequest countRequest, String[] concreteIndices) { - return state.blocks().indicesBlockedException(ClusterBlockLevel.READ, concreteIndices); - } - - @Override - protected SuggestResponse newResponse(SuggestRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) { - int successfulShards = 0; - int failedShards = 0; - - final Map> groupedSuggestions = new HashMap<>(); - - List shardFailures = null; - for (int i = 0; i < shardsResponses.length(); i++) { - Object shardResponse = shardsResponses.get(i); - if (shardResponse == null) { - // simply ignore non active shards - } else if (shardResponse instanceof BroadcastShardOperationFailedException) { - failedShards++; - if (shardFailures == null) { - shardFailures = new ArrayList<>(); + protected void doExecute(SuggestRequest request, final ActionListener listener) { + final SearchRequest searchRequest = new SearchRequest(request.indices()); + if (request.suggest() != null) { + searchRequest.source(request.suggest()); + } + searchRequest.preference(request.preference()); + searchRequest.routing(request.routing()); + searchRequest.indicesOptions(request.indicesOptions()); + final ActionListener searchResponseActionListener = new ActionListener() { + @Override + public 
void onResponse(SearchResponse searchResponse) { + Suggest suggest = searchResponse.getSuggest(); + if (suggest != null) { + listener.onResponse(new SuggestResponse(suggest)); + } else { + listener.onResponse(new SuggestResponse(new Suggest(Collections.<Suggest.Suggestion<? extends Suggest.Suggestion.Entry<? extends Suggest.Suggestion.Entry.Option>>>emptyList()))); } - shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse)); - } else { - Suggest suggest = ((ShardSuggestResponse) shardResponse).getSuggest(); - Suggest.group(groupedSuggestions, suggest); - successfulShards++; } - } - return new SuggestResponse(new Suggest(Suggest.reduce(groupedSuggestions)), shardsResponses.length(), successfulShards, failedShards, shardFailures); - } - - @Override - protected ShardSuggestResponse shardOperation(ShardSuggestRequest request) { - IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); - IndexShard indexShard = indexService.shardSafe(request.shardId().id()); - ShardSuggestMetric suggestMetric = indexShard.getSuggestMetric(); - suggestMetric.preSuggest(); - long startTime = System.nanoTime(); - XContentParser parser = null; - try (Engine.Searcher searcher = indexShard.acquireSearcher("suggest")) { - BytesReference suggest = request.suggest(); - if (suggest != null && suggest.length() > 0) { - parser = XContentFactory.xContent(suggest).createParser(suggest); - if (parser.nextToken() != XContentParser.Token.START_OBJECT) { - throw new IllegalArgumentException("suggest content missing"); - } - final SuggestionSearchContext context = suggestPhase.parseElement().parseInternal(parser, indexService.mapperService(), - indexService.queryParserService(), request.shardId().getIndex(), request.shardId().id()); - final Suggest result = suggestPhase.execute(context, searcher.searcher()); - return new ShardSuggestResponse(request.shardId(), result); + @Override + public void onFailure(Throwable e) { + listener.onFailure(e); } - return new ShardSuggestResponse(request.shardId(), new 
Suggest()); - } catch (Throwable ex) { - throw new ElasticsearchException("failed to execute suggest", ex); - } finally { - if (parser != null) { - parser.close(); + }; + boolean singleShard = false; + try { + ClusterState clusterState = clusterService.state(); + String[] concreteIndices = indexNameExpressionResolver.concreteIndices(clusterState, searchRequest); + Map<String, Set<String>> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(), searchRequest.indices()); + int shardCount = clusterService.operationRouting().searchShardsCount(clusterState, concreteIndices, routingMap); + if (shardCount == 1) { + singleShard = true; } - suggestMetric.postSuggest(System.nanoTime() - startTime); + } catch (IndexNotFoundException | IndexClosedException e) { + // ignore these failures, we will notify the search response if it's really the case from the actual action + } catch (Exception e) { + logger.debug("failed to optimize search type, continue as normal", e); + } + if (singleShard) { + suggestAndFetchAction.doExecute(searchRequest, searchResponseActionListener); + } else { + suggestThenFetchAction.doExecute(searchRequest, searchResponseActionListener); } } + } diff --git a/core/src/main/java/org/elasticsearch/action/suggest/TransportSuggestAndFetchAction.java b/core/src/main/java/org/elasticsearch/action/suggest/TransportSuggestAndFetchAction.java new file mode 100644 index 0000000000000..4d02bd90dd467 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/suggest/TransportSuggestAndFetchAction.java @@ -0,0 +1,102 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.suggest; + +import org.apache.lucene.search.ScoreDoc; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.search.ReduceSearchPhaseException; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.type.TransportSearchTypeAction; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.search.SearchService; +import org.elasticsearch.search.action.SearchServiceTransportAction; +import org.elasticsearch.search.controller.SearchPhaseController; +import org.elasticsearch.search.fetch.QueryFetchSearchResult; +import org.elasticsearch.search.internal.InternalSearchResponse; +import org.elasticsearch.search.internal.ShardSearchTransportRequest; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.Map; + +/** + * Single phase transport action for suggest and fetch + * Executes suggest and fetch using {@link SearchService#executeSuggestFetchPhase(org.elasticsearch.search.internal.ShardSearchRequest)} + * on relevant shards + */ +public class TransportSuggestAndFetchAction extends 
TransportSearchTypeAction { + + @Inject + public TransportSuggestAndFetchAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, + SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + super(settings, threadPool, clusterService, searchService, searchPhaseController, actionFilters, indexNameExpressionResolver); + } + + @Override + protected void doExecute(SearchRequest searchRequest, ActionListener listener) { + new AsyncAction(searchRequest, listener).start(); + } + + private class AsyncAction extends BaseAsyncAction { + + private AsyncAction(SearchRequest request, ActionListener listener) { + super(request, listener); + } + + @Override + protected String firstPhaseName() { + return "suggest_fetch"; + } + + @Override + protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, ActionListener listener) { + searchService.sendExecuteSuggestFetch(node, request, listener); + } + + @Override + protected void moveToSecondPhase() throws Exception { + threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable(listener) { + @Override + public void doRun() throws IOException { + final Map sortedShardList = searchPhaseController.sortSuggestDocs(firstResults); + final InternalSearchResponse internalResponse = searchPhaseController.mergeSuggestions(sortedShardList, firstResults, firstResults); + listener.onResponse(new SearchResponse(internalResponse, null, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures())); + } + + @Override + public void onFailure(Throwable t) { + ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", t, buildShardFailures()); + if (logger.isDebugEnabled()) { + logger.debug("failed to reduce search", failure); + } + super.onFailure(failure); + } + }); + } + } +} diff --git 
a/core/src/main/java/org/elasticsearch/action/suggest/TransportSuggestThenFetchAction.java b/core/src/main/java/org/elasticsearch/action/suggest/TransportSuggestThenFetchAction.java new file mode 100644 index 0000000000000..31bbf700eaed0 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/suggest/TransportSuggestThenFetchAction.java @@ -0,0 +1,197 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.action.suggest; + +import com.carrotsearch.hppc.IntArrayList; +import org.apache.lucene.search.ScoreDoc; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.search.ReduceSearchPhaseException; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.type.TransportSearchTypeAction; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.search.SearchService; +import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.search.action.SearchServiceTransportAction; +import org.elasticsearch.search.controller.SearchPhaseController; +import org.elasticsearch.search.fetch.FetchSearchResult; +import org.elasticsearch.search.fetch.ShardFetchSearchRequest; +import org.elasticsearch.search.internal.InternalSearchResponse; +import org.elasticsearch.search.internal.ShardSearchTransportRequest; +import org.elasticsearch.search.query.QuerySearchResultProvider; +import org.elasticsearch.search.suggest.Suggest; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Two phased Transport action for suggest then fetch + * + * Executes suggest by {@link SearchService#executeSuggestPhase(org.elasticsearch.search.internal.ShardSearchRequest)} + * on relevant shards followed by {@link 
org.elasticsearch.search.SearchService#executeFetchPhase(org.elasticsearch.search.fetch.ShardFetchRequest)} + * to fetch suggest documents + */ +public class TransportSuggestThenFetchAction extends TransportSearchTypeAction { + + @Inject + public TransportSuggestThenFetchAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, + SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + super(settings, threadPool, clusterService, searchService, searchPhaseController, actionFilters, indexNameExpressionResolver); + } + + @Override + protected void doExecute(SearchRequest searchRequest, ActionListener listener) { + new AsyncAction(searchRequest, listener).start(); + } + + + private class AsyncAction extends BaseAsyncAction { + + final AtomicArray fetchResults; + final AtomicArray> namedDocIdSets; + private Map sortedShardList; + private AtomicArray.Entry scratch = new AtomicArray.Entry<>(0, new IntArrayList(0)); + + private AsyncAction(SearchRequest request, ActionListener listener) { + super(request, listener); + fetchResults = new AtomicArray<>(firstResults.length()); + namedDocIdSets = new AtomicArray<>(firstResults.length()); + } + + @Override + protected String firstPhaseName() { + return "suggest"; + } + + @Override + protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, final ActionListener listener) { + searchService.sendExecuteSuggest(node, request, listener); + } + + @Override + protected void moveToSecondPhase() throws Exception { + sortedShardList = searchPhaseController.sortSuggestDocs(firstResults); + searchPhaseController.fillNamedDocIdSets(namedDocIdSets, sortedShardList); + List>> namedDocIdList = namedDocIdSets.asList(); + if (namedDocIdList.isEmpty()) { + finishHim(); + return; + } + + final AtomicInteger counter = new AtomicInteger(namedDocIdList.size()); + for 
(AtomicArray.Entry> entry : namedDocIdList) { + QuerySearchResultProvider queryResult = firstResults.get(entry.index); + DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId()); + ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult(), scratch, entry.value, null); + executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); + } + } + + void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter, final ShardFetchSearchRequest fetchSearchRequest, DiscoveryNode node) { + searchService.sendExecuteFetch(node, fetchSearchRequest, new ActionListener() { + @Override + public void onResponse(FetchSearchResult result) { + result.shardTarget(shardTarget); + fetchResults.set(shardIndex, result); + if (counter.decrementAndGet() == 0) { + finishHim(); + } + } + + @Override + public void onFailure(Throwable t) { + // the search context might not be cleared on the node where the fetch was executed for example + // because the action was rejected by the thread pool. in this case we need to send a dedicated + // request to clear the search context. by setting docIdsToLoad to null, the context will be cleared + // in releaseSearchContexts() after the search request is done. 
+ namedDocIdSets.set(shardIndex, null); + onFetchFailure(t, fetchSearchRequest, shardIndex, shardTarget, counter); + } + }); + } + + void onFetchFailure(Throwable t, ShardFetchSearchRequest fetchSearchRequest, int shardIndex, SearchShardTarget shardTarget, AtomicInteger counter) { + if (logger.isDebugEnabled()) { + logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id()); + } + this.addShardFailure(shardIndex, shardTarget, t); + successfulOps.decrementAndGet(); + if (counter.decrementAndGet() == 0) { + finishHim(); + } + } + + private void finishHim() { + threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable(listener) { + @Override + public void doRun() throws IOException { + try { + final InternalSearchResponse internalResponse = searchPhaseController.mergeSuggestions(sortedShardList, firstResults, fetchResults); + listener.onResponse(new SearchResponse(internalResponse, null, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures())); + } catch (IllegalArgumentException e) { + listener.onFailure(e); + } finally { + releaseSearchContexts(firstResults, namedDocIdSets); + } + } + + @Override + public void onFailure(Throwable t) { + try { + ReduceSearchPhaseException failure = new ReduceSearchPhaseException("fetch", "", t, buildShardFailures()); + if (logger.isDebugEnabled()) { + logger.debug("failed to reduce search", failure); + } + super.onFailure(failure); + } finally { + releaseSearchContexts(firstResults, namedDocIdSets); + } + } + }); + } + + private void releaseSearchContexts(AtomicArray queryResults, + AtomicArray> namedDocIdsToLoad) { + for (AtomicArray.Entry entry : queryResults.asList()) { + Suggest suggest = entry.value.queryResult().suggest(); + if (suggest != null && suggest.hasScoreDocs() // had some hits + && namedDocIdsToLoad.get(entry.index) == null) { // but did not make it to global hits + try { + DiscoveryNode node = nodes.get(entry.value.queryResult().shardTarget().nodeId()); 
+ sendReleaseSearchContext(entry.value.queryResult().id(), node); + } catch (Throwable t1) { + logger.trace("failed to release context", t1); + } + } + } + } + } +} diff --git a/core/src/main/java/org/elasticsearch/percolator/PercolateContext.java b/core/src/main/java/org/elasticsearch/percolator/PercolateContext.java index d42b858ce7bf1..9ebca1c264089 100644 --- a/core/src/main/java/org/elasticsearch/percolator/PercolateContext.java +++ b/core/src/main/java/org/elasticsearch/percolator/PercolateContext.java @@ -595,6 +595,16 @@ public int docIdsToLoadSize() { throw new UnsupportedOperationException(); } + @Override + public Map namedDocIdsToLoad() { + throw new UnsupportedOperationException(); + } + + @Override + public SearchContext namedDocIdsToLoad(Map namedDocIds) { + throw new UnsupportedOperationException(); + } + @Override public SearchContext docIdsToLoad(int[] docIdsToLoad, int docsIdsToLoadFrom, int docsIdsToLoadSize) { throw new UnsupportedOperationException(); diff --git a/core/src/main/java/org/elasticsearch/rest/action/suggest/RestSuggestAction.java b/core/src/main/java/org/elasticsearch/rest/action/suggest/RestSuggestAction.java index 8bf360dc36cb1..71ecd96b3df15 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/suggest/RestSuggestAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/suggest/RestSuggestAction.java @@ -23,12 +23,14 @@ import static org.elasticsearch.rest.RestRequest.Method.POST; import static org.elasticsearch.rest.action.support.RestActions.buildBroadcastShardsHeader; import org.elasticsearch.action.suggest.SuggestRequest; +import org.elasticsearch.action.suggest.SuggestRequestBuilder; import org.elasticsearch.action.suggest.SuggestResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import 
org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.BytesRestResponse; @@ -39,8 +41,11 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.action.support.RestActions; import org.elasticsearch.rest.action.support.RestBuilderListener; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.suggest.Suggest; +import java.util.Collections; + /** * */ @@ -60,7 +65,9 @@ public void handleRequest(final RestRequest request, final RestChannel channel, SuggestRequest suggestRequest = new SuggestRequest(Strings.splitStringByCommaToArray(request.param("index"))); suggestRequest.indicesOptions(IndicesOptions.fromRequest(request, suggestRequest.indicesOptions())); if (RestActions.hasBodyContent(request)) { - suggestRequest.suggest(RestActions.getRestContent(request)); + SearchSourceBuilder searchSourceBuilder = SearchSourceBuilder.searchSource(); + searchSourceBuilder.suggest(RestActions.getRestContent(request)); + suggestRequest.suggest(searchSourceBuilder.buildAsBytes()); } else { throw new IllegalArgumentException("no content or source provided to execute suggestion"); } @@ -75,7 +82,7 @@ public RestResponse buildResponse(SuggestResponse response, XContentBuilder buil buildBroadcastShardsHeader(builder, request, response); Suggest suggest = response.getSuggest(); if (suggest != null) { - suggest.toXContent(builder, request); + suggest.toXContent(builder, new ToXContent.MapParams(Collections.singletonMap("use_suggest_namespace", "false"))); } builder.endObject(); return new BytesRestResponse(restStatus, builder); diff --git a/core/src/main/java/org/elasticsearch/search/SearchModule.java b/core/src/main/java/org/elasticsearch/search/SearchModule.java index 49acd1163fcf6..225a4fddde738 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchModule.java +++ 
b/core/src/main/java/org/elasticsearch/search/SearchModule.java @@ -19,7 +19,6 @@ package org.elasticsearch.search; -import org.elasticsearch.common.Classes; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.multibindings.Multibinder; import org.elasticsearch.common.settings.Settings; @@ -143,10 +142,9 @@ import org.elasticsearch.search.highlight.Highlighter; import org.elasticsearch.search.highlight.Highlighters; import org.elasticsearch.search.query.QueryPhase; -import org.elasticsearch.search.suggest.SuggestParseElement; -import org.elasticsearch.search.suggest.SuggestPhase; import org.elasticsearch.search.suggest.Suggester; import org.elasticsearch.search.suggest.Suggesters; +import org.elasticsearch.search.suggest.completion.NamedDocIdsFetchPhase; import java.util.*; @@ -241,6 +239,7 @@ protected void configureFetchSubPhase() { fetchSubPhaseMultibinder.addBinding().to(clazz); } bind(InnerHitsFetchSubPhase.class).asEagerSingleton(); + bind(NamedDocIdsFetchPhase.class).asEagerSingleton(); } protected void configureSuggesters() { diff --git a/core/src/main/java/org/elasticsearch/search/SearchService.java b/core/src/main/java/org/elasticsearch/search/SearchService.java index 15eb3c0e8dc20..ca12f2ece3418 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchService.java +++ b/core/src/main/java/org/elasticsearch/search/SearchService.java @@ -26,6 +26,7 @@ import org.apache.lucene.index.IndexOptions; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.TopDocs; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; @@ -67,6 +68,7 @@ import org.elasticsearch.index.search.stats.StatsGroupsParseElement; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.IndexShard; +import 
org.elasticsearch.index.suggest.stats.ShardSuggestMetric; import org.elasticsearch.indices.IndicesLifecycle; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesWarmer; @@ -86,6 +88,8 @@ import org.elasticsearch.search.internal.*; import org.elasticsearch.search.internal.SearchContext.Lifetime; import org.elasticsearch.search.query.*; +import org.elasticsearch.search.suggest.Suggest; +import org.elasticsearch.search.suggest.SuggestPhase; import org.elasticsearch.search.warmer.IndexWarmersMetaData; import org.elasticsearch.threadpool.ThreadPool; @@ -134,6 +138,8 @@ public class SearchService extends AbstractLifecycleComponent { private final FetchPhase fetchPhase; + private final SuggestPhase suggestPhase; + private final IndicesRequestCache indicesQueryCache; private final long defaultKeepAlive; @@ -152,7 +158,7 @@ public class SearchService extends AbstractLifecycleComponent { @Inject public SearchService(Settings settings, NodeSettingsService nodeSettingsService, ClusterService clusterService, IndicesService indicesService,IndicesWarmer indicesWarmer, ThreadPool threadPool, - ScriptService scriptService, PageCacheRecycler pageCacheRecycler, BigArrays bigArrays, DfsPhase dfsPhase, QueryPhase queryPhase, FetchPhase fetchPhase, + ScriptService scriptService, PageCacheRecycler pageCacheRecycler, BigArrays bigArrays, DfsPhase dfsPhase, QueryPhase queryPhase, FetchPhase fetchPhase, SuggestPhase suggestPhase, IndicesRequestCache indicesQueryCache) { super(settings); this.parseFieldMatcher = new ParseFieldMatcher(settings); @@ -187,6 +193,7 @@ public void afterIndexDeleted(Index index, @IndexSettings Settings indexSettings this.dfsPhase = dfsPhase; this.queryPhase = queryPhase; this.fetchPhase = fetchPhase; + this.suggestPhase = suggestPhase; this.indicesQueryCache = indicesQueryCache; TimeValue keepAliveInterval = settings.getAsTime(KEEPALIVE_INTERVAL_KEY, timeValueMinutes(1)); @@ -385,6 +392,33 @@ public QuerySearchResultProvider 
executeQueryPhase(ShardSearchRequest request) { } } + public QuerySearchResultProvider executeSuggestPhase(ShardSearchRequest request) { + final SearchContext context = createAndPutContext(request); + try { + final ShardSuggestMetric suggestMetric = context.indexShard().getSuggestMetric(); + suggestMetric.preSuggest(); + long time = System.nanoTime(); + contextProcessing(context); + suggestPhase.execute(context); + context.queryResult().topDocs(Lucene.EMPTY_TOP_DOCS); + Suggest suggest = context.queryResult().suggest(); + boolean freeContext = suggest == null || !suggest.hasScoreDocs(); + if (freeContext) { + freeContext(context.id()); + } else { + contextProcessedSuccessfully(context); + } + suggestMetric.postSuggest(System.nanoTime() - time); + return context.queryResult(); + } catch (Throwable e) { + logger.trace("Query phase failed", e); + processFailure(context, e); + throw ExceptionsHelper.convertToRuntime(e); + } finally { + cleanContext(context); + } + } + public ScrollQuerySearchResult executeQueryPhase(InternalScrollSearchRequest request) { final SearchContext context = findContext(request.id()); ShardSearchStats shardSearchStats = context.indexShard().searchService(); @@ -445,6 +479,43 @@ private boolean fetchPhaseShouldFreeContext(SearchContext context) { } } + public QueryFetchSearchResult executeSuggestFetchPhase(ShardSearchRequest request) { + final SearchContext context = createAndPutContext(request); + try { + final ShardSuggestMetric suggestMetric = context.indexShard().getSuggestMetric(); + final ShardSearchStats shardSearchStats = context.indexShard().searchService(); + suggestMetric.preSuggest(); + long time = System.nanoTime(); + contextProcessing(context); + try { + suggestPhase.execute(context); + } catch (Throwable e) { + shardSearchStats.onFailedQueryPhase(context); + throw ExceptionsHelper.convertToRuntime(e); + } + context.queryResult().topDocs(Lucene.EMPTY_TOP_DOCS); + long time2 = System.nanoTime(); + suggestMetric.postSuggest(time2 - 
time); + shardSearchStats.onPreFetchPhase(context); + try { + loadNamedDocIds(context); + fetchPhase.execute(context); + freeContext(context.id()); + } catch (Throwable e) { + shardSearchStats.onFailedFetchPhase(context); + throw ExceptionsHelper.convertToRuntime(e); + } + shardSearchStats.onFetchPhase(context, System.nanoTime() - time2); + return new QueryFetchSearchResult(context.queryResult(), context.fetchResult()); + } catch (Throwable e) { + logger.trace("Fetch phase failed", e); + processFailure(context, e); + throw ExceptionsHelper.convertToRuntime(e); + } finally { + cleanContext(context); + } + } + public QueryFetchSearchResult executeFetchPhase(ShardSearchRequest request) { final SearchContext context = createAndPutContext(request); contextProcessing(context); @@ -573,6 +644,7 @@ public FetchSearchResult executeFetchPhase(ShardFetchRequest request) { context.scrollContext().lastEmittedDoc = request.lastEmittedDoc(); } context.docIdsToLoad(request.docIds(), 0, request.docIdsSize()); + context.namedDocIdsToLoad(request.namedDocIds()); shardSearchStats.onPreFetchPhase(context); long time = System.nanoTime(); fetchPhase.execute(context); @@ -843,6 +915,23 @@ private void parseSource(SearchContext context, BytesReference source) throws Se private static final int[] EMPTY_DOC_IDS = new int[0]; + private void loadNamedDocIds(SearchContext context) { + Suggest suggest = context.queryResult().suggest(); + if (suggest != null && suggest.hasScoreDocs()) { + Map namedScoreDocs = suggest.toNamedScoreDocs(-1); + Map namedDocIds = new HashMap<>(namedScoreDocs.size()); + for (Map.Entry entry : namedScoreDocs.entrySet()) { + ScoreDoc[] scoreDocs = entry.getValue(); + int[] docIds = new int[scoreDocs.length]; + for (int i = 0; i < scoreDocs.length; i++) { + docIds[i] = scoreDocs[i].doc; + } + namedDocIds.put(entry.getKey(), docIds); + } + context.namedDocIdsToLoad(namedDocIds); + } + } + /** * Shortcut ids to load, we load only "from" and up to "size". 
The phase controller * handles this as well since the result is always size * shards for Q_A_F diff --git a/core/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java b/core/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java index 4205fd95299de..a79e1a0a81c9d 100644 --- a/core/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java +++ b/core/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java @@ -57,6 +57,8 @@ public class SearchServiceTransportAction extends AbstractComponent { public static final String CLEAR_SCROLL_CONTEXTS_ACTION_NAME = "indices:data/read/search[clear_scroll_contexts]"; public static final String DFS_ACTION_NAME = "indices:data/read/search[phase/dfs]"; public static final String QUERY_ACTION_NAME = "indices:data/read/search[phase/query]"; + public static final String SUGGEST_ACTION_NAME = "indices:data/read/search[phase/suggest]"; + public static final String SUGGEST_FETCH_ACTION_NAME = "indices:data/read/search[phase/suggest+fetch]"; public static final String QUERY_ID_ACTION_NAME = "indices:data/read/search[phase/query/id]"; public static final String QUERY_SCROLL_ACTION_NAME = "indices:data/read/search[phase/query/scroll]"; public static final String QUERY_FETCH_ACTION_NAME = "indices:data/read/search[phase/query+fetch]"; @@ -81,6 +83,8 @@ public SearchServiceTransportAction(Settings settings, TransportService transpor transportService.registerRequestHandler(CLEAR_SCROLL_CONTEXTS_ACTION_NAME, ClearScrollContextsRequest.class, ThreadPool.Names.SAME, new ClearScrollContextsTransportHandler()); transportService.registerRequestHandler(DFS_ACTION_NAME, ShardSearchTransportRequest.class, ThreadPool.Names.SEARCH, new SearchDfsTransportHandler()); transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest.class, ThreadPool.Names.SEARCH, new SearchQueryTransportHandler()); + 
transportService.registerRequestHandler(SUGGEST_ACTION_NAME, ShardSearchTransportRequest.class, ThreadPool.Names.SEARCH, new SearchSuggestTransportHandler()); + transportService.registerRequestHandler(SUGGEST_FETCH_ACTION_NAME, ShardSearchTransportRequest.class, ThreadPool.Names.SEARCH, new SearchSuggestFetchTransportHandler()); transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, QuerySearchRequest.class, ThreadPool.Names.SEARCH, new SearchQueryByIdTransportHandler()); transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, InternalScrollSearchRequest.class, ThreadPool.Names.SEARCH, new SearchQueryScrollTransportHandler()); transportService.registerRequestHandler(QUERY_FETCH_ACTION_NAME, ShardSearchTransportRequest.class, ThreadPool.Names.SEARCH, new SearchQueryFetchTransportHandler()); @@ -138,6 +142,24 @@ public DfsSearchResult newInstance() { }); } + public void sendExecuteSuggest(DiscoveryNode node, final ShardSearchTransportRequest request, final ActionListener listener) { + transportService.sendRequest(node, SUGGEST_ACTION_NAME, request, new ActionListenerResponseHandler(listener) { + @Override + public QuerySearchResult newInstance() { + return new QuerySearchResult(); + } + }); + } + + public void sendExecuteSuggestFetch(DiscoveryNode node, final ShardSearchTransportRequest request, final ActionListener listener) { + transportService.sendRequest(node, SUGGEST_FETCH_ACTION_NAME, request, new ActionListenerResponseHandler(listener) { + @Override + public QueryFetchSearchResult newInstance() { + return new QueryFetchSearchResult(); + } + }); + } + public void sendExecuteQuery(DiscoveryNode node, final ShardSearchTransportRequest request, final ActionListener listener) { transportService.sendRequest(node, QUERY_ACTION_NAME, request, new ActionListenerResponseHandler(listener) { @Override @@ -370,6 +392,22 @@ public void messageReceived(ShardSearchTransportRequest request, TransportChanne } } + class SearchSuggestTransportHandler implements 
TransportRequestHandler { + @Override + public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel) throws Exception { + QuerySearchResultProvider result = searchService.executeSuggestPhase(request); + channel.sendResponse(result); + } + } + + class SearchSuggestFetchTransportHandler implements TransportRequestHandler { + @Override + public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel) throws Exception { + QueryFetchSearchResult result = searchService.executeSuggestFetchPhase(request); + channel.sendResponse(result); + } + } + class SearchQueryByIdTransportHandler implements TransportRequestHandler { @Override public void messageReceived(QuerySearchRequest request, TransportChannel channel) throws Exception { diff --git a/core/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java b/core/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java index 90ce07b87ffa4..0702ef06bf53b 100644 --- a/core/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java +++ b/core/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java @@ -110,6 +110,7 @@ public static HighlightBuilder highlight() { private HighlightBuilder highlightBuilder; private SuggestBuilder suggestBuilder; + private BytesReference suggestBinary; private InnerHitsBuilder innerHitsBuilder; @@ -409,6 +410,11 @@ public SearchSourceBuilder aggregations(BytesReference aggregationsBinary) { return this; } + public SearchSourceBuilder suggest(BytesReference suggestBinary) { + this.suggestBinary = suggestBinary; + return this; + } + /** * Sets a raw (xcontent / json) addAggregation. 
*/ @@ -781,6 +787,14 @@ public void innerToXContent(XContentBuilder builder, Params params) throws IOExc suggestBuilder.toXContent(builder, params); } + if (suggestBinary != null && suggestBinary.length() > 0) { + if (XContentFactory.xContentType(suggestBinary) == builder.contentType()) { + builder.rawField("suggest", suggestBinary); + } else { + builder.field("suggest_binary", suggestBinary); + } + } + if (rescoreBuilders != null) { // Strip empty rescoreBuilders from the request Iterator itr = rescoreBuilders.iterator(); diff --git a/core/src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java b/core/src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java index 74e263220e4d1..5e7498014e576 100644 --- a/core/src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java +++ b/core/src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java @@ -54,14 +54,10 @@ import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.query.QuerySearchResultProvider; import org.elasticsearch.search.suggest.Suggest; +import org.elasticsearch.search.suggest.completion.CompletionSuggestion; import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import static org.elasticsearch.common.util.CollectionUtils.eagerTransform; @@ -426,4 +422,179 @@ public InternalSearchResponse merge(ScoreDoc[] sortedDocs, AtomicArrayqueryResultsArr with hits from fetchResultsArr + * using sortedDocs + */ + public InternalSearchResponse mergeSuggestions(Map sortedDocs, AtomicArray queryResultsArr, AtomicArray fetchResultsArr) { + + List> queryResults = queryResultsArr.asList(); + List> fetchResults = fetchResultsArr.asList(); + + if (queryResults.isEmpty()) { + return InternalSearchResponse.empty(); + } + + Suggest suggest = null; + final Map> 
groupedSuggestions = new HashMap<>(); + boolean hasSuggestions = false; + // reduce suggestions + for (AtomicArray.Entry entry : queryResults) { + Suggest shardResult = entry.value.queryResult().queryResult().suggest(); + + if (shardResult == null) { + continue; + } + hasSuggestions = true; + Suggest.group(groupedSuggestions, shardResult); + } + if (hasSuggestions) { + List>> suggestions = Suggest.reduce(groupedSuggestions); + suggest = new Suggest(Suggest.Fields.SUGGEST, suggestions); + if (!fetchResults.isEmpty()) { + // enrich suggestions + for (Map.Entry entry : sortedDocs.entrySet()) { + String suggestionName = entry.getKey(); + ScoreDoc[] scoreDocs = entry.getValue(); + Suggest.Suggestion completionSuggestion = suggest.getCompletionSuggestion(suggestionName); + List options = completionSuggestion.options(); + assert options.size() == scoreDocs.length; + int[] counters = new int[queryResults.size()]; + for (int i = 0; i < scoreDocs.length; i++) { + ScoreDoc doc = scoreDocs[i]; + FetchSearchResultProvider fetchSearchResultProvider = fetchResultsArr.get(doc.shardIndex); + FetchSearchResult fetchSearchResult = fetchSearchResultProvider.fetchResult(); + Map namedHits = fetchSearchResult.namedHits(); + InternalSearchHit[] hits = namedHits.get(suggestionName).internalHits(); + CompletionSuggestion.Entry.Option option = (CompletionSuggestion.Entry.Option) options.get(i); + InternalSearchHit hit = hits[counters[doc.shardIndex]]; + hit.score(doc.score); + hit.shard(fetchSearchResult.shardTarget()); + option.hit(hit); + counters[doc.shardIndex]++; + } + } + } + } + return new InternalSearchResponse(InternalSearchHits.empty(), null, suggest, false, false); + } + + public void fillNamedDocIdSets(AtomicArray> namedDocIdSets, Map sortedShardList) { + for (Map.Entry entry : sortedShardList.entrySet()) { + for (ScoreDoc doc : entry.getValue()) { + Map namedDocIds = namedDocIdSets.get(doc.shardIndex); + if (namedDocIds == null) { + namedDocIds = new HashMap<>(); + 
namedDocIdSets.set(doc.shardIndex, namedDocIds); + } + IntArrayList docIds = namedDocIds.get(entry.getKey()); + if (docIds == null) { + docIds = new IntArrayList(); + namedDocIds.put(entry.getKey(), docIds); + } + docIds.add(doc.doc); + } + } + } + + /** + * Merges suggestion docs and retrieves top N suggestions by name + * The suggestion query results are trimmed as a side-effect + * + * @return a map of suggestion name and Top-n docs + */ + public Map sortSuggestDocs(AtomicArray resultSet) throws IOException { + List> results = resultSet.asList(); + if (results.isEmpty()) { + return Collections.emptyMap(); + } else if (results.size() == 1) { + Suggest suggest = results.get(0).value.queryResult().suggest(); + if (suggest == null || suggest.hasScoreDocs() == false) { + return Collections.emptyMap(); + } + return suggest.toNamedScoreDocs(results.get(0).index); + } else if (optimizeSingleShard) { + boolean canOptimize = false; + boolean noFetchRequired = true; + Suggest result = null; + int shardIndex = -1; + // optimize in case of finding single shard query result + for (AtomicArray.Entry entry : results) { + Suggest suggest = entry.value.queryResult().suggest(); + if (suggest != null && suggest.hasScoreDocs()) { + noFetchRequired = false; + if (result != null) { // we already have one, can't really optimize + canOptimize = false; + break; + } + canOptimize = true; + result = suggest; + shardIndex = entry.index; + } + } + if (canOptimize) { + return result.toNamedScoreDocs(shardIndex); + } else if (noFetchRequired) { + return Collections.emptyMap(); + } + } + + final Map> namedTopDocs = new HashMap<>(2); + // collect top n docs for every shard by suggestion name + for (AtomicArray.Entry result : results) { + Suggest suggest = result.value.queryResult().suggest(); + if (suggest == null) { + continue; + } + Map namedShardScoreDocs = suggest.toNamedScoreDocs(result.index); + for (Map.Entry entry : namedShardScoreDocs.entrySet()) { + String suggestionName = 
entry.getKey(); + ScoreDoc[] scoreDocs = entry.getValue(); + if (scoreDocs.length == 0) { + continue; + } + Map.Entry topDocsAndTopN = namedTopDocs.get(suggestionName); + if (topDocsAndTopN == null) { + topDocsAndTopN = new AbstractMap.SimpleEntry<>(suggest.topN(suggestionName), new TopDocs[results.size()]); + } + TopDocs[] topDocSet = topDocsAndTopN.getValue(); + topDocSet[result.index] = new TopDocs(scoreDocs.length, scoreDocs, scoreDocs[0].score); + namedTopDocs.put(suggestionName, topDocsAndTopN); + } + } + if (namedTopDocs.isEmpty()) { + return Collections.emptyMap(); + } + final Map namedScoreDocs = new HashMap<>(namedTopDocs.size()); + final int[] optionSizes = new int[results.size()]; + // get global top n docs for every suggestion name + // trim suggestions in query result to reflect the global top n + for (Map.Entry> entry : namedTopDocs.entrySet()) { + String suggestionName = entry.getKey(); + Map.Entry topDocsAndTopN = entry.getValue(); + if (topDocsAndTopN == null) { + continue; + } + TopDocs[] topDocsList = topDocsAndTopN.getValue(); + Integer topN = topDocsAndTopN.getKey(); + if (topDocsList != null) { + for (int i = 0; i < topDocsList.length; i++) { + if (topDocsList[i] == null) { + topDocsList[i] = Lucene.EMPTY_TOP_DOCS; + } + } + ScoreDoc[] scoreDocs = TopDocs.merge(topN, topDocsList).scoreDocs; + namedScoreDocs.put(suggestionName, scoreDocs); + for (ScoreDoc scoreDoc : scoreDocs) { + optionSizes[scoreDoc.shardIndex]++; + } + for (int i = 0; i < optionSizes.length; i++) { + Suggest suggest = results.get(i).value.queryResult().suggest(); + suggest.trim(suggestionName, optionSizes[i]); + optionSizes[i] = 0; + } + } + } + return namedScoreDocs; + } } diff --git a/core/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java b/core/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java index f915686620095..69329a73f4130 100644 --- a/core/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java +++ 
b/core/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java @@ -56,6 +56,7 @@ import org.elasticsearch.search.internal.InternalSearchHits; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.lookup.SourceLookup; +import org.elasticsearch.search.suggest.completion.NamedDocIdsFetchPhase; import java.io.IOException; import java.util.ArrayList; @@ -75,10 +76,12 @@ public class FetchPhase implements SearchPhase { private final FetchSubPhase[] fetchSubPhases; @Inject - public FetchPhase(Set fetchSubPhases, InnerHitsFetchSubPhase innerHitsFetchSubPhase) { + public FetchPhase(Set fetchSubPhases, InnerHitsFetchSubPhase innerHitsFetchSubPhase, NamedDocIdsFetchPhase namedDocIdsFetchPhase) { innerHitsFetchSubPhase.setFetchPhase(this); - this.fetchSubPhases = fetchSubPhases.toArray(new FetchSubPhase[fetchSubPhases.size() + 1]); + namedDocIdsFetchPhase.setFetchPhase(this); + this.fetchSubPhases = fetchSubPhases.toArray(new FetchSubPhase[fetchSubPhases.size() + 2]); this.fetchSubPhases[fetchSubPhases.size()] = innerHitsFetchSubPhase; + this.fetchSubPhases[fetchSubPhases.size() + 1] = namedDocIdsFetchPhase; } @Override diff --git a/core/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java b/core/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java index ed8c0358dbb6e..53d42b7a0d11a 100644 --- a/core/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java +++ b/core/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java @@ -26,6 +26,7 @@ import org.elasticsearch.transport.TransportResponse; import java.io.IOException; +import java.util.*; import static org.elasticsearch.search.internal.InternalSearchHits.StreamContext; @@ -37,6 +38,7 @@ public class FetchSearchResult extends TransportResponse implements FetchSearchR private long id; private SearchShardTarget shardTarget; private InternalSearchHits hits; + private Map namedHits; // client side counter private transient int counter; @@ 
-77,6 +79,17 @@ public InternalSearchHits hits() { return hits; } + public Map namedHits() { + return namedHits; + } + + public void namedHits(String name, InternalSearchHits hits) { + if (namedHits == null) { + namedHits = new HashMap<>(); + } + namedHits.put(name, hits); + } + public FetchSearchResult initCounter() { counter = 0; return this; @@ -97,6 +110,16 @@ public void readFrom(StreamInput in) throws IOException { super.readFrom(in); id = in.readLong(); hits = InternalSearchHits.readSearchHits(in, InternalSearchHits.streamContext().streamShardTarget(StreamContext.ShardTargetType.NO_STREAM)); + if (in.readBoolean()) { + int namedHitsSize = in.readVInt(); + namedHits = new HashMap<>(namedHitsSize); + for (int i = 0; i < namedHitsSize; i++) { + String name = in.readString(); + InternalSearchHits namedHitList = InternalSearchHits.readSearchHits(in, InternalSearchHits.streamContext().streamShardTarget(StreamContext.ShardTargetType.NO_STREAM)); + namedHits.put(name, namedHitList); + } + } + } @Override @@ -104,5 +127,15 @@ public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeLong(id); hits.writeTo(out, InternalSearchHits.streamContext().streamShardTarget(StreamContext.ShardTargetType.NO_STREAM)); + if (namedHits != null) { + out.writeBoolean(true); + out.writeVInt(namedHits.size()); + for (Map.Entry entry : namedHits.entrySet()) { + out.writeString(entry.getKey()); + entry.getValue().writeTo(out, InternalSearchHits.streamContext().streamShardTarget(StreamContext.ShardTargetType.NO_STREAM)); + } + } else { + out.writeBoolean(false); + } } } diff --git a/core/src/main/java/org/elasticsearch/search/fetch/ShardFetchRequest.java b/core/src/main/java/org/elasticsearch/search/fetch/ShardFetchRequest.java index 803ab737cbccb..42d3c310ed9a0 100644 --- a/core/src/main/java/org/elasticsearch/search/fetch/ShardFetchRequest.java +++ b/core/src/main/java/org/elasticsearch/search/fetch/ShardFetchRequest.java @@ -22,15 +22,14 @@ import 
com.carrotsearch.hppc.IntArrayList; import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.ScoreDoc; -import org.elasticsearch.Version; import org.elasticsearch.action.search.SearchScrollRequest; -import org.elasticsearch.action.search.type.ParsedScrollId; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.transport.TransportRequest; import java.io.IOException; +import java.util.*; /** * Shard level fetch base request. Holds all the info needed to execute a fetch. @@ -42,6 +41,8 @@ public class ShardFetchRequest extends TransportRequest { private int[] docIds; + private Map namedDocIds; + private int size; private ScoreDoc lastEmittedDoc; @@ -57,12 +58,19 @@ public ShardFetchRequest(SearchScrollRequest request, long id, IntArrayList list this.lastEmittedDoc = lastEmittedDoc; } - protected ShardFetchRequest(TransportRequest originalRequest, long id, IntArrayList list, ScoreDoc lastEmittedDoc) { + protected ShardFetchRequest(TransportRequest originalRequest, long id, IntArrayList list, Map namedList, ScoreDoc lastEmittedDoc) { super(originalRequest); this.id = id; this.docIds = list.buffer; this.size = list.size(); this.lastEmittedDoc = lastEmittedDoc; + if (namedList != null) { + this.namedDocIds = new HashMap<>(namedList.size()); + for (Map.Entry entry : namedList.entrySet()) { + IntArrayList docIdList = entry.getValue(); + namedDocIds.put(entry.getKey(), Arrays.copyOfRange(docIdList.buffer, 0, docIdList.size())); + } + } } public long id() { @@ -73,6 +81,11 @@ public int[] docIds() { return docIds; } + public Map namedDocIds() { + return namedDocIds; + } + + public int docIdsSize() { return size; } @@ -98,6 +111,19 @@ public void readFrom(StreamInput in) throws IOException { } else if (flag != 0) { throw new IOException("Unknown flag: " + flag); } + if (in.readBoolean()) { + int namedDocIdSize = in.readVInt(); + 
namedDocIds = new HashMap<>(namedDocIdSize); + for (int i = 0; i < namedDocIdSize; i++) { + String name = in.readString(); + int docIdSize = in.readVInt(); + int[] docIds = new int[docIdSize]; + for (int j = 0; j < docIdSize; j++) { + docIds[j] = in.readVInt(); + } + namedDocIds.put(name, docIds); + } + } } @Override @@ -117,5 +143,19 @@ public void writeTo(StreamOutput out) throws IOException { out.writeByte((byte) 2); Lucene.writeScoreDoc(out, lastEmittedDoc); } + if (namedDocIds != null) { + out.writeBoolean(true); + out.writeVInt(namedDocIds.size()); + for (Map.Entry entry : namedDocIds.entrySet()) { + int[] docIds = entry.getValue(); + out.writeString(entry.getKey()); + out.writeVInt(docIds.length); + for (int docId : docIds) { + out.writeVInt(docId); + } + } + } else { + out.writeBoolean(false); + } } } diff --git a/core/src/main/java/org/elasticsearch/search/fetch/ShardFetchSearchRequest.java b/core/src/main/java/org/elasticsearch/search/fetch/ShardFetchSearchRequest.java index cc53b48f1351a..7a939357d9ee9 100644 --- a/core/src/main/java/org/elasticsearch/search/fetch/ShardFetchSearchRequest.java +++ b/core/src/main/java/org/elasticsearch/search/fetch/ShardFetchSearchRequest.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import java.io.IOException; +import java.util.Map; /** * Shard level fetch request used with search. 
Holds indices taken from the original search request @@ -41,12 +42,12 @@ public class ShardFetchSearchRequest extends ShardFetchRequest implements Indice public ShardFetchSearchRequest() { } - public ShardFetchSearchRequest(SearchRequest request, long id, IntArrayList list) { - this(request, id, list, null); + public ShardFetchSearchRequest(SearchRequest request, long id, IntArrayList list, Map namedList) { + this(request, id, list, namedList, null); } - public ShardFetchSearchRequest(SearchRequest request, long id, IntArrayList list, ScoreDoc lastEmittedDoc) { - super(request, id, list, lastEmittedDoc); + public ShardFetchSearchRequest(SearchRequest request, long id, IntArrayList list, Map namedList, ScoreDoc lastEmittedDoc) { + super(request, id, list, namedList, lastEmittedDoc); this.originalIndices = new OriginalIndices(request); } diff --git a/core/src/main/java/org/elasticsearch/search/internal/DefaultSearchContext.java b/core/src/main/java/org/elasticsearch/search/internal/DefaultSearchContext.java index fdaac5e96aa18..a25757e4a52e5 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/DefaultSearchContext.java +++ b/core/src/main/java/org/elasticsearch/search/internal/DefaultSearchContext.java @@ -127,6 +127,7 @@ public class DefaultSearchContext extends SearchContext { private int[] docIdsToLoad; private int docsIdsToLoadFrom; private int docsIdsToLoadSize; + private Map namedDocIds; private SearchContextAggregations aggregations; private SearchContextHighlight highlight; private SuggestionSearchContext suggest; @@ -643,6 +644,17 @@ public SearchContext docIdsToLoad(int[] docIdsToLoad, int docsIdsToLoadFrom, int return this; } + @Override + public Map namedDocIdsToLoad() { + return namedDocIds; + } + + @Override + public SearchContext namedDocIdsToLoad(Map namedDocIds) { + this.namedDocIds = namedDocIds; + return this; + } + @Override public void accessed(long accessTime) { this.lastAccessTime = accessTime; diff --git 
a/core/src/main/java/org/elasticsearch/search/internal/FilteredSearchContext.java b/core/src/main/java/org/elasticsearch/search/internal/FilteredSearchContext.java index 2f79d03234e6a..8fe3d5554b5d9 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/FilteredSearchContext.java +++ b/core/src/main/java/org/elasticsearch/search/internal/FilteredSearchContext.java @@ -461,6 +461,17 @@ public SearchContext docIdsToLoad(int[] docIdsToLoad, int docsIdsToLoadFrom, int return in.docIdsToLoad(docIdsToLoad, docsIdsToLoadFrom, docsIdsToLoadSize); } + @Override + public Map namedDocIdsToLoad() { + return in.namedDocIdsToLoad(); + } + + @Override + public SearchContext namedDocIdsToLoad(Map namedDocIds) { + in.namedDocIdsToLoad(namedDocIds); + return this; + } + @Override public void accessed(long accessTime) { in.accessed(accessTime); diff --git a/core/src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java b/core/src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java index 7a349cfbbb927..cb7b8aaac560c 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java +++ b/core/src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java @@ -431,6 +431,13 @@ public static class Fields { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + innerToXContent(builder, params); + builder.endObject(); + return builder; + } + + public XContentBuilder innerToXContent(XContentBuilder builder, Params params) throws IOException { List metaFields = new ArrayList<>(); List otherFields = new ArrayList<>(); if (fields != null && !fields.isEmpty()) { @@ -446,7 +453,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } } - builder.startObject(); // For inner_hit hits shard is null and that is ok, because the parent search hit has all this information. 
// Even if this was included in the inner_hit hits this would be the same, so better leave it out. if (explanation() != null && shard != null) { @@ -529,7 +535,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } builder.endObject(); } - builder.endObject(); return builder; } diff --git a/core/src/main/java/org/elasticsearch/search/internal/SearchContext.java b/core/src/main/java/org/elasticsearch/search/internal/SearchContext.java index a7e45d18fb67a..df856cb153f99 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/SearchContext.java +++ b/core/src/main/java/org/elasticsearch/search/internal/SearchContext.java @@ -292,6 +292,10 @@ public final boolean nowInMillisUsed() { public abstract SearchContext docIdsToLoad(int[] docIdsToLoad, int docsIdsToLoadFrom, int docsIdsToLoadSize); + public abstract Map namedDocIdsToLoad(); + + public abstract SearchContext namedDocIdsToLoad(Map namedDocIds); + public abstract void accessed(long accessTime); public abstract long lastAccessTime(); diff --git a/core/src/main/java/org/elasticsearch/search/suggest/Suggest.java b/core/src/main/java/org/elasticsearch/search/suggest/Suggest.java index a4920e6826ffc..d261da2115e2a 100644 --- a/core/src/main/java/org/elasticsearch/search/suggest/Suggest.java +++ b/core/src/main/java/org/elasticsearch/search/suggest/Suggest.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.search.suggest; +import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.util.CollectionUtil; import org.elasticsearch.ElasticsearchException; @@ -64,6 +65,8 @@ public int compare(Option first, Option second) { private Map>> suggestMap; + private Map completionSuggestionMap; + public Suggest() { this.name = null; } @@ -92,7 +95,92 @@ public Iterator>> iterator() { public int size() { return suggestions.size(); } - + + /** + * Get the request size of suggestions for name + * TODO: only works for CompletionSuggestion + */ + public int topN(String name) { + 
loadCompletionMap(); + CompletionSuggestion suggestion = completionSuggestionMap.get(name); + if (suggestion != null) { + return suggestion.getSize(); + } + throw new IllegalStateException("[" + name + "] is not found"); + } + + /** + * Trim name suggestions to a size of size + * TODO: only works for CompletionSuggestion + */ + public void trim(String name, int size) { + loadCompletionMap(); + CompletionSuggestion completionSuggestion = completionSuggestionMap.get(name); + if (completionSuggestion != null) { + completionSuggestion.trim(size); + } + } + + public Suggestion> getCompletionSuggestion(String name) { + loadCompletionMap(); + return completionSuggestionMap.get(name); + } + + /** + * Whether any suggestions had query hits + */ + public boolean hasScoreDocs() { + loadCompletionMap(); + for (CompletionSuggestion suggestion : completionSuggestionMap.values()) { + ScoreDoc[] scoreDocs = suggestion.getScoreDocs(); + if (scoreDocs != null) { + return true; + } + } + return false; + } + + /** + * Returns a map of suggestion names and their hits, setting + * shardIndex for every doc + */ + public Map toNamedScoreDocs(int shardIndex) { + loadCompletionMap(); + Map namedScoreDocs = new HashMap<>(completionSuggestionMap.size()); + for (Map.Entry suggestionEntry : completionSuggestionMap.entrySet()) { + ScoreDoc[] scoreDocs = suggestionEntry.getValue().getScoreDocs(); + if (shardIndex >= 0) { + for (ScoreDoc scoreDoc : scoreDocs) { + scoreDoc.shardIndex = shardIndex; + } + } + namedScoreDocs.put(suggestionEntry.getKey(), scoreDocs); + } + return namedScoreDocs; + } + + private void loadCompletionMap() { + if (completionSuggestionMap == null) { + if (suggestions.size() == 0) { + completionSuggestionMap = Collections.emptyMap(); + } else if (suggestions.size() == 1) { + if (suggestions.get(0) instanceof CompletionSuggestion) { + completionSuggestionMap = Collections.singletonMap(suggestions.get(0).getName(), ((CompletionSuggestion) suggestions.get(0))); + } else { + 
completionSuggestionMap = Collections.emptyMap(); + } + } else { + completionSuggestionMap = new HashMap<>(suggestions.size()); + for (Suggestion> suggestion : suggestions) { + if (suggestion instanceof CompletionSuggestion) { + CompletionSuggestion prev = completionSuggestionMap.put(suggestion.getName(), ((CompletionSuggestion) suggestion)); + assert prev == null; + } + } + } + } + } + public >> T getSuggestion(String name) { if (suggestions.isEmpty() || name == null) { return null; @@ -147,18 +235,18 @@ public void writeTo(StreamOutput out) throws IOException { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - if(name == null) { + final boolean useSuggestName = params.paramAsBoolean("use_suggest_namespace", true); + if (useSuggestName && name != null) { + builder.startObject(name); for (Suggestion suggestion : suggestions) { suggestion.toXContent(builder, params); } + builder.endObject(); } else { - builder.startObject(name); for (Suggestion suggestion : suggestions) { suggestion.toXContent(builder, params); } - builder.endObject(); } - return builder; } @@ -227,11 +315,23 @@ public int getType() { return TYPE; } + public int getSize() { + return size; + } + @Override public Iterator iterator() { return entries.iterator(); } + public ScoreDoc[] getScoreDocs() { + return null; + } + + public List options() { + return null; + } + /** * @return The entries for this suggestion. 
*/ @@ -450,7 +550,7 @@ public List getOptions() { return options; } - void trim(int size) { + public void trim(int size) { int optionsToRemove = Math.max(0, options.size() - size); for (int i = 0; i < optionsToRemove; i++) { options.remove(options.size() - 1); diff --git a/core/src/main/java/org/elasticsearch/search/suggest/SuggestBinaryParseElement.java b/core/src/main/java/org/elasticsearch/search/suggest/SuggestBinaryParseElement.java new file mode 100644 index 0000000000000..30961b98a253c --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/suggest/SuggestBinaryParseElement.java @@ -0,0 +1,45 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.search.suggest; + +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.search.internal.SearchContext; + +/** + * + */ +public final class SuggestBinaryParseElement extends SuggestParseElement { + + @Inject + public SuggestBinaryParseElement(Suggesters suggesters) { + super(suggesters); + } + + @Override + public void parse(XContentParser parser, SearchContext context) throws Exception { + byte[] suggestSource = parser.binaryValue(); + assert suggestSource.length > 0; + try (XContentParser suggestSourceParser = XContentFactory.xContent(suggestSource).createParser(suggestSource)) { + suggestSourceParser.nextToken(); // move past the first START_OBJECT + super.parse(suggestSourceParser, context); + } + } +} diff --git a/core/src/main/java/org/elasticsearch/search/suggest/SuggestParseElement.java b/core/src/main/java/org/elasticsearch/search/suggest/SuggestParseElement.java index 74ddf6a049812..3481888894cd6 100644 --- a/core/src/main/java/org/elasticsearch/search/suggest/SuggestParseElement.java +++ b/core/src/main/java/org/elasticsearch/search/suggest/SuggestParseElement.java @@ -21,23 +21,19 @@ import org.apache.lucene.util.BytesRef; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.index.mapper.MapperService; -import org.elasticsearch.index.query.IndexQueryParserService; import org.elasticsearch.search.SearchParseElement; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.suggest.SuggestionSearchContext.SuggestionContext; import org.elasticsearch.search.suggest.completion.CompletionSuggestParser; import org.elasticsearch.search.suggest.completion.old.CompletionSuggester; -import java.io.IOException; +import java.util.HashMap; import java.util.Map; -import static 
com.google.common.collect.Maps.newHashMap; - /** * */ -public final class SuggestParseElement implements SearchParseElement { +public class SuggestParseElement implements SearchParseElement { private Suggesters suggesters; @Inject @@ -47,16 +43,11 @@ public SuggestParseElement(Suggesters suggesters) { @Override public void parse(XContentParser parser, SearchContext context) throws Exception { - SuggestionSearchContext suggestionSearchContext = parseInternal(parser, context.mapperService(), context.queryParserService(), context.shardTarget().index(), context.shardTarget().shardId()); - context.suggest(suggestionSearchContext); - } - - public SuggestionSearchContext parseInternal(XContentParser parser, MapperService mapperService, IndexQueryParserService queryParserService, String index, int shardId) throws IOException { SuggestionSearchContext suggestionSearchContext = new SuggestionSearchContext(); BytesRef globalText = null; String fieldName = null; - Map suggestionContexts = newHashMap(); + Map suggestionContexts = new HashMap<>(); XContentParser.Token token; while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { @@ -99,7 +90,7 @@ public SuggestionSearchContext parseInternal(XContentParser parser, MapperServic if (contextParser instanceof CompletionSuggestParser) { ((CompletionSuggestParser) contextParser).setOldCompletionSuggester(((CompletionSuggester) suggesters.get("completion_old"))); } - suggestionContext = contextParser.parse(parser, mapperService, queryParserService); + suggestionContext = contextParser.parse(parser, context.mapperService(), context.queryParserService()); } } if (suggestionContext != null) { @@ -115,7 +106,6 @@ public SuggestionSearchContext parseInternal(XContentParser parser, MapperServic } suggestionContexts.put(suggestionName, suggestionContext); } - } } @@ -123,12 +113,11 @@ public SuggestionSearchContext parseInternal(XContentParser parser, MapperServic String suggestionName = entry.getKey(); SuggestionContext 
suggestionContext = entry.getValue(); - suggestionContext.setShard(shardId); - suggestionContext.setIndex(index); - SuggestUtils.verifySuggestion(mapperService, globalText, suggestionContext); + suggestionContext.setShard(context.shardTarget().shardId()); + suggestionContext.setIndex(context.shardTarget().getIndex()); + SuggestUtils.verifySuggestion(context.mapperService(), globalText, suggestionContext); suggestionSearchContext.addSuggestion(suggestionName, suggestionContext); } - - return suggestionSearchContext; + context.suggest(suggestionSearchContext); } } diff --git a/core/src/main/java/org/elasticsearch/search/suggest/SuggestPhase.java b/core/src/main/java/org/elasticsearch/search/suggest/SuggestPhase.java index 58a4502abf845..258ef682be182 100644 --- a/core/src/main/java/org/elasticsearch/search/suggest/SuggestPhase.java +++ b/core/src/main/java/org/elasticsearch/search/suggest/SuggestPhase.java @@ -27,6 +27,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.search.SearchParseElement; import org.elasticsearch.search.SearchPhase; +import org.elasticsearch.search.fetch.ShardFetchRequest; +import org.elasticsearch.search.internal.ContextIndexSearcher; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.suggest.Suggest.Suggestion; import org.elasticsearch.search.suggest.Suggest.Suggestion.Entry; @@ -44,52 +46,46 @@ public class SuggestPhase extends AbstractComponent implements SearchPhase { private final SuggestParseElement parseElement; + private final SuggestBinaryParseElement binaryParseElement; + @Inject - public SuggestPhase(Settings settings, SuggestParseElement suggestParseElement) { + public SuggestPhase(Settings settings, SuggestParseElement suggestParseElement, SuggestBinaryParseElement binaryParseElement) { super(settings); this.parseElement = suggestParseElement; + this.binaryParseElement = binaryParseElement; } @Override public Map parseElements() { ImmutableMap.Builder 
parseElements = ImmutableMap.builder(); parseElements.put("suggest", parseElement); + parseElements.put("suggest_binary", binaryParseElement); return parseElements.build(); } - public SuggestParseElement parseElement() { - return parseElement; - } - @Override public void preProcess(SearchContext context) { } @Override public void execute(SearchContext context) { - final SuggestionSearchContext suggest = context.suggest(); - if (suggest == null) { - return; - } - context.queryResult().suggest(execute(suggest, context.searcher())); - } - - public Suggest execute(SuggestionSearchContext suggest, IndexSearcher searcher) { try { - CharsRefBuilder spare = new CharsRefBuilder(); - final List>> suggestions = new ArrayList<>(suggest.suggestions().size()); - - for (Map.Entry entry : suggest.suggestions().entrySet()) { - SuggestionSearchContext.SuggestionContext suggestion = entry.getValue(); - Suggester suggester = suggestion.getSuggester(); - Suggestion> result = suggester.execute(entry.getKey(), suggestion, searcher, spare); - if (result != null) { - assert entry.getKey().equals(result.name); - suggestions.add(result); + final ContextIndexSearcher searcher = context.searcher(); + final SuggestionSearchContext suggest = context.suggest(); + if (suggest != null) { + CharsRefBuilder spare = new CharsRefBuilder(); + final List>> suggestions = new ArrayList<>(suggest.suggestions().size()); + for (Map.Entry entry : suggest.suggestions().entrySet()) { + SuggestionSearchContext.SuggestionContext suggestion = entry.getValue(); + Suggester suggester = suggestion.getSuggester(); + Suggestion> result = suggester.execute(entry.getKey(), suggestion, searcher, spare); + if (result != null) { + assert entry.getKey().equals(result.name); + suggestions.add(result); + } } + context.queryResult().suggest(new Suggest(Suggest.Fields.SUGGEST, suggestions)); } - - return new Suggest(Suggest.Fields.SUGGEST, suggestions); } catch (IOException e) { throw new ElasticsearchException("I/O exception 
during suggest phase", e); } diff --git a/core/src/main/java/org/elasticsearch/search/suggest/Suggesters.java b/core/src/main/java/org/elasticsearch/search/suggest/Suggesters.java index c9ff10c8d385c..110795f115c9f 100644 --- a/core/src/main/java/org/elasticsearch/search/suggest/Suggesters.java +++ b/core/src/main/java/org/elasticsearch/search/suggest/Suggesters.java @@ -38,7 +38,7 @@ public Suggesters() { } public Suggesters(Map suggesters) { - super("suggester", Suggester.class, new HashSet<>(Arrays.asList("phrase", "term", "completion", "completion_old")), Suggesters.class, SuggestParseElement.class, SuggestPhase.class); + super("suggester", Suggester.class, new HashSet<>(Arrays.asList("phrase", "term", "completion", "completion_old")), Suggesters.class, SuggestParseElement.class, SuggestBinaryParseElement.class, SuggestPhase.class); this.parsers = Collections.unmodifiableMap(suggesters); } diff --git a/core/src/main/java/org/elasticsearch/search/suggest/completion/CompletionSuggester.java b/core/src/main/java/org/elasticsearch/search/suggest/completion/CompletionSuggester.java index 3cbfcee6b3aae..e283640e641ab 100644 --- a/core/src/main/java/org/elasticsearch/search/suggest/completion/CompletionSuggester.java +++ b/core/src/main/java/org/elasticsearch/search/suggest/completion/CompletionSuggester.java @@ -24,21 +24,17 @@ import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Weight; import org.apache.lucene.search.suggest.xdocument.CompletionQuery; -import org.apache.lucene.search.suggest.xdocument.TopSuggestDocs; import org.apache.lucene.search.suggest.xdocument.TopSuggestDocsCollector; import org.apache.lucene.util.CharsRefBuilder; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.common.text.StringText; import org.elasticsearch.common.unit.Fuzziness; import org.elasticsearch.index.mapper.core.CompletionFieldMapper; import org.elasticsearch.search.suggest.Suggest; import 
org.elasticsearch.search.suggest.SuggestContextParser; import org.elasticsearch.search.suggest.Suggester; -import org.elasticsearch.search.suggest.completion.CompletionSuggestion.Entry.Option; import org.elasticsearch.search.suggest.completion.context.ContextMappings; import java.io.IOException; -import java.util.*; public class CompletionSuggester extends Suggester { @@ -55,43 +51,9 @@ protected Suggest.Suggestion results = new LinkedHashMap<>(suggestionContext.getSize()); TopSuggestDocsCollector collector = new TopSuggestDocsCollector(suggestionContext.getSize()); suggest(searcher, toQuery(suggestionContext), collector); - for (TopSuggestDocs.SuggestScoreDoc suggestDoc : collector.get().scoreLookupDocs()) { - // TODO: currently we can get multiple entries with the same docID - // this has to be fixed at the lucene level - // This has other implications: - // if we index a suggestion with n contexts, the suggestion and all its contexts - // would count as n hits rather than 1, so we have to multiply the desired size - // with n to get a suggestion with all n contexts - final String key = suggestDoc.key.toString(); - final float score = suggestDoc.score; - final Map.Entry contextEntry; - if (suggestionContext.fieldType().hasContextMappings() && suggestDoc.context != null) { - contextEntry = suggestionContext.fieldType().getContextMappings().getNamedContext(suggestDoc.context); - } else { - assert suggestDoc.context == null; - contextEntry = null; - } - final Option value = results.get(suggestDoc.doc); - if (value == null) { - final Option option = new Option(suggestDoc.doc, new StringText(key), score, contextEntry); - results.put(suggestDoc.doc, option); - } else { - value.addContextEntry(contextEntry); - if (value.getScore() < score) { - value.setScore(score); - } - } - } - final List