diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 09c17d4c36029..c6c7e204ed3f4 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -778,6 +778,8 @@ public void apply(Settings value, Settings current, Settings previous) { SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING, // deprecated SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING, SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE, + SearchService.CLUSTER_CONCURRENT_INTRA_SEGMENT_SEARCH_MODE, + SearchService.CONCURRENT_INTRA_SEGMENT_SEARCH_PARTITION_SIZE_SETTING, RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING, RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING, diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index ba442aaddae82..82ed7712b5b01 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -250,6 +250,8 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING, // deprecated IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_MODE, IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_MAX_SLICE_COUNT, + IndexSettings.INDEX_CONCURRENT_INTRA_SEGMENT_SEARCH_MODE, + IndexSettings.INDEX_CONCURRENT_INTRA_SEGMENT_PARTITION_SIZE, IndexSettings.ALLOW_DERIVED_FIELDS, // Settings for star tree index diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 1e94cad4e01d9..b821f7cd53511 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -75,6 +75,8 @@ import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_NESTED_DOCS_LIMIT_SETTING; import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_NESTED_FIELDS_LIMIT_SETTING; import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING; +import static org.opensearch.search.SearchService.CONCURRENT_INTRA_SEGMENT_DEFAULT_PARTITION_SIZE_VALUE; +import static org.opensearch.search.SearchService.CONCURRENT_INTRA_SEGMENT_MINIMUM_PARTITION_SIZE_VALUE; import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_DEFAULT_SLICE_COUNT_VALUE; import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_MIN_SLICE_COUNT_VALUE; import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_MODE_ALL; @@ -713,6 +715,7 @@ public static IndexMergePolicy fromString(String text) { Property.Deprecated ); + // TODO : Should use enum public static final Setting INDEX_CONCURRENT_SEGMENT_SEARCH_MODE = Setting.simpleString( "index.search.concurrent_segment_search.mode", CONCURRENT_SEGMENT_SEARCH_MODE_NONE, @@ -739,6 +742,32 @@ public static IndexMergePolicy fromString(String text) { Property.IndexScope ); + public static final Setting INDEX_CONCURRENT_INTRA_SEGMENT_SEARCH_MODE = Setting.simpleString( + "index.search.concurrent_intra_segment_search.mode", + CONCURRENT_SEGMENT_SEARCH_MODE_NONE, + value -> { + // TODO : Add support for Auto mode 
with Intra Segment Search + switch (value) { + case CONCURRENT_SEGMENT_SEARCH_MODE_ALL: + case CONCURRENT_SEGMENT_SEARCH_MODE_NONE: + // valid setting + break; + default: + throw new IllegalArgumentException("Setting value must be one of [all, none]"); + } + }, + Property.Dynamic, + Property.IndexScope + ); + + public static final Setting INDEX_CONCURRENT_INTRA_SEGMENT_PARTITION_SIZE = Setting.intSetting( + "index.search.concurrent_intra_segment_search.partition_size", + CONCURRENT_INTRA_SEGMENT_DEFAULT_PARTITION_SIZE_VALUE, + CONCURRENT_INTRA_SEGMENT_MINIMUM_PARTITION_SIZE_VALUE, + Property.Dynamic, + Property.IndexScope + ); + public static final Setting INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING = Setting.boolSetting( "index.optimize_doc_id_lookup.fuzzy_set.enabled", false, diff --git a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java index 0dd4c3344af1e..3af8ee1b2c893 100644 --- a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java +++ b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java @@ -121,6 +121,7 @@ import static org.opensearch.search.SearchService.AGGREGATION_REWRITE_FILTER_SEGMENT_THRESHOLD; import static org.opensearch.search.SearchService.CARDINALITY_AGGREGATION_PRUNING_THRESHOLD; +import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_INTRA_SEGMENT_SEARCH_MODE; import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE; import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING; import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_MODE_ALL; @@ -210,7 +211,9 @@ final class DefaultSearchContext extends SearchContext { private final FetchPhase fetchPhase; private final Function requestToAggReduceContextBuilder; private final String concurrentSearchMode; + private final String concurrentIntraSegmentSearchMode; private final SetOnce requestShouldUseConcurrentSearch = new SetOnce<>(); + private final SetOnce requestShouldUseConcurrentIntraSegmentSearch = new SetOnce<>(); private final int maxAggRewriteFilters; private final int filterRewriteSegmentThreshold; private final int cardinalityAggregationPruningThreshold; @@ -247,6 +250,7 @@ final class DefaultSearchContext extends SearchContext { this.clusterService = clusterService; this.engineSearcher = readerContext.acquireSearcher("search"); this.concurrentSearchMode = evaluateConcurrentSearchMode(executor); + this.concurrentIntraSegmentSearchMode = evaluateConcurrentIntraSegmentSearchMode(executor); this.searcher = new ContextIndexSearcher( engineSearcher.getIndexReader(), engineSearcher.getSimilarity(), @@ -1018,6 +1022,31 @@ && aggregations().factories() != null } } + @Override + public boolean shouldUseIntraSegmentConcurrentSearch() { + assert requestShouldUseConcurrentIntraSegmentSearch.get() != null : "requestShouldUseConcurrentIntraSegmentSearch must be set"; + assert concurrentIntraSegmentSearchMode != null : "concurrentIntraSegmentSearchMode must be set"; + // TODO : Handle auto mode here + return (concurrentSearchMode.equals(CONCURRENT_SEGMENT_SEARCH_MODE_ALL)) + && Boolean.TRUE.equals(requestShouldUseConcurrentIntraSegmentSearch.get()); + } + + public void evaluateRequestShouldUseIntraSegmentConcurrentSearch() { + if (sort != null && sort.isSortOnTimeSeriesField()) { + requestShouldUseConcurrentIntraSegmentSearch.set(false); + } else if (aggregations() != null) { + 
requestShouldUseConcurrentIntraSegmentSearch.set(false); + } else if (terminateAfter != DEFAULT_TERMINATE_AFTER) { + requestShouldUseConcurrentIntraSegmentSearch.set(false); + } else if (trackTotalHitsUpTo != TRACK_TOTAL_HITS_DISABLED) { // TODO : Need to handle TotalHitCountCollectorManager + requestShouldUseConcurrentIntraSegmentSearch.set(false); + } else if (concurrentIntraSegmentSearchMode.equals(CONCURRENT_SEGMENT_SEARCH_MODE_ALL)) { // TODO : Handle auto mode here + requestShouldUseConcurrentIntraSegmentSearch.set(true); + } else { + requestShouldUseConcurrentIntraSegmentSearch.set(false); + } + } + public void setProfilers(Profilers profilers) { this.profilers = profilers; } @@ -1115,6 +1144,23 @@ private String evaluateConcurrentSearchMode(Executor concurrentSearchExecutor) { ); } + private String evaluateConcurrentIntraSegmentSearchMode(Executor concurrentSearchExecutor) { + // Skip concurrent search for system indices, throttled requests, or if dependencies are missing + if (indexShard.isSystem() + || indexShard.indexSettings().isSearchThrottled() + || clusterService == null + || concurrentSearchExecutor == null) { + return CONCURRENT_SEGMENT_SEARCH_MODE_NONE; + } + + Settings indexSettings = indexService.getIndexSettings().getSettings(); + ClusterSettings clusterSettings = clusterService.getClusterSettings(); + return indexSettings.get( + IndexSettings.INDEX_CONCURRENT_INTRA_SEGMENT_SEARCH_MODE.getKey(), + clusterSettings.get(CLUSTER_CONCURRENT_INTRA_SEGMENT_SEARCH_MODE) + ); + } + /** * Returns the target maximum slice count to use for concurrent segment search. * @@ -1142,6 +1188,16 @@ public int getTargetMaxSliceCount() { } + @Override + public int getSegmentPartitionSize() { + return indexService.getIndexSettings() + .getSettings() + .getAsInt( + IndexSettings.INDEX_CONCURRENT_INTRA_SEGMENT_PARTITION_SIZE.getKey(), + clusterService.getClusterSettings().get(SearchService.CONCURRENT_INTRA_SEGMENT_SEARCH_PARTITION_SIZE_SETTING) + ); + } + @Override public boolean shouldUseTimeSeriesDescSortOptimization() { return indexShard.isTimeSeriesDescSortOptimizationEnabled() diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index 6baf3b514198e..31764eb6d132e 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -273,6 +273,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv Property.Deprecated ); + // TODO : Put these three into an enum // Allow concurrent segment search for all requests public static final String CONCURRENT_SEGMENT_SEARCH_MODE_ALL = "all"; @@ -285,6 +286,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv public static final Setting CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE = Setting.simpleString( "search.concurrent_segment_search.mode", CONCURRENT_SEGMENT_SEARCH_MODE_AUTO, + // TODO : This should go inside the enum value -> { switch (value) { case CONCURRENT_SEGMENT_SEARCH_MODE_ALL: @@ -314,6 +316,38 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv Property.Dynamic, Property.NodeScope ); + + public static final String CONCURRENT_INTRA_SEGMENT_PARTITION_SIZE_KEY = "search.concurrent_intra_segment_search.partition_size"; + public static final int CONCURRENT_INTRA_SEGMENT_DEFAULT_PARTITION_SIZE_VALUE = 10_000; + public static final int CONCURRENT_INTRA_SEGMENT_MINIMUM_PARTITION_SIZE_VALUE = 2; + 
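+ // Cluster-wide default for intra-segment concurrency; the per-index setting registered in IndexSettings takes precedence when it is set.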
+ public static final Setting CLUSTER_CONCURRENT_INTRA_SEGMENT_SEARCH_MODE = Setting.simpleString( + "search.concurrent_intra_segment_search.mode", + CONCURRENT_SEGMENT_SEARCH_MODE_NONE, + // TODO : This should go inside the enum + value -> { + switch (value) { + // TODO : Handle auto mode. + case CONCURRENT_SEGMENT_SEARCH_MODE_ALL: + case CONCURRENT_SEGMENT_SEARCH_MODE_NONE: + // valid setting + break; + default: + throw new IllegalArgumentException("Setting value must be one of [all, none]"); + } + }, + Property.Dynamic, + Property.NodeScope + ); + + public static final Setting CONCURRENT_INTRA_SEGMENT_SEARCH_PARTITION_SIZE_SETTING = Setting.intSetting( + CONCURRENT_INTRA_SEGMENT_PARTITION_SIZE_KEY, + CONCURRENT_INTRA_SEGMENT_DEFAULT_PARTITION_SIZE_VALUE, + CONCURRENT_INTRA_SEGMENT_MINIMUM_PARTITION_SIZE_VALUE, + Property.Dynamic, + Property.NodeScope + ); + // value 0 means rewrite filters optimization in aggregations will be disabled @ExperimentalApi public static final Setting MAX_AGGREGATION_REWRITE_FILTERS = Setting.intSetting( @@ -1383,6 +1417,7 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc // nothing to parse... if (source == null) { context.evaluateRequestShouldUseConcurrentSearch(); + context.evaluateRequestShouldUseIntraSegmentConcurrentSearch(); return; } @@ -1563,6 +1598,7 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc context.collapse(collapseContext); } context.evaluateRequestShouldUseConcurrentSearch(); + context.evaluateRequestShouldUseIntraSegmentConcurrentSearch(); if (source.profile()) { final Function>> pluginProfileMetricsSupplier = (query) -> pluginProfilers.stream() .flatMap(p -> p.getQueryProfileMetrics(context, query).stream()) diff --git a/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java b/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java index 6cb018320e4f0..27f2d67214de4 100644 --- a/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java +++ b/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java @@ -537,7 +537,7 @@ public CollectionStatistics collectionStatistics(String field) throws IOExceptio */ @Override protected LeafSlice[] slices(List leaves) { - return slicesInternal(leaves, searchContext.getTargetMaxSliceCount()); + return slicesInternal(leaves, new MaxTargetSliceSupplier.SliceInputConfig(searchContext)); } public DirectoryReader getDirectoryReader() { @@ -607,17 +607,53 @@ private boolean canMatchSearchAfter(LeafReaderContext ctx) throws IOException { } // package-private for testing - LeafSlice[] slicesInternal(List leaves, int targetMaxSlice) { + LeafSlice[] slicesInternal(List leaves, MaxTargetSliceSupplier.SliceInputConfig sliceInputConfig) { LeafSlice[] leafSlices; - if (targetMaxSlice == 0) { + if (sliceInputConfig.targetMaxSliceCount == 0) { // use the default lucene slice calculation leafSlices = super.slices(leaves); logger.debug("Slice count using lucene default [{}]", leafSlices.length); } else { // use the custom slice calculation based on targetMaxSlice - leafSlices = MaxTargetSliceSupplier.getSlices(leaves, targetMaxSlice); + leafSlices = MaxTargetSliceSupplier.getSlices(leaves, sliceInputConfig); logger.debug("Slice count using max target slice supplier [{}]", leafSlices.length); } + // FIXME: Remove before merging + printDistributionLogs(leaves, leafSlices); return leafSlices; } + + // FIXME: Remove before merging + private static void 
printDistributionLogs(List leaves, LeafSlice[] leafSlices) { + StringBuilder res = new StringBuilder(); + long total = 0; + for (LeafReaderContext leaf : leaves) { + res.append(" Leaf ["); + res.append(leaf.ord); + res.append(", "); + res.append(leaf.reader().maxDoc()); + res.append(']'); + total += leaf.reader().maxDoc(); + } + res.append(" Total Docs = ").append(total).append(" "); + logger.info("Input leaves {}", res.toString()); + res.setLength(0); + for (LeafSlice slice : leafSlices) { + res.append(" LeafSlice[ "); + res.append(" numParts = "); + res.append(slice.partitions.length); + res.append(" "); + total = 0; + for (LeafReaderContextPartition partition : slice.partitions) { + res.append("Part [ docs = "); + res.append(partition.maxDocId - partition.minDocId); + total += partition.maxDocId - partition.minDocId; + res.append("]"); + } + res.append(", Total Docs = ").append(total).append(" "); + res.append(" ]"); + } + logger.info("Output leaf slices {}", res.toString()); + } + } diff --git a/server/src/main/java/org/opensearch/search/internal/FilteredSearchContext.java b/server/src/main/java/org/opensearch/search/internal/FilteredSearchContext.java index 1660e5ac8343f..601da50f6f21f 100644 --- a/server/src/main/java/org/opensearch/search/internal/FilteredSearchContext.java +++ b/server/src/main/java/org/opensearch/search/internal/FilteredSearchContext.java @@ -578,6 +578,11 @@ public int getTargetMaxSliceCount() { return in.getTargetMaxSliceCount(); } + @Override + public int getSegmentPartitionSize() { + return in.getSegmentPartitionSize(); + } + @Override public boolean shouldUseTimeSeriesDescSortOptimization() { return in.shouldUseTimeSeriesDescSortOptimization(); diff --git a/server/src/main/java/org/opensearch/search/internal/MaxTargetSliceSupplier.java b/server/src/main/java/org/opensearch/search/internal/MaxTargetSliceSupplier.java index 51ebdc68ba099..6e9bea9e7620f 100644 --- a/server/src/main/java/org/opensearch/search/internal/MaxTargetSliceSupplier.java +++ b/server/src/main/java/org/opensearch/search/internal/MaxTargetSliceSupplier.java @@ -9,12 +9,15 @@ package org.opensearch.search.internal; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.IndexSearcher; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.PriorityQueue; /** @@ -27,52 +30,191 @@ */ final class MaxTargetSliceSupplier { - static IndexSearcher.LeafSlice[] getSlices(List leaves, int targetMaxSlice) { + static IndexSearcher.LeafSlice[] getSlices(List leaves, SliceInputConfig sliceInputConfig) { + + int targetMaxSlice = sliceInputConfig.targetMaxSliceCount; + if (targetMaxSlice <= 0) { throw new IllegalArgumentException("MaxTargetSliceSupplier called with unexpected slice count of " + targetMaxSlice); } - // slice count should not exceed the segment count - int targetSliceCount = Math.min(targetMaxSlice, leaves.size()); + boolean isIntraSegmentEnabled = sliceInputConfig.intraSegmentEnabled; + int segmentSizeToSplit = sliceInputConfig.segmentSizeToSplit; // Smallest partition of a segment + int minSegmentSizeToSplit = segmentSizeToSplit * 2; // At least 2 partitions would make sense + + List partitions = new ArrayList<>(leaves.size()); - // Make a copy so we can sort: - List sortedLeaves = new ArrayList<>(leaves); + Map leafToLastUnassignedDocId = new HashMap<>(leaves.size()); + + for (LeafReaderContext 
leafReaderContext : leaves) { + // Split a segment if it meets the configured size. + if (isIntraSegmentEnabled == true && leafReaderContext.reader().maxDoc() >= minSegmentSizeToSplit) { + partitions.addAll(partitionSegment(leafReaderContext, segmentSizeToSplit, targetMaxSlice)); + } else { + partitions.add(createAppropriatePartition(leafReaderContext, 0, DocIdSetIterator.NO_MORE_DOCS)); + } + leafToLastUnassignedDocId.put(leafReaderContext.ord, 0); + } - // Sort by maxDoc, descending: - sortedLeaves.sort(Collections.reverseOrder(Comparator.comparingInt(l -> l.reader().maxDoc()))); + // slice count should not exceed the partition count + int targetSliceCount = Math.min(targetMaxSlice, partitions.size()); - final List> groupedLeaves = new ArrayList<>(targetSliceCount); - for (int i = 0; i < targetSliceCount; ++i) { - groupedLeaves.add(new ArrayList<>()); + if (targetSliceCount == 0) { + return new IndexSearcher.LeafSlice[0]; } - PriorityQueue groupQueue = new PriorityQueue<>(); + // Sort all the partitions based on their doc counts in descending order. + partitions.sort(Collections.reverseOrder(Comparator.comparingInt(MaxTargetSliceSupplier::getPartitionDocCount))); + + PriorityQueue queue = new PriorityQueue<>(targetSliceCount); for (int i = 0; i < targetSliceCount; i++) { - groupQueue.offer(new Group(i)); + queue.add(new LeafSliceBuilder()); + } + + for (IndexSearcher.LeafReaderContextPartition partition : partitions) { + LeafSliceBuilder leafSliceBuilder = queue.poll(); + leafSliceBuilder.addLeafPartition(partition); + queue.offer(leafSliceBuilder); } - Group minGroup; - for (int i = 0; i < sortedLeaves.size(); ++i) { - minGroup = groupQueue.poll(); - groupedLeaves.get(minGroup.index).add(IndexSearcher.LeafReaderContextPartition.createForEntireSegment(sortedLeaves.get(i))); - minGroup.sum += sortedLeaves.get(i).reader().maxDoc(); - groupQueue.offer(minGroup); + + // Perform de-duplication + IndexSearcher.LeafSlice[] leafSlices = new IndexSearcher.LeafSlice[targetSliceCount]; + int index = 0; + + for (LeafSliceBuilder leafSliceBuilder : queue) { + leafSlices[index++] = leafSliceBuilder.build(leafToLastUnassignedDocId); } - return groupedLeaves.stream().map(IndexSearcher.LeafSlice::new).toArray(IndexSearcher.LeafSlice[]::new); + return leafSlices; } - static class Group implements Comparable { - final int index; - int sum; + static class SliceInputConfig { + final int targetMaxSliceCount; + final boolean intraSegmentEnabled; + final int segmentSizeToSplit; + + SliceInputConfig(SearchContext searchContext) { + targetMaxSliceCount = searchContext.getTargetMaxSliceCount(); + intraSegmentEnabled = searchContext.shouldUseIntraSegmentConcurrentSearch(); + segmentSizeToSplit = searchContext.getSegmentPartitionSize(); + } + + SliceInputConfig(int targetMaxSliceCount, boolean intraSegmentEnabled, int segmentSizeToSplit) { + this.targetMaxSliceCount = targetMaxSliceCount; + this.intraSegmentEnabled = intraSegmentEnabled; + this.segmentSizeToSplit = segmentSizeToSplit; + } + + } + + private static class LeafSliceBuilder implements Comparable { + + private int totalSize = 0; + private final Map segmentOrdToMergedPartition = new HashMap<>(); + + void addLeafPartition(IndexSearcher.LeafReaderContextPartition leafReaderContextPartition) { + IndexSearcher.LeafReaderContextPartition effectivePartition = leafReaderContextPartition; + int effectivePartitionDocCount = getPartitionDocCount(effectivePartition); + // Merging 2 LeafReaderContextPartition that fall within same slice. 
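+ // Only the running doc count is combined at this point; the concrete [from, to) doc id range for each merged partition is assigned later in build().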
+ // IndexSearcher in Lucene will throw an exception if not merged as it doesn't help parallelism. + if (segmentOrdToMergedPartition.containsKey(leafReaderContextPartition.ctx.ord)) { + IndexSearcher.LeafReaderContextPartition storedPartition = segmentOrdToMergedPartition.get( + leafReaderContextPartition.ctx.ord + ); + effectivePartitionDocCount += getPartitionDocCount(storedPartition); + effectivePartition = createAppropriatePartition(leafReaderContextPartition.ctx, 0, effectivePartitionDocCount); + } + segmentOrdToMergedPartition.put(effectivePartition.ctx.ord, effectivePartition); + totalSize += effectivePartitionDocCount; + } - public Group(int index) { - this.index = index; - this.sum = 0; + /** + * Called when all the leaf partitions are added. + * @param leafToLastUnassignedDocId : Map used to track and generate the real From and To docIds for each segment as + * @return : Leaf slice containing all partitions with + */ + IndexSearcher.LeafSlice build(Map leafToLastUnassignedDocId) { + List partitions = new ArrayList<>(segmentOrdToMergedPartition.size()); + for (IndexSearcher.LeafReaderContextPartition leafReaderContextPartition : segmentOrdToMergedPartition.values()) { + int fromDocId = leafToLastUnassignedDocId.get(leafReaderContextPartition.ctx.ord); + int toDocId = fromDocId + getPartitionDocCount(leafReaderContextPartition); + partitions.add(createAppropriatePartition(leafReaderContextPartition.ctx, fromDocId, toDocId)); + leafToLastUnassignedDocId.put(leafReaderContextPartition.ctx.ord, toDocId); + } + return new IndexSearcher.LeafSlice(partitions); } @Override - public int compareTo(Group other) { - return Integer.compare(this.sum, other.sum); + public int compareTo(LeafSliceBuilder o) { + return Integer.compare(totalSize, o.totalSize); } } + + /** + * Consider a segment with 31_000 documents and user has configured 10_000 ( denoted by partitonSize parameter ) + * as minimum size for the partition of a segment. We first determine number of partitions, 31_000 / 10_000 = 3.
+ * Then we compute the remainingDocs = 31_000 % 10_000 = 1000 that still need to be distributed.
+ * They are spread evenly across the partitions: 10_000 + ( 1000 / 3 ) = 10_333 docs per partition.
+ * The leftover ( 1000 % 3 = 1 ) gives exactly one partition a single extra doc, so the net result is:
+ * [ 31_000 ] = [ 10_334. 10_333, 10_333 ] + */ + private static List partitionSegment( + LeafReaderContext leaf, + int partitionSize, + int targetMaxSlice + ) { + + int segmentMaxDoc = leaf.reader().maxDoc(); + int numPartitions = segmentMaxDoc / partitionSize; + + // Max number of splits/partitions for a segment should not exceed the available slices. + if (numPartitions > targetMaxSlice) { + numPartitions = targetMaxSlice; + partitionSize = segmentMaxDoc / numPartitions; + } + + int remainingDocs = segmentMaxDoc % partitionSize; + int minPartitionSize = partitionSize + (remainingDocs / numPartitions); + int partitionsWithOneExtraDoc = remainingDocs % numPartitions; + + List partitions = new ArrayList<>(numPartitions); + + int currentStartDocId = 0, currentEndDocId; + + for (int i = 0; i < numPartitions; ++i) { + currentEndDocId = currentStartDocId + minPartitionSize; + currentEndDocId += (i < partitionsWithOneExtraDoc) ? 1 : 0; + partitions.add(createAppropriatePartition(leaf, currentStartDocId, currentEndDocId)); + currentStartDocId = currentEndDocId; + } + + return partitions; + } + + static int getPartitionDocCount(IndexSearcher.LeafReaderContextPartition partition) { + int effectiveMaxDocId = partition.maxDocId; + if (partition.maxDocId == DocIdSetIterator.NO_MORE_DOCS) { + effectiveMaxDocId = partition.ctx.reader().maxDoc(); + } + return effectiveMaxDocId - partition.minDocId; + } + + /** + * We want to ensure that all optimisations in Lucene relying on endDocId being NO_MORE_DOCS keeps working. + * We don't want a segment to be created as LeafReaderContextPartition.createFromAndTo(leafReaderContext, 0, leafReaderContext.reader().maxDoc()) + */ + private static IndexSearcher.LeafReaderContextPartition createAppropriatePartition( + LeafReaderContext leafReaderContext, + int startDocId, + int endDocId + ) { + endDocId = (endDocId == DocIdSetIterator.NO_MORE_DOCS) ? 
leafReaderContext.reader().maxDoc() : endDocId; + int docCount = endDocId - startDocId; + if (docCount == leafReaderContext.reader().maxDoc()) { + return IndexSearcher.LeafReaderContextPartition.createForEntireSegment(leafReaderContext); + } else { + return IndexSearcher.LeafReaderContextPartition.createFromAndTo(leafReaderContext, startDocId, endDocId); + } + } + } diff --git a/server/src/main/java/org/opensearch/search/internal/SearchContext.java b/server/src/main/java/org/opensearch/search/internal/SearchContext.java index 5bae9a7790108..1de664a01b902 100644 --- a/server/src/main/java/org/opensearch/search/internal/SearchContext.java +++ b/server/src/main/java/org/opensearch/search/internal/SearchContext.java @@ -433,6 +433,10 @@ public boolean shouldUseConcurrentSearch() { return false; } + public boolean shouldUseIntraSegmentConcurrentSearch() { + return false; + } + /** * Returns local bucket count thresholds based on concurrent segment search status */ @@ -539,4 +543,8 @@ public int cardinalityAggregationPruningThreshold() { public boolean keywordIndexOrDocValuesEnabled() { return false; } + + public int getSegmentPartitionSize() { + return 2; + } } diff --git a/server/src/test/java/org/opensearch/search/internal/ContextIndexSearcherTests.java b/server/src/test/java/org/opensearch/search/internal/ContextIndexSearcherTests.java index dd23318e61f7e..394d01557a1d0 100644 --- a/server/src/test/java/org/opensearch/search/internal/ContextIndexSearcherTests.java +++ b/server/src/test/java/org/opensearch/search/internal/ContextIndexSearcherTests.java @@ -91,6 +91,7 @@ import java.util.Collections; import java.util.IdentityHashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; @@ -99,6 +100,7 @@ import static org.opensearch.search.internal.ExitableDirectoryReader.ExitablePointValues; import static org.opensearch.search.internal.ExitableDirectoryReader.ExitableTerms; import static org.opensearch.search.internal.IndexReaderUtils.getLeaves; +import static org.opensearch.search.internal.IndexReaderUtils.verifyPartitionCountInSlices; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.mockito.Mockito.mock; @@ -341,7 +343,7 @@ public void testSlicesInternal() throws Exception { // Case 1: Verify the slice count when lucene default slice computation is used IndexSearcher.LeafSlice[] slices = searcher.slicesInternal( leaves, - SearchService.CONCURRENT_SEGMENT_SEARCH_MIN_SLICE_COUNT_VALUE + new MaxTargetSliceSupplier.SliceInputConfig(SearchService.CONCURRENT_SEGMENT_SEARCH_MIN_SLICE_COUNT_VALUE, false, 2) ); int expectedSliceCount = 2; // 2 slices will be created since max segment per slice of 5 will be reached @@ -352,14 +354,11 @@ public void testSlicesInternal() throws Exception { // Case 2: Verify the slice count when custom max slice computation is used expectedSliceCount = 4; - slices = searcher.slicesInternal(leaves, expectedSliceCount); + slices = searcher.slicesInternal(leaves, new MaxTargetSliceSupplier.SliceInputConfig(expectedSliceCount, false, 2)); - // 4 slices will be created with 3 leaves in first&last slices and 2 leaves in other slices + // 4 slices will be created with 3 leaves in first and last slices and 2 leaves in other slices assertEquals(expectedSliceCount, slices.length); - assertEquals(3, slices[0].partitions.length); - assertEquals(2, slices[1].partitions.length); - assertEquals(2, slices[2].partitions.length); - assertEquals(3, 
slices[3].partitions.length); + verifyPartitionCountInSlices(slices, Map.of(3, 2, 2, 2)); } } } @@ -411,12 +410,9 @@ public void testGetSlicesWithNonNullExecutorButCSDisabled() throws Exception { int expectedSliceCount = 4; IndexSearcher.LeafSlice[] slices = searcher.slices(leaves); - // 4 slices will be created with 3 leaves in first&last slices and 2 leaves in other slices + // 4 slices will be created with 3 leaves in first & last slices and 2 leaves in other slices assertEquals(expectedSliceCount, slices.length); - assertEquals(3, slices[0].partitions.length); - assertEquals(2, slices[1].partitions.length); - assertEquals(2, slices[2].partitions.length); - assertEquals(3, slices[3].partitions.length); + verifyPartitionCountInSlices(slices, Map.of(3, 2, 2, 2)); } } } diff --git a/server/src/test/java/org/opensearch/search/internal/IndexReaderUtils.java b/server/src/test/java/org/opensearch/search/internal/IndexReaderUtils.java index 16958da77f1a3..b8fb740332d14 100644 --- a/server/src/test/java/org/opensearch/search/internal/IndexReaderUtils.java +++ b/server/src/test/java/org/opensearch/search/internal/IndexReaderUtils.java @@ -17,20 +17,32 @@ import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.NoMergePolicy; +import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.store.Directory; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Set; import static org.apache.lucene.tests.util.LuceneTestCase.newDirectory; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; public class IndexReaderUtils { + public static List getLeaves(int leafCount) throws Exception { + return getLeaves(leafCount, 1); + } + /** * Utility to create leafCount number of {@link LeafReaderContext} * @param leafCount count of leaves to create + * @param docsPerLeaf : Number of documents per Leaf * @return created leaves */ - public static List getLeaves(int leafCount) throws Exception { + public static List getLeaves(int leafCount, int docsPerLeaf) throws Exception { try ( final Directory directory = newDirectory(); final IndexWriter iw = new IndexWriter( @@ -39,11 +51,12 @@ public static List getLeaves(int leafCount) throws Exception ) ) { for (int i = 0; i < leafCount; ++i) { - Document document = new Document(); - final String fieldValue = "value" + i; - document.add(new StringField("field1", fieldValue, Field.Store.NO)); - document.add(new StringField("field2", fieldValue, Field.Store.NO)); - iw.addDocument(document); + for (int j = 0; j < docsPerLeaf; ++j) { + Document document = new Document(); + final String fieldValue = "value" + i; + document.add(new StringField("field1", fieldValue, Field.Store.NO)); + iw.addDocument(document); + } iw.commit(); } try (DirectoryReader directoryReader = DirectoryReader.open(directory)) { @@ -52,4 +65,64 @@ public static List getLeaves(int leafCount) throws Exception } } } + + /** + * For the Input [ 2 -> 1, 1 -> 2 ] + * This verifies that 1 slice has 2 partitions and 2 slices has 1 partition each. 
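+ * That is, each map key is the partition count of a slice and its value is how many slices have exactly that many partitions.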
+ */ + public static void verifyPartitionCountInSlices(IndexSearcher.LeafSlice[] slices, Map leafSizesToSliceCount) { + Map leafCountToNumSlices = new HashMap<>(); + for (IndexSearcher.LeafSlice slice : slices) { + int existingCount = leafCountToNumSlices.getOrDefault(slice.partitions.length, 0); + leafCountToNumSlices.put(slice.partitions.length, existingCount + 1); + } + assertEquals(leafSizesToSliceCount, leafCountToNumSlices); + } + + /** + * For the input [ 10 -> 1, 9 -> 2 ] + * This verifies that there is one partition with 10 docs and 2 partitions with 9 docs each. + */ + public static void verifyPartitionDocCountAcrossSlices( + IndexSearcher.LeafSlice[] slices, + Map expectedDocCountToPartitionCount + ) { + Map actualDocCountToPartitionCount = new HashMap<>(); + for (IndexSearcher.LeafSlice slice : slices) { + for (IndexSearcher.LeafReaderContextPartition partition : slice.partitions) { + int partitionDocCount = MaxTargetSliceSupplier.getPartitionDocCount(partition); + int existingSliceCount = actualDocCountToPartitionCount.getOrDefault(partitionDocCount, 0); + actualDocCountToPartitionCount.put(partitionDocCount, existingSliceCount + 1); + } + } + assertEquals(expectedDocCountToPartitionCount, actualDocCountToPartitionCount); + } + + /** + * For the input [ 2 -> 1, 3 -> 1 ] + * This verifies that there is one slice with total 2 docs across all it's partitions and 1 slice with total 3 docs. + */ + public static void verifyDocCountAcrossSlices(IndexSearcher.LeafSlice[] slices, Map expectedDocCountToSliceCount) { + Map actualDocCountToSliceCount = new HashMap<>(); + for (IndexSearcher.LeafSlice slice : slices) { + int totalDocCount = 0; + for (IndexSearcher.LeafReaderContextPartition partition : slice.partitions) { + totalDocCount += MaxTargetSliceSupplier.getPartitionDocCount(partition); + } + int existingSliceCount = actualDocCountToSliceCount.getOrDefault(totalDocCount, 0); + actualDocCountToSliceCount.put(totalDocCount, existingSliceCount + 1); + } + assertEquals(expectedDocCountToSliceCount, actualDocCountToSliceCount); + } + + public static void verifyUniqueSegmentPartitionsPerSlices(IndexSearcher.LeafSlice[] slices) { + for (IndexSearcher.LeafSlice slice : slices) { + Set partitionSeen = new HashSet<>(); + for (IndexSearcher.LeafReaderContextPartition partition : slice.partitions) { + assertFalse(partitionSeen.contains(partition.ctx.ord)); + partitionSeen.add(partition.ctx.ord); + } + } + } + } diff --git a/server/src/test/java/org/opensearch/search/internal/MaxTargetSliceSupplierTests.java b/server/src/test/java/org/opensearch/search/internal/MaxTargetSliceSupplierTests.java index 96a03540566c2..9d77dbdf81a60 100644 --- a/server/src/test/java/org/opensearch/search/internal/MaxTargetSliceSupplierTests.java +++ b/server/src/test/java/org/opensearch/search/internal/MaxTargetSliceSupplierTests.java @@ -8,29 +8,28 @@ package org.opensearch.search.internal; -import org.apache.lucene.analysis.standard.StandardAnalyzer; -import org.apache.lucene.document.Document; -import org.apache.lucene.document.Field; -import org.apache.lucene.document.StringField; -import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.store.Directory; import org.opensearch.test.OpenSearchTestCase; import java.util.ArrayList; import java.util.List; +import 
java.util.Map; import static org.opensearch.search.internal.IndexReaderUtils.getLeaves; +import static org.opensearch.search.internal.IndexReaderUtils.verifyDocCountAcrossSlices; +import static org.opensearch.search.internal.IndexReaderUtils.verifyPartitionCountInSlices; +import static org.opensearch.search.internal.IndexReaderUtils.verifyPartitionDocCountAcrossSlices; +import static org.opensearch.search.internal.IndexReaderUtils.verifyUniqueSegmentPartitionsPerSlices; public class MaxTargetSliceSupplierTests extends OpenSearchTestCase { public void testSliceCountGreaterThanLeafCount() throws Exception { int expectedSliceCount = 2; - IndexSearcher.LeafSlice[] slices = MaxTargetSliceSupplier.getSlices(getLeaves(expectedSliceCount), 5); + IndexSearcher.LeafSlice[] slices = MaxTargetSliceSupplier.getSlices( + getLeaves(expectedSliceCount), + new MaxTargetSliceSupplier.SliceInputConfig(5, false, 0) + ); // verify slice count is same as leaf count assertEquals(expectedSliceCount, slices.length); for (int i = 0; i < expectedSliceCount; ++i) { @@ -39,12 +38,21 @@ public void testSliceCountGreaterThanLeafCount() throws Exception { } public void testNegativeSliceCount() { - assertThrows(IllegalArgumentException.class, () -> MaxTargetSliceSupplier.getSlices(new ArrayList<>(), randomIntBetween(-3, 0))); + assertThrows( + IllegalArgumentException.class, + () -> MaxTargetSliceSupplier.getSlices( + new ArrayList<>(), + new MaxTargetSliceSupplier.SliceInputConfig(randomIntBetween(-3, 0), false, 0) + ) + ); } public void testSingleSliceWithMultipleLeaves() throws Exception { int leafCount = randomIntBetween(1, 10); - IndexSearcher.LeafSlice[] slices = MaxTargetSliceSupplier.getSlices(getLeaves(leafCount), 1); + IndexSearcher.LeafSlice[] slices = MaxTargetSliceSupplier.getSlices( + getLeaves(leafCount), + new MaxTargetSliceSupplier.SliceInputConfig(1, false, 0) + ); assertEquals(1, slices.length); assertEquals(leafCount, slices[0].partitions.length); } @@ -55,7 +63,10 @@ public void testSliceCountLessThanLeafCount() throws Exception { // Case 1: test with equal number of leaves per slice int expectedSliceCount = 3; - IndexSearcher.LeafSlice[] slices = MaxTargetSliceSupplier.getSlices(leaves, expectedSliceCount); + IndexSearcher.LeafSlice[] slices = MaxTargetSliceSupplier.getSlices( + leaves, + new MaxTargetSliceSupplier.SliceInputConfig(expectedSliceCount, false, 0) + ); int expectedLeavesPerSlice = leafCount / expectedSliceCount; assertEquals(expectedSliceCount, slices.length); @@ -64,64 +75,72 @@ public void testSliceCountLessThanLeafCount() throws Exception { } // Case 2: test with first 2 slice more leaves than others + // [ 3, 3, 3, 2, 2 ] Slices with count of leaves inside them. + // Ordering shouldn't matter as overall query time will be same. 
expectedSliceCount = 5; - slices = MaxTargetSliceSupplier.getSlices(leaves, expectedSliceCount); - int expectedLeavesInFirst2Slice = 3; - int expectedLeavesInOtherSlice = 2; + slices = MaxTargetSliceSupplier.getSlices(leaves, new MaxTargetSliceSupplier.SliceInputConfig(expectedSliceCount, false, 0)); assertEquals(expectedSliceCount, slices.length); - for (int i = 0; i < expectedSliceCount; ++i) { - if (i < 2) { - assertEquals(expectedLeavesInFirst2Slice, slices[i].partitions.length); - } else { - assertEquals(expectedLeavesInOtherSlice, slices[i].partitions.length); - } - } + verifyPartitionCountInSlices(slices, Map.of(3, 2, 2, 3)); } public void testEmptyLeaves() { - IndexSearcher.LeafSlice[] slices = MaxTargetSliceSupplier.getSlices(new ArrayList<>(), 2); + IndexSearcher.LeafSlice[] slices = MaxTargetSliceSupplier.getSlices( + new ArrayList<>(), + new MaxTargetSliceSupplier.SliceInputConfig(2, false, 0) + ); assertEquals(0, slices.length); } public void testOptimizedGroup() throws Exception { - try ( - final Directory directory = newDirectory(); - final IndexWriter iw = new IndexWriter( - directory, - new IndexWriterConfig(new StandardAnalyzer()).setMergePolicy(NoMergePolicy.INSTANCE) - ) - ) { - final String fieldValue = "value"; - for (int i = 0; i < 3; ++i) { - Document document = new Document(); - document.add(new StringField("field1", fieldValue, Field.Store.NO)); - iw.addDocument(document); - } - iw.commit(); - for (int i = 0; i < 1; ++i) { - Document document = new Document(); - document.add(new StringField("field1", fieldValue, Field.Store.NO)); - iw.addDocument(document); - } - iw.commit(); - for (int i = 0; i < 1; ++i) { - Document document = new Document(); - document.add(new StringField("field1", fieldValue, Field.Store.NO)); - iw.addDocument(document); - } - iw.commit(); - - try (DirectoryReader directoryReader = DirectoryReader.open(directory)) { - List leaves = directoryReader.leaves(); - assertEquals(3, leaves.size()); - IndexSearcher.LeafSlice[] slices = MaxTargetSliceSupplier.getSlices(leaves, 2); - assertEquals(1, slices[0].partitions.length); - assertEquals(3, slices[0].getMaxDocs()); - - assertEquals(2, slices[1].partitions.length); - assertEquals(2, slices[1].getMaxDocs()); - } - } + List leaves = new ArrayList<>(getLeaves(1, 3)); + leaves.addAll(getLeaves(2, 1)); + + assertEquals(3, leaves.size()); + IndexSearcher.LeafSlice[] slices = MaxTargetSliceSupplier.getSlices( + leaves, + new MaxTargetSliceSupplier.SliceInputConfig(2, false, 0) + ); + verifyPartitionCountInSlices(slices, Map.of(2, 1, 1, 1)); + verifyDocCountAcrossSlices(slices, Map.of(2, 1, 3, 1)); } + + public void testPartitioningForOneLeaf() throws Exception { + List leaf = IndexReaderUtils.getLeaves(1, 121); + int maxSliceCount = 10; + IndexSearcher.LeafSlice[] slices = MaxTargetSliceSupplier.getSlices( + leaf, + new MaxTargetSliceSupplier.SliceInputConfig(maxSliceCount, true, 10) + ); + verifyUniqueSegmentPartitionsPerSlices(slices); + // 1 partition each in 10 slices + verifyPartitionCountInSlices(slices, Map.of(1, 10)); + // 9 partitions with 12 docs and 1 partition with 13 docs + verifyPartitionDocCountAcrossSlices(slices, Map.of(12, 9, 13, 1)); + + maxSliceCount = 7; + slices = MaxTargetSliceSupplier.getSlices(leaf, new MaxTargetSliceSupplier.SliceInputConfig(maxSliceCount, true, 10)); + verifyUniqueSegmentPartitionsPerSlices(slices); + // 1 partition each in 7 slices + verifyPartitionCountInSlices(slices, Map.of(1, 7)); + // 2 partitions with 18 docs and 5 partition with 17 docs + 
verifyPartitionDocCountAcrossSlices(slices, Map.of(18, 2, 17, 5)); + } + + public void testPartitioningForMultipleLeaves() throws Exception { + List leaves = new ArrayList<>(IndexReaderUtils.getLeaves(1, 20)); + // This segment won't be split any further + leaves.addAll(IndexReaderUtils.getLeaves(1, 19)); + int maxSliceCount = 2; + IndexSearcher.LeafSlice[] slices = MaxTargetSliceSupplier.getSlices( + leaves, + new MaxTargetSliceSupplier.SliceInputConfig(maxSliceCount, true, 10) + ); + verifyUniqueSegmentPartitionsPerSlices(slices); + // 1 partition in each slice + verifyPartitionCountInSlices(slices, Map.of(1, 2)); + // 1 partitions with 19 docs and 1 partitions with 20 docs + verifyPartitionDocCountAcrossSlices(slices, Map.of(19, 1, 20, 1)); + } + } diff --git a/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java b/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java index f03cbe266df86..a9016e30af058 100644 --- a/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java +++ b/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java @@ -704,6 +704,11 @@ public int getTargetMaxSliceCount() { return maxSliceCount; } + @Override + public int getSegmentPartitionSize() { + return 2; + } + @Override public boolean shouldUseTimeSeriesDescSortOptimization() { return indexShard != null
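Reviewer note, not part of the patch: a self-contained sketch of the split arithmetic implemented by partitionSegment, using the same example as its javadoc (31_000 docs, configured partition size 10_000, ample slice budget). The class and method names below are illustrative only.

class PartitionMathSketch {

    // Mirrors the arithmetic in MaxTargetSliceSupplier.partitionSegment from this diff.
    static int[] splitDocCounts(int segmentMaxDoc, int partitionSize, int targetMaxSlice) {
        int numPartitions = segmentMaxDoc / partitionSize;                       // 31_000 / 10_000 = 3
        if (numPartitions > targetMaxSlice) {                                    // never create more partitions than available slices
            numPartitions = targetMaxSlice;
            partitionSize = segmentMaxDoc / numPartitions;
        }
        int remainingDocs = segmentMaxDoc % partitionSize;                       // 31_000 % 10_000 = 1000
        int minPartitionSize = partitionSize + (remainingDocs / numPartitions);  // 10_000 + 333 = 10_333
        int partitionsWithOneExtraDoc = remainingDocs % numPartitions;           // 1000 % 3 = 1
        int[] docCounts = new int[numPartitions];
        for (int i = 0; i < numPartitions; i++) {
            docCounts[i] = minPartitionSize + (i < partitionsWithOneExtraDoc ? 1 : 0);
        }
        return docCounts;                                                        // [10334, 10333, 10333]
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(splitDocCounts(31_000, 10_000, 10)));
    }
}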
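Also not part of the patch: a rough usage sketch of the new settings, assuming the keys are registered exactly as in this diff. The wrapper class and the way the Settings objects would be applied are hypothetical; only the keys, defaults, and fallback behaviour come from the patch.

import org.opensearch.common.settings.Settings;

class IntraSegmentSearchSettingsSketch {
    public static void main(String[] args) {
        // Cluster-wide defaults (keys registered in SearchService).
        Settings clusterLevel = Settings.builder()
            .put("search.concurrent_intra_segment_search.mode", "all")              // default is "none"
            .put("search.concurrent_intra_segment_search.partition_size", 10_000)   // must be >= 2
            .build();

        // Per-index override (keys registered in IndexSettings); DefaultSearchContext falls back to the
        // cluster value only when the index-level setting is absent.
        Settings indexLevel = Settings.builder()
            .put("index.search.concurrent_intra_segment_search.mode", "none")
            .build();

        System.out.println(clusterLevel + " " + indexLevel);
    }
}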