Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.opensearch.search.deciders.ConcurrentSearchRequestDecider;
import org.opensearch.search.fetch.FetchSubPhase;
import org.opensearch.search.fetch.subphase.highlight.Highlighter;
import org.opensearch.search.query.QueryCollectorContextSpecFactory;
import org.opensearch.search.query.QueryPhaseSearcher;
import org.opensearch.search.rescore.Rescorer;
import org.opensearch.search.rescore.RescorerBuilder;
Expand Down Expand Up @@ -227,6 +228,10 @@ default Optional<ExecutorServiceProvider> getIndexSearcherExecutorProvider() {
return Optional.empty();
}

default List<QueryCollectorContextSpecFactory> getCollectorContextSpecFactories() {
return emptyList();
}

/**
* Executor service provider
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@
import org.opensearch.search.fetch.subphase.highlight.Highlighter;
import org.opensearch.search.fetch.subphase.highlight.PlainHighlighter;
import org.opensearch.search.fetch.subphase.highlight.UnifiedHighlighter;
import org.opensearch.search.query.QueryCollectorContextSpecRegistry;
import org.opensearch.search.query.QueryPhase;
import org.opensearch.search.query.QueryPhaseSearcher;
import org.opensearch.search.query.QueryPhaseSearcherWrapper;
Expand Down Expand Up @@ -350,6 +351,7 @@ public SearchModule(Settings settings, List<SearchPlugin> plugins) {
indexSearcherExecutorProvider = registerIndexSearcherExecutorProvider(plugins);
namedWriteables.addAll(SortValue.namedWriteables());
concurrentSearchDeciderFactories = registerConcurrentSearchDeciderFactories(plugins);
registerQueryCollectorContextSpec(plugins);
}

private Collection<ConcurrentSearchRequestDecider.Factory> registerConcurrentSearchDeciderFactories(List<SearchPlugin> plugins) {
Expand Down Expand Up @@ -1297,6 +1299,10 @@ private SearchPlugin.ExecutorServiceProvider registerIndexSearcherExecutorProvid
return provider;
}

private void registerQueryCollectorContextSpec(List<SearchPlugin> plugins) {
registerFromPlugin(plugins, SearchPlugin::getCollectorContextSpecFactories, QueryCollectorContextSpecRegistry::registerFactory);
}

public FetchPhase getFetchPhase() {
return new FetchPhase(fetchSubPhases);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.query;

import org.apache.lucene.search.Collector;
import org.apache.lucene.search.CollectorManager;

import java.io.IOException;

/**
* interface of QueryCollectorContextSpec
*/
public interface QueryCollectorContextSpec {
String getProfileName();

Collector create(Collector in) throws IOException;

CollectorManager<?, ReduceableSearchResult> createManager(CollectorManager<?, ReduceableSearchResult> in) throws IOException;

void postProcess(QuerySearchResult result) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.query;

import org.apache.lucene.search.Query;
import org.opensearch.search.internal.SearchContext;

import java.io.IOException;

/**
* interface of QueryCollectorContext spec factory
*/
public interface QueryCollectorContextSpecFactory {
/**
* check if query is supported to initialize the factory
* @param query sent in the search request
* @return true if query satisfies the factory initialization criteria
*/
boolean supports(Query query);

/**
*
* @param searchContext context needed to create collector context spec
* @param hasFilterCollector flag true if filter collector there
* @return QueryCollectorContextSpec
* @throws IOException
*/
QueryCollectorContextSpec createQueryCollectorContextSpec(SearchContext searchContext, boolean hasFilterCollector) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.query;

import org.apache.lucene.search.Query;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/**
* Registry class to load all collector context spec factories during cluster bootstrapping
*/
public class QueryCollectorContextSpecRegistry {

private static final List<QueryCollectorContextSpecFactory> registry = new CopyOnWriteArrayList<>();

static QueryCollectorContextSpecFactory getFactory(Query query) {

return registry.stream().filter(entry -> entry.supports(query)).findFirst().orElse(null);
}

/**
* Register factory
* @param factory collector context spec factory defined in plugin
*/
public static void registerFactory(QueryCollectorContextSpecFactory factory) {
registry.add(factory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -446,8 +446,35 @@ protected boolean searchWithCollector(
boolean hasTimeout
) throws IOException {
// create the top docs collector last when the other collectors are known
final TopDocsCollectorContext topDocsFactory = createTopDocsCollectorContext(searchContext, hasFilterCollector);
return searchWithCollector(searchContext, searcher, query, collectors, topDocsFactory, hasFilterCollector, hasTimeout);
QueryCollectorContext queryCollectorContext;
QueryCollectorContextSpecFactory queryCollectorContextSpecFactory = QueryCollectorContextSpecRegistry.getFactory(query);
if (queryCollectorContextSpecFactory == null) {
queryCollectorContext = createTopDocsCollectorContext(searchContext, hasFilterCollector);
} else {
QueryCollectorContextSpec queryCollectorContextSpec = queryCollectorContextSpecFactory.createQueryCollectorContextSpec(
searchContext,
hasFilterCollector
);
queryCollectorContext = new QueryCollectorContext(queryCollectorContextSpec.getProfileName()) {
@Override
Collector create(Collector in) throws IOException {
return queryCollectorContextSpec.create(in);
}

@Override
CollectorManager<?, ReduceableSearchResult> createManager(CollectorManager<?, ReduceableSearchResult> in)
throws IOException {
return queryCollectorContextSpec.createManager(in);
}

@Override
void postProcess(QuerySearchResult result) throws IOException {
queryCollectorContextSpec.postProcess(result);
}
};
}

return searchWithCollector(searchContext, searcher, query, collectors, queryCollectorContext, hasFilterCollector, hasTimeout);
}

protected boolean searchWithCollector(
Expand Down
Loading