diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java b/server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java index f7963a0642cb3..d9217cdbc582c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java @@ -41,6 +41,7 @@ import java.util.List; import java.util.OptionalInt; import java.util.Set; +import java.util.function.BiPredicate; import java.util.function.IntConsumer; import java.util.function.Predicate; @@ -180,6 +181,11 @@ private static int effectiveRoutingToHash(String effectiveRouting) { */ public void checkIndexSplitAllowed() {} + /// Returns a predicate that given the document id and a routing value + /// returns `true` if the document routes to the provided shard. + /// This API is specifically used by [ShardSplittingQuery]. + public abstract BiPredicate shardMatcherForSplit(int shardId); + /** * If this index is in the process of resharding, and the shard to which this request is being routed, * is a target shard that is not yet in HANDOFF state, then route it to the source shard. @@ -285,6 +291,16 @@ private void checkRoutingRequired(String id, @Nullable String routing) { throw new RoutingMissingException(indexName, id); } } + + @Override + public BiPredicate shardMatcherForSplit(int shardId) { + return (id, routing) -> { + // Note that we intentionally do not apply any adjustments for resharding. + // These adjustments are introduced for coordinator nodes and `ShardSplittingQuery` does not need them. + int routedToShardId = shardId(id, routing); + return routedToShardId == shardId; + }; + } } /** @@ -462,6 +478,12 @@ public void checkIndexSplitAllowed() { throw new IllegalArgumentException(error("index-split")); } + @Override + public BiPredicate shardMatcherForSplit(int shardId) { + // Splits of time series indices are not supported, see `checkIndexSplitAllowed()`. + throw new UnsupportedOperationException(error("index-split")); + } + @Override public void collectSearchShards(String routing, IntConsumer consumer) { throw new IllegalArgumentException(error("searching with a specified routing")); diff --git a/server/src/main/java/org/elasticsearch/index/shard/ShardSplittingQuery.java b/server/src/main/java/org/elasticsearch/index/shard/ShardSplittingQuery.java index 28b63ffff3e66..b09898fdae1c1 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/ShardSplittingQuery.java +++ b/server/src/main/java/org/elasticsearch/index/shard/ShardSplittingQuery.java @@ -43,6 +43,7 @@ import org.elasticsearch.index.mapper.Uid; import java.io.IOException; +import java.util.function.BiPredicate; import java.util.function.Function; import java.util.function.IntConsumer; import java.util.function.Predicate; @@ -54,13 +55,13 @@ */ public final class ShardSplittingQuery extends Query { private final IndexMetadata indexMetadata; - private final IndexRouting indexRouting; + private final BiPredicate shardMatcher; private final int shardId; private final BitSetProducer nestedParentBitSetProducer; public ShardSplittingQuery(IndexMetadata indexMetadata, int shardId, boolean hasNested) { this.indexMetadata = indexMetadata; - this.indexRouting = IndexRouting.fromIndexMetadata(indexMetadata); + this.shardMatcher = IndexRouting.fromIndexMetadata(indexMetadata).shardMatcherForSplit(shardId); this.shardId = shardId; this.nestedParentBitSetProducer = hasNested ? newParentDocBitSetProducer(indexMetadata.getCreationVersion()) : null; } @@ -78,10 +79,10 @@ public ScorerSupplier scorerSupplier(LeafReaderContext context) throws IOExcepti LeafReader leafReader = context.reader(); FixedBitSet bitSet = new FixedBitSet(leafReader.maxDoc()); Terms terms = leafReader.terms(RoutingFieldMapper.NAME); - Predicate includeInShard = ref -> { + Predicate idOnlyPredicate = ref -> { // TODO IndexRouting should build the query somehow - int targetShardId = indexRouting.getShard(Uid.decodeId(ref.bytes, ref.offset, ref.length), null); - return shardId == targetShardId; + String id = Uid.decodeId(ref.bytes, ref.offset, ref.length); + return shardMatcher.test(id, null); }; return new ScorerSupplier() { @@ -92,7 +93,7 @@ public Scorer get(long leadCost) throws IOException { // in this case we also don't do anything special with regards to nested docs since we basically delete // by ID and parent and nested all have the same id. assert indexMetadata.isRoutingPartitionedIndex() == false; - findSplitDocs(IdFieldMapper.NAME, includeInShard, leafReader, bitSet::set); + findSplitDocs(IdFieldMapper.NAME, (idOnlyPredicate), leafReader, bitSet::set); } else { final BitSet parentBitSet; if (nestedParentBitSetProducer == null) { @@ -127,10 +128,13 @@ public Scorer get(long leadCost) throws IOException { }; // in the _routing case we first go and find all docs that have a routing value and mark the ones we have to // delete - findSplitDocs(RoutingFieldMapper.NAME, ref -> { - int targetShardId = indexRouting.getShard(null, ref.utf8ToString()); - return shardId == targetShardId; - }, leafReader, maybeWrapConsumer.apply(bitSet::set)); + Predicate routingOnlyPredicate = bytes -> shardMatcher.test(null, bytes.utf8ToString()); + findSplitDocs( + RoutingFieldMapper.NAME, + routingOnlyPredicate, + leafReader, + maybeWrapConsumer.apply(bitSet::set) + ); // TODO have the IndexRouting build the query and pass routingRequired in boolean routingRequired = indexMetadata.mapping() == null @@ -158,7 +162,7 @@ public Scorer get(long leadCost) throws IOException { maybeWrapConsumer.apply(hasRoutingValue::set) ); IntConsumer bitSetConsumer = maybeWrapConsumer.apply(bitSet::set); - findSplitDocs(IdFieldMapper.NAME, includeInShard, leafReader, docId -> { + findSplitDocs(IdFieldMapper.NAME, idOnlyPredicate, leafReader, docId -> { if (hasRoutingValue.get(docId) == false) { bitSetConsumer.accept(docId); } @@ -295,8 +299,7 @@ boolean matches(int doc) throws IOException { leftToVisit = 2; storedFields.document(doc, this); assert id != null : "docID must not be null - we might have hit a nested document"; - int targetShardId = indexRouting.getShard(id, routing); - return targetShardId != shardId; + return shardMatcher.test(id, routing) == false; } }