Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.List;
import java.util.OptionalInt;
import java.util.Set;
import java.util.function.BiPredicate;
import java.util.function.IntConsumer;
import java.util.function.Predicate;

Expand Down Expand Up @@ -180,6 +181,11 @@ private static int effectiveRoutingToHash(String effectiveRouting) {
*/
public void checkIndexSplitAllowed() {}

/// Returns a predicate that given the document id and a routing value
/// returns `true` if the document routes to the provided shard.
/// This API is specifically used by [ShardSplittingQuery].
public abstract BiPredicate<String, String> shardMatcherForSplit(int shardId);

/**
* If this index is in the process of resharding, and the shard to which this request is being routed,
* is a target shard that is not yet in HANDOFF state, then route it to the source shard.
Expand Down Expand Up @@ -285,6 +291,16 @@ private void checkRoutingRequired(String id, @Nullable String routing) {
throw new RoutingMissingException(indexName, id);
}
}

@Override
public BiPredicate<String, String> shardMatcherForSplit(int shardId) {
return (id, routing) -> {
// Note that we intentionally do not apply any adjustments for resharding.
// These adjustments are introduced for coordinator nodes and `ShardSplittingQuery` does not need them.
int routedToShardId = shardId(id, routing);
return routedToShardId == shardId;
};
}
}

/**
Expand Down Expand Up @@ -462,6 +478,12 @@ public void checkIndexSplitAllowed() {
throw new IllegalArgumentException(error("index-split"));
}

@Override
public BiPredicate<String, String> shardMatcherForSplit(int shardId) {
// Splits of time series indices are not supported, see `checkIndexSplitAllowed()`.
throw new UnsupportedOperationException(error("index-split"));
}

@Override
public void collectSearchShards(String routing, IntConsumer consumer) {
throw new IllegalArgumentException(error("searching with a specified routing"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.index.mapper.Uid;

import java.io.IOException;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.function.IntConsumer;
import java.util.function.Predicate;
Expand All @@ -54,13 +55,13 @@
*/
public final class ShardSplittingQuery extends Query {
private final IndexMetadata indexMetadata;
private final IndexRouting indexRouting;
private final BiPredicate<String, String> shardMatcher;
private final int shardId;
private final BitSetProducer nestedParentBitSetProducer;

public ShardSplittingQuery(IndexMetadata indexMetadata, int shardId, boolean hasNested) {
this.indexMetadata = indexMetadata;
this.indexRouting = IndexRouting.fromIndexMetadata(indexMetadata);
this.shardMatcher = IndexRouting.fromIndexMetadata(indexMetadata).shardMatcherForSplit(shardId);
this.shardId = shardId;
this.nestedParentBitSetProducer = hasNested ? newParentDocBitSetProducer(indexMetadata.getCreationVersion()) : null;
}
Expand All @@ -78,10 +79,10 @@ public ScorerSupplier scorerSupplier(LeafReaderContext context) throws IOExcepti
LeafReader leafReader = context.reader();
FixedBitSet bitSet = new FixedBitSet(leafReader.maxDoc());
Terms terms = leafReader.terms(RoutingFieldMapper.NAME);
Predicate<BytesRef> includeInShard = ref -> {
Predicate<BytesRef> idOnlyPredicate = ref -> {
// TODO IndexRouting should build the query somehow
int targetShardId = indexRouting.getShard(Uid.decodeId(ref.bytes, ref.offset, ref.length), null);
return shardId == targetShardId;
String id = Uid.decodeId(ref.bytes, ref.offset, ref.length);
return shardMatcher.test(id, null);
};

return new ScorerSupplier() {
Expand All @@ -92,7 +93,7 @@ public Scorer get(long leadCost) throws IOException {
// in this case we also don't do anything special with regards to nested docs since we basically delete
// by ID and parent and nested all have the same id.
assert indexMetadata.isRoutingPartitionedIndex() == false;
findSplitDocs(IdFieldMapper.NAME, includeInShard, leafReader, bitSet::set);
findSplitDocs(IdFieldMapper.NAME, (idOnlyPredicate), leafReader, bitSet::set);
} else {
final BitSet parentBitSet;
if (nestedParentBitSetProducer == null) {
Expand Down Expand Up @@ -127,10 +128,13 @@ public Scorer get(long leadCost) throws IOException {
};
// in the _routing case we first go and find all docs that have a routing value and mark the ones we have to
// delete
findSplitDocs(RoutingFieldMapper.NAME, ref -> {
int targetShardId = indexRouting.getShard(null, ref.utf8ToString());
return shardId == targetShardId;
}, leafReader, maybeWrapConsumer.apply(bitSet::set));
Predicate<BytesRef> routingOnlyPredicate = bytes -> shardMatcher.test(null, bytes.utf8ToString());
findSplitDocs(
RoutingFieldMapper.NAME,
routingOnlyPredicate,
leafReader,
maybeWrapConsumer.apply(bitSet::set)
);

// TODO have the IndexRouting build the query and pass routingRequired in
boolean routingRequired = indexMetadata.mapping() == null
Expand Down Expand Up @@ -158,7 +162,7 @@ public Scorer get(long leadCost) throws IOException {
maybeWrapConsumer.apply(hasRoutingValue::set)
);
IntConsumer bitSetConsumer = maybeWrapConsumer.apply(bitSet::set);
findSplitDocs(IdFieldMapper.NAME, includeInShard, leafReader, docId -> {
findSplitDocs(IdFieldMapper.NAME, idOnlyPredicate, leafReader, docId -> {
if (hasRoutingValue.get(docId) == false) {
bitSetConsumer.accept(docId);
}
Expand Down Expand Up @@ -295,8 +299,7 @@ boolean matches(int doc) throws IOException {
leftToVisit = 2;
storedFields.document(doc, this);
assert id != null : "docID must not be null - we might have hit a nested document";
int targetShardId = indexRouting.getShard(id, routing);
return targetShardId != shardId;
return shardMatcher.test(id, routing) == false;
}
}

Expand Down