Commit 23d076f

Fix ShardSplittingQuery to respect nested documents. (#27398)
Today, if nested docs are used in an index that is split, the operation works correctly only if the index is not routing-partitioned and no custom _routing values are used. This change fixes the query that selects the docs to delete so that it also selects all nested docs of each matching parent. Closes #27378
1 parent b3b3e45 commit 23d076f
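
As context for the fix, here is a minimal sketch (plain Lucene of that era, hypothetical class and field setup, not code from this commit) of how a nested block is indexed: children are added first, all docs in the block share the same indexed _id, but only the trailing root document carries _id (and _routing) as stored fields. That is why the simple delete-by-ID path needs no special handling, while any selection that reads stored fields sees only root documents.

import java.util.ArrayList;
import java.util.List;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.RAMDirectory;

public class NestedBlockSketch {
    public static void main(String[] args) throws Exception {
        try (IndexWriter writer = new IndexWriter(new RAMDirectory(), new IndexWriterConfig(null))) {
            List<Document> block = new ArrayList<>();
            Document child = new Document();
            // nested doc: shares the parent's indexed _id but stores neither _id nor _routing
            child.add(new StringField("_id", "1", Field.Store.NO));
            block.add(child);
            Document parent = new Document();
            // root doc: the only one whose _id (and _routing) can be read back from stored fields
            parent.add(new StringField("_id", "1", Field.Store.YES));
            block.add(parent);
            writer.addDocuments(block); // indexed as one contiguous block, root document last
        }
    }
}
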

File tree

5 files changed

+345 -87 lines changed


core/src/main/java/org/elasticsearch/index/shard/ShardSplittingQuery.java

Lines changed: 139 additions & 32 deletions
@@ -19,9 +19,11 @@
 package org.elasticsearch.index.shard;
 
 import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.IndexReaderContext;
 import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.PostingsEnum;
+import org.apache.lucene.index.ReaderUtil;
 import org.apache.lucene.index.StoredFieldVisitor;
 import org.apache.lucene.index.Terms;
 import org.apache.lucene.index.TermsEnum;
@@ -33,19 +35,23 @@
 import org.apache.lucene.search.Scorer;
 import org.apache.lucene.search.TwoPhaseIterator;
 import org.apache.lucene.search.Weight;
+import org.apache.lucene.search.join.BitSetProducer;
+import org.apache.lucene.util.BitSet;
 import org.apache.lucene.util.BitSetIterator;
-import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.FixedBitSet;
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.routing.OperationRouting;
+import org.elasticsearch.common.lucene.search.Queries;
 import org.elasticsearch.index.mapper.IdFieldMapper;
 import org.elasticsearch.index.mapper.RoutingFieldMapper;
 import org.elasticsearch.index.mapper.Uid;
 
 import java.io.IOException;
+import java.util.function.Function;
 import java.util.function.IntConsumer;
+import java.util.function.IntPredicate;
 import java.util.function.Predicate;
 
 /**
@@ -56,16 +62,17 @@
 final class ShardSplittingQuery extends Query {
     private final IndexMetaData indexMetaData;
     private final int shardId;
+    private final BitSetProducer nestedParentBitSetProducer;
 
-    ShardSplittingQuery(IndexMetaData indexMetaData, int shardId) {
+    ShardSplittingQuery(IndexMetaData indexMetaData, int shardId, boolean hasNested) {
         if (indexMetaData.getCreationVersion().before(Version.V_6_0_0_rc2)) {
             throw new IllegalArgumentException("Splitting query can only be executed on an index created with version "
                 + Version.V_6_0_0_rc2 + " or higher");
         }
         this.indexMetaData = indexMetaData;
         this.shardId = shardId;
+        this.nestedParentBitSetProducer = hasNested ? newParentDocBitSetProducer() : null;
     }
-
     @Override
     public Weight createWeight(IndexSearcher searcher, boolean needsScores, float boost) {
         return new ConstantScoreWeight(this, boost) {
@@ -84,44 +91,87 @@ public Scorer scorer(LeafReaderContext context) throws IOException {
                         Uid.decodeId(ref.bytes, ref.offset, ref.length), null);
                     return shardId == targetShardId;
                 };
-                if (terms == null) { // this is the common case - no partitioning and no _routing values
+                if (terms == null) {
+                    // this is the common case - no partitioning and no _routing values
+                    // in this case we also don't do anything special with regards to nested docs since we basically delete
+                    // by ID, and parent and nested docs all have the same id.
                     assert indexMetaData.isRoutingPartitionedIndex() == false;
                     findSplitDocs(IdFieldMapper.NAME, includeInShard, leafReader, bitSet::set);
                 } else {
+                    final BitSet parentBitSet;
+                    if (nestedParentBitSetProducer == null) {
+                        parentBitSet = null;
+                    } else {
+                        parentBitSet = nestedParentBitSetProducer.getBitSet(context);
+                        if (parentBitSet == null) {
+                            return null; // no matches
+                        }
+                    }
                     if (indexMetaData.isRoutingPartitionedIndex()) {
                         // this is the heaviest invariant. Here we have to visit all docs' stored fields to extract _id and _routing
                         // because this index is routing partitioned.
-                        Visitor visitor = new Visitor();
-                        return new ConstantScoreScorer(this, score(),
-                            new RoutingPartitionedDocIdSetIterator(leafReader, visitor));
+                        Visitor visitor = new Visitor(leafReader);
+                        TwoPhaseIterator twoPhaseIterator =
+                            parentBitSet == null ? new RoutingPartitionedDocIdSetIterator(visitor) :
+                                new NestedRoutingPartitionedDocIdSetIterator(visitor, parentBitSet);
+                        return new ConstantScoreScorer(this, score(), twoPhaseIterator);
                     } else {
+                        // here we potentially guard the docID consumers with our parent bitset if we have one.
+                        // this ensures that we are only marking root documents in the nested case and, if necessary,
+                        // we do a second pass to mark the corresponding children in markChildDocs
+                        Function<IntConsumer, IntConsumer> maybeWrapConsumer = consumer -> {
+                            if (parentBitSet != null) {
+                                return docId -> {
+                                    if (parentBitSet.get(docId)) {
+                                        consumer.accept(docId);
+                                    }
+                                };
+                            }
+                            return consumer;
+                        };
                         // in the _routing case we first go and find all docs that have a routing value and mark the ones we have to delete
                         findSplitDocs(RoutingFieldMapper.NAME, ref -> {
                             int targetShardId = OperationRouting.generateShardId(indexMetaData, null, ref.utf8ToString());
                             return shardId == targetShardId;
-                        }, leafReader, bitSet::set);
+                        }, leafReader, maybeWrapConsumer.apply(bitSet::set));
+
                         // now if we have a mixed index where some docs have a _routing value and some don't we have to exclude the ones
                         // with a routing value from the next iteration and delete / select based on the ID.
                         if (terms.getDocCount() != leafReader.maxDoc()) {
                             // this is a special case where some of the docs have no routing values; this sucks but it's possible today
                             FixedBitSet hasRoutingValue = new FixedBitSet(leafReader.maxDoc());
-                            findSplitDocs(RoutingFieldMapper.NAME, ref -> false, leafReader,
-                                hasRoutingValue::set);
+                            findSplitDocs(RoutingFieldMapper.NAME, ref -> false, leafReader, maybeWrapConsumer.apply(hasRoutingValue::set));
+                            IntConsumer bitSetConsumer = maybeWrapConsumer.apply(bitSet::set);
                             findSplitDocs(IdFieldMapper.NAME, includeInShard, leafReader, docId -> {
                                 if (hasRoutingValue.get(docId) == false) {
-                                    bitSet.set(docId);
+                                    bitSetConsumer.accept(docId);
                                 }
                             });
                         }
                     }
+                    if (parentBitSet != null) {
+                        // if nested docs are involved we also need to mark all child docs that belong to a matching parent doc.
+                        markChildDocs(parentBitSet, bitSet);
+                    }
                 }
+
                 return new ConstantScoreScorer(this, score(), new BitSetIterator(bitSet, bitSet.length()));
             }
-
-
         };
     }
 
+    private void markChildDocs(BitSet parentDocs, BitSet matchingDocs) {
+        int currentDeleted = 0;
+        while (currentDeleted < matchingDocs.length() &&
+            (currentDeleted = matchingDocs.nextSetBit(currentDeleted)) != DocIdSetIterator.NO_MORE_DOCS) {
+            int previousParent = parentDocs.prevSetBit(Math.max(0, currentDeleted - 1));
+            for (int i = previousParent + 1; i < currentDeleted; i++) {
+                matchingDocs.set(i);
+            }
+            currentDeleted++;
+        }
+    }
+
     @Override
     public String toString(String field) {
         return "shard_splitting_query";
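
To make the markChildDocs pass above concrete, here is a self-contained toy (hypothetical doc layout, not part of the commit): docIDs 0-2 are children of parent 3 and docIDs 4-5 are children of parent 6. With only parent 3 marked for deletion, the loop scans back from each marked doc to the previous parent bit and pulls the children in between into the deletion set.

import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.FixedBitSet;

public class MarkChildDocsSketch {
    public static void main(String[] args) {
        FixedBitSet parentDocs = new FixedBitSet(7);
        parentDocs.set(3); // parent of docs 0..2
        parentDocs.set(6); // parent of docs 4..5
        FixedBitSet matchingDocs = new FixedBitSet(7);
        matchingDocs.set(3); // only the root document matched so far

        int current = 0; // same loop shape as markChildDocs above
        while (current < matchingDocs.length()
            && (current = matchingDocs.nextSetBit(current)) != DocIdSetIterator.NO_MORE_DOCS) {
            int previousParent = parentDocs.prevSetBit(Math.max(0, current - 1));
            for (int i = previousParent + 1; i < current; i++) {
                matchingDocs.set(i); // mark the children of the matching parent
            }
            current++;
        }
        for (int i = 0; i < 7; i++) {
            System.out.println("doc " + i + " deleted: " + matchingDocs.get(i)); // 0..3 true, 4..6 false
        }
    }
}
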
@@ -145,8 +195,8 @@ public int hashCode() {
         return classHash() ^ result;
     }
 
-    private static void findSplitDocs(String idField, Predicate<BytesRef> includeInShard,
-                                      LeafReader leafReader, IntConsumer consumer) throws IOException {
+    private static void findSplitDocs(String idField, Predicate<BytesRef> includeInShard, LeafReader leafReader,
+                                      IntConsumer consumer) throws IOException {
         Terms terms = leafReader.terms(idField);
         TermsEnum iterator = terms.iterator();
         BytesRef idTerm;
@@ -162,15 +212,17 @@ private static void findSplitDocs(String idField, Predicate<BytesRef> includeInS
         }
     }
 
-    private static final class Visitor extends StoredFieldVisitor {
-        int leftToVisit = 2;
-        final BytesRef spare = new BytesRef();
-        String routing;
-        String id;
+    /* this class is a stored fields visitor that reads _id and/or _routing from the stored fields, which is necessary in the case
+       of a routing partitioned index since otherwise we would need to un-invert the _id and _routing field which is memory heavy */
+    private final class Visitor extends StoredFieldVisitor {
+        final LeafReader leafReader;
+        private int leftToVisit = 2;
+        private final BytesRef spare = new BytesRef();
+        private String routing;
+        private String id;
 
-        void reset() {
-            routing = id = null;
-            leftToVisit = 2;
+        Visitor(LeafReader leafReader) {
+            this.leafReader = leafReader;
         }
 
         @Override
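
As an aside on the Visitor above, the stored fields API can be steered so that only the interesting fields are materialized per document. A hedged sketch of that pattern (Lucene 7-era StoredFieldVisitor signatures, hypothetical field name, not code from this commit):

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.StoredFieldVisitor;

final class SingleFieldVisitor extends StoredFieldVisitor {
    private final String field;
    private String value;

    SingleFieldVisitor(String field) {
        this.field = field;
    }

    @Override
    public Status needsField(FieldInfo fieldInfo) throws IOException {
        if (fieldInfo.name.equals(field)) {
            return Status.YES; // materialize just this field
        }
        // stop the stored fields scan early once the field of interest has been read
        return value != null ? Status.STOP : Status.NO;
    }

    @Override
    public void stringField(FieldInfo fieldInfo, byte[] bytes) throws IOException {
        value = new String(bytes, StandardCharsets.UTF_8);
    }

    String read(LeafReader reader, int docId) throws IOException {
        value = null;
        reader.document(docId, this); // streams each stored field through needsField
        return value;
    }
}

This is the same trick the Visitor plays with its leftToVisit counter to stop after _id and _routing have been seen, avoiding the memory-heavy alternative of un-inverting those fields.
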
@@ -210,36 +262,91 @@ public Status needsField(FieldInfo fieldInfo) throws IOException {
                     return leftToVisit == 0 ? Status.STOP : Status.NO;
             }
         }
+
+        boolean matches(int doc) throws IOException {
+            routing = id = null;
+            leftToVisit = 2;
+            leafReader.document(doc, this);
+            assert id != null : "docID must not be null - we might have hit a nested document";
+            int targetShardId = OperationRouting.generateShardId(indexMetaData, id, routing);
+            return targetShardId != shardId;
+        }
     }
 
     /**
      * This two phase iterator visits every live doc and selects all docs that don't belong into this
      * shard based on their id and routing value. This is only used in a routing partitioned index.
      */
-    private final class RoutingPartitionedDocIdSetIterator extends TwoPhaseIterator {
-        private final LeafReader leafReader;
+    private static final class RoutingPartitionedDocIdSetIterator extends TwoPhaseIterator {
         private final Visitor visitor;
 
-        RoutingPartitionedDocIdSetIterator(LeafReader leafReader, Visitor visitor) {
-            super(DocIdSetIterator.all(leafReader.maxDoc())); // we iterate all live-docs
-            this.leafReader = leafReader;
+        RoutingPartitionedDocIdSetIterator(Visitor visitor) {
+            super(DocIdSetIterator.all(visitor.leafReader.maxDoc())); // we iterate all live-docs
             this.visitor = visitor;
         }
 
         @Override
         public boolean matches() throws IOException {
+            return visitor.matches(approximation.docID());
+        }
+
+        @Override
+        public float matchCost() {
+            return 42; // that's obvious, right?
+        }
+    }
+
+    /**
+     * This TwoPhaseIterator marks all nested docs of matching parents as matches as well.
+     */
+    private static final class NestedRoutingPartitionedDocIdSetIterator extends TwoPhaseIterator {
+        private final Visitor visitor;
+        private final BitSet parentDocs;
+        private int nextParent = -1;
+        private boolean nextParentMatches;
+
+        NestedRoutingPartitionedDocIdSetIterator(Visitor visitor, BitSet parentDocs) {
+            super(DocIdSetIterator.all(visitor.leafReader.maxDoc())); // we iterate all live-docs
+            this.parentDocs = parentDocs;
+            this.visitor = visitor;
+        }
+
+        @Override
+        public boolean matches() throws IOException {
+            // the educated reader might ask why this works: it does because all live doc ids (root docs and nested docs)
+            // are evaluated in order, and that way we don't need to seek backwards as we do in other nested docs cases.
             int doc = approximation.docID();
-            visitor.reset();
-            leafReader.document(doc, visitor);
-            int targetShardId = OperationRouting.generateShardId(indexMetaData, visitor.id, visitor.routing);
-            return targetShardId != shardId;
+            if (doc > nextParent) {
+                // we only check once per nested/parent set
+                nextParent = parentDocs.nextSetBit(doc);
+                // never check a child document against the visitor, they neither have _id nor _routing as stored fields
+                nextParentMatches = visitor.matches(nextParent);
+            }
+            return nextParentMatches;
         }
 
         @Override
         public float matchCost() {
             return 42; // that's obvious, right?
         }
     }
+
+    /*
+     * this is used internally to obtain a bitset for parent documents. We don't cache this since we never access the same reader
+     * more than once. There is no point in using BitsetFilterCache#BitSetProducerWarmer since we use this only as a delete by query
+     * which is executed on a recovery-private index writer. There is no point in caching it and it won't have a cache hit either.
+     */
+    private static BitSetProducer newParentDocBitSetProducer() {
+        return context -> {
+            Query query = Queries.newNonNestedFilter();
+            final IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(context);
+            final IndexSearcher searcher = new IndexSearcher(topLevelContext);
+            searcher.setQueryCache(null);
+            final Weight weight = searcher.createNormalizedWeight(query, false);
+            Scorer s = weight.scorer(context);
+            return s == null ? null : BitSet.of(s.iterator(), context.reader().maxDoc());
+        };
+    }
 }
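
One detail of NestedRoutingPartitionedDocIdSetIterator worth spelling out: since children always precede their root document and docIDs are visited in order, the next set bit in the parent bitset at or after any docID is that block's root, so a forward-only scan suffices. A toy illustration (hypothetical block layout, not part of the commit):

import org.apache.lucene.util.FixedBitSet;

public class ParentLookupSketch {
    public static void main(String[] args) {
        FixedBitSet parents = new FixedBitSet(7);
        parents.set(3); // docs 0..2 are children of root doc 3
        parents.set(6); // docs 4..5 are children of root doc 6
        for (int doc = 0; doc < 7; doc++) {
            // every doc in a block maps forward to the same root document
            System.out.println("doc " + doc + " -> root " + parents.nextSetBit(doc));
        }
    }
}

Each child thereby inherits its parent's match decision without the query ever reading the child's (absent) stored fields.
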
core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java

Lines changed: 5 additions & 5 deletions
@@ -115,6 +115,7 @@ boolean recoverFromLocalShards(BiConsumer<String, MappingMetaData> mappingUpdate
             indexShard.mapperService().merge(sourceMetaData, MapperService.MergeReason.MAPPING_RECOVERY, true);
             // now that the mapping is merged we can validate the index sort configuration.
             Sort indexSort = indexShard.getIndexSort();
+            final boolean hasNested = indexShard.mapperService().hasNested();
             final boolean isSplit = sourceMetaData.getNumberOfShards() < indexShard.indexSettings().getNumberOfShards();
             assert isSplit == false || sourceMetaData.getCreationVersion().onOrAfter(Version.V_6_0_0_alpha1) : "for split we require a " +
                 "single type but the index is created before 6.0.0";
@@ -127,7 +128,7 @@ boolean recoverFromLocalShards(BiConsumer<String, MappingMetaData> mappingUpdate
             final long maxUnsafeAutoIdTimestamp =
                 shards.stream().mapToLong(LocalShardSnapshot::maxUnsafeAutoIdTimestamp).max().getAsLong();
             addIndices(indexShard.recoveryState().getIndex(), directory, indexSort, sources, maxSeqNo, maxUnsafeAutoIdTimestamp,
-                indexShard.indexSettings().getIndexMetaData(), indexShard.shardId().id(), isSplit);
+                indexShard.indexSettings().getIndexMetaData(), indexShard.shardId().id(), isSplit, hasNested);
             internalRecoverFromStore(indexShard);
             // just trigger a merge to do housekeeping on the
             // copied segments - we will also see them in stats etc.
@@ -142,8 +143,8 @@ boolean recoverFromLocalShards(BiConsumer<String, MappingMetaData> mappingUpdate
     }
 
     void addIndices(final RecoveryState.Index indexRecoveryStats, final Directory target, final Sort indexSort, final Directory[] sources,
-                    final long maxSeqNo, final long maxUnsafeAutoIdTimestamp, IndexMetaData indexMetaData, int shardId, boolean split)
-                    throws IOException {
+                    final long maxSeqNo, final long maxUnsafeAutoIdTimestamp, IndexMetaData indexMetaData, int shardId, boolean split,
+                    boolean hasNested) throws IOException {
         final Directory hardLinkOrCopyTarget = new org.apache.lucene.store.HardlinkCopyDirectoryWrapper(target);
         IndexWriterConfig iwc = new IndexWriterConfig(null)
             .setCommitOnClose(false)
@@ -158,9 +159,8 @@ void addIndices(final RecoveryState.Index indexRecoveryStats, final Directory ta
 
         try (IndexWriter writer = new IndexWriter(new StatsDirectoryWrapper(hardLinkOrCopyTarget, indexRecoveryStats), iwc)) {
             writer.addIndexes(sources);
-
             if (split) {
-                writer.deleteDocuments(new ShardSplittingQuery(indexMetaData, shardId));
+                writer.deleteDocuments(new ShardSplittingQuery(indexMetaData, shardId, hasNested));
             }
             /*
              * We set the maximum sequence number and the local checkpoint on the target to the maximum of the maximum sequence numbers on
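
Putting the two files together, a condensed sketch of the recovery-time split path (hypothetical helper; ShardSplittingQuery is package-private, so this would have to live in org.elasticsearch.index.shard): copy all source segments into the target shard, then delete every document that does not belong to it, with hasNested deciding whether the parent-bitset machinery is engaged.

import java.io.IOException;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.elasticsearch.cluster.metadata.IndexMetaData;

final class CopyAndSplitSketch {
    static void copyAndSplit(Directory target, Directory[] sources, IndexMetaData indexMetaData,
                             int shardId, boolean isSplit, boolean hasNested) throws IOException {
        IndexWriterConfig iwc = new IndexWriterConfig(null).setCommitOnClose(false);
        try (IndexWriter writer = new IndexWriter(target, iwc)) {
            writer.addIndexes(sources);
            if (isSplit) {
                // the fix: pass hasNested so nested children are deleted along with their parents
                writer.deleteDocuments(new ShardSplittingQuery(indexMetaData, shardId, hasNested));
            }
            writer.commit(); // commitOnClose is disabled, so commit explicitly
        }
    }
}
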
