Skip to content

Commit

Permalink
[Refactor] LuceneChangesSnapshot to use accurate ops history (#2452)
Browse files Browse the repository at this point in the history
Improves LuceneChangesSnapshot so that it returns an accurate count of recovery
operations while still using the sort-by-sequence-number optimization.

Signed-off-by: Nicholas Walter Knize <[email protected]>
  • Loading branch information
nknize authored Mar 15, 2022
1 parent b69dc33 commit 757abdb
Show file tree
Hide file tree
Showing 15 changed files with 174 additions and 59 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
---
"Return empty object if field doesn't exist, but index does":
- skip:
version: "all"
reason: "AwaitsFix https://github.com/opensearch-project/OpenSearch/issues/2440"

- do:
indices.create:
index: test_index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -764,7 +764,7 @@ public void testShardChangesWithDefaultDocType() throws Exception {
}
IndexShard shard = indexService.getShard(0);
try (
Translog.Snapshot luceneSnapshot = shard.newChangesSnapshot("test", 0, numOps - 1, true);
Translog.Snapshot luceneSnapshot = shard.newChangesSnapshot("test", 0, numOps - 1, true, randomBoolean());
Translog.Snapshot translogSnapshot = getTranslog(shard).newSnapshot()
) {
List<Translog.Operation> opsFromLucene = TestTranslog.drainSnapshot(luceneSnapshot, true);
Expand Down
18 changes: 16 additions & 2 deletions server/src/main/java/org/opensearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -735,8 +735,22 @@ public enum SearcherScope {
* Creates a new history snapshot from Lucene for reading operations whose seqno is in the requested seqno range (both inclusive).
* This feature requires soft-deletes enabled. If soft-deletes are disabled, this method will throw an {@link IllegalStateException}.
*/
public abstract Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long toSeqNo, boolean requiredFullRange)
throws IOException;
public abstract Translog.Snapshot newChangesSnapshot(
String source,
long fromSeqNo,
long toSeqNo,
boolean requiredFullRange,
boolean accurateCount
) throws IOException;

/**
* Counts the number of history operations whose sequence numbers fall within the given range.
*
* @param source source of the request
* @param fromSeqNo from sequence number; included
* @param toSeqNumber to sequence number; included
* @return number of history operations
* @throws IOException if an I/O error occurs while counting the operations
*/
public abstract int countNumberOfHistoryOperations(String source, long fromSeqNo, long toSeqNumber) throws IOException;

/**
* Reports whether this engine has a complete operation history starting at {@code startingSeqNo}.
* NOTE(review): the exact contract (e.g. whether {@code startingSeqNo} is inclusive) is defined by the
* implementations - confirm against them before relying on it.
*
* @param reason presumably a description of why the check is requested - TODO confirm
* @param startingSeqNo the sequence number from which the history is required
* @return {@code true} if the history from {@code startingSeqNo} is considered complete
*/
public abstract boolean hasCompleteOperationHistory(String reason, long startingSeqNo);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2772,7 +2772,13 @@ long getNumDocUpdates() {
}

@Override
public Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
public Translog.Snapshot newChangesSnapshot(
String source,
long fromSeqNo,
long toSeqNo,
boolean requiredFullRange,
boolean accurateCount
) throws IOException {
ensureOpen();
refreshIfNeeded(source, toSeqNo);
Searcher searcher = acquireSearcher(source, SearcherScope.INTERNAL);
Expand All @@ -2782,7 +2788,8 @@ public Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long
LuceneChangesSnapshot.DEFAULT_BATCH_SIZE,
fromSeqNo,
toSeqNo,
requiredFullRange
requiredFullRange,
accurateCount
);
searcher = null;
return snapshot;
Expand All @@ -2798,6 +2805,21 @@ public Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long
}
}

/**
 * Counts the history operations whose sequence numbers lie in {@code [fromSeqNo, toSeqNo]}.
 * <p>
 * The engine must be open; it is refreshed if needed for {@code toSeqNo} before an internal
 * searcher is acquired. The searcher is released when counting finishes.
 *
 * @param source    source of the request
 * @param fromSeqNo lower bound of the range; included
 * @param toSeqNo   upper bound of the range; included
 * @return the number of history operations in the range
 * @throws IOException if counting fails; the engine is given a chance to fail itself first, and any
 *                     exception raised while doing so is attached as a suppressed exception
 */
public int countNumberOfHistoryOperations(String source, long fromSeqNo, long toSeqNo) throws IOException {
    ensureOpen();
    refreshIfNeeded(source, toSeqNo);
    try (Searcher internalSearcher = acquireSearcher(source, SearcherScope.INTERNAL)) {
        final int operationCount = LuceneChangesSnapshot.countNumberOfHistoryOperations(internalSearcher, fromSeqNo, toSeqNo);
        return operationCount;
    } catch (IOException ioFailure) {
        try {
            maybeFailEngine(source, ioFailure);
        } catch (Exception suppressed) {
            // keep the original failure as the primary cause
            ioFailure.addSuppressed(suppressed);
        }
        throw ioFailure;
    }
}

/**
 * Reports whether the operation history is complete from {@code startingSeqNo} onwards, i.e. whether
 * the minimum retained sequence number does not exceed {@code startingSeqNo}.
 *
 * @param reason        not used by this implementation
 * @param startingSeqNo the sequence number from which the history is required
 * @return {@code true} if {@code startingSeqNo} is at or above the minimum retained sequence number
 */
public boolean hasCompleteOperationHistory(String reason, long startingSeqNo) {
    final long minRetainedSeqNo = getMinRetainedSeqNo();
    return startingSeqNo >= minRetainedSeqNo;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,19 @@
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.DocValuesFieldExistsQuery;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopFieldCollector;
import org.apache.lucene.util.ArrayUtil;
import org.opensearch.Version;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.common.lucene.search.Queries;
import org.opensearch.core.internal.io.IOUtils;
import org.opensearch.index.fieldvisitor.FieldsVisitor;
import org.opensearch.index.mapper.SeqNoFieldMapper;
Expand Down Expand Up @@ -88,8 +91,14 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
* @param toSeqNo the maximum requesting seq# - inclusive
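* @param requiredFullRange if true, the snapshot will strictly check for the existence of operations between fromSeqNo and toSeqNo
* @param accurateCount if true, the initial search tracks the total number of matching operations accurately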
*/
LuceneChangesSnapshot(Engine.Searcher engineSearcher, int searchBatchSize, long fromSeqNo, long toSeqNo, boolean requiredFullRange)
throws IOException {
LuceneChangesSnapshot(
Engine.Searcher engineSearcher,
int searchBatchSize,
long fromSeqNo,
long toSeqNo,
boolean requiredFullRange,
boolean accurateCount
) throws IOException {
if (fromSeqNo < 0 || toSeqNo < 0 || fromSeqNo > toSeqNo) {
throw new IllegalArgumentException("Invalid range; from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "]");
}
Expand All @@ -111,7 +120,7 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
this.indexSearcher = new IndexSearcher(Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader()));
this.indexSearcher.setQueryCache(null);
this.parallelArray = new ParallelArray(this.searchBatchSize);
final TopDocs topDocs = searchOperations(null);
final TopDocs topDocs = searchOperations(null, accurateCount);
this.totalHits = Math.toIntExact(topDocs.totalHits.value);
this.scoreDocs = topDocs.scoreDocs;
fillParallelArray(scoreDocs, parallelArray);
Expand Down Expand Up @@ -187,7 +196,7 @@ private int nextDocIndex() throws IOException {
// we have processed all docs in the current search - fetch the next batch
if (docIndex == scoreDocs.length && docIndex > 0) {
final ScoreDoc prev = scoreDocs[scoreDocs.length - 1];
scoreDocs = searchOperations(prev).scoreDocs;
scoreDocs = searchOperations((FieldDoc) prev, false).scoreDocs;
fillParallelArray(scoreDocs, parallelArray);
docIndex = 0;
}
Expand Down Expand Up @@ -236,16 +245,31 @@ private void fillParallelArray(ScoreDoc[] scoreDocs, ParallelArray parallelArray
}
}

private TopDocs searchOperations(ScoreDoc after) throws IOException {
final Query rangeQuery = new BooleanQuery.Builder().add(
LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, Math.max(fromSeqNo, lastSeenSeqNo), toSeqNo),
BooleanClause.Occur.MUST
)
// exclude non-root nested documents
.add(new DocValuesFieldExistsQuery(SeqNoFieldMapper.PRIMARY_TERM_NAME), BooleanClause.Occur.MUST)
/**
 * Builds the query that selects operations by sequence number.
 *
 * @param fromSeqNo lower bound of the sequence number range
 * @param toSeqNo   upper bound of the sequence number range
 * @return a boolean query requiring both the sequence number range and the non-nested filter
 */
private static Query operationsRangeQuery(long fromSeqNo, long toSeqNo) {
    final Query seqNoRange = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo);
    // exclude non-root nested docs
    final Query nonNestedOnly = Queries.newNonNestedFilter(Version.CURRENT);
    final BooleanQuery.Builder builder = new BooleanQuery.Builder();
    builder.add(seqNoRange, BooleanClause.Occur.MUST);
    builder.add(nonNestedOnly, BooleanClause.Occur.MUST);
    return builder.build();
}

/**
 * Counts the history operations whose sequence numbers lie in {@code [fromSeqNo, toSeqNo]}.
 * <p>
 * The count is taken over a view of the searcher's reader in which all documents are treated as live.
 *
 * @param searcher  the engine searcher whose directory reader is queried
 * @param fromSeqNo lower bound of the range; included
 * @param toSeqNo   upper bound of the range; included
 * @return the number of matching history operations
 * @throws IllegalArgumentException if either bound is negative or {@code fromSeqNo > toSeqNo}
 * @throws IOException              if the underlying index cannot be read
 */
static int countNumberOfHistoryOperations(Engine.Searcher searcher, long fromSeqNo, long toSeqNo) throws IOException {
    if (fromSeqNo > toSeqNo || fromSeqNo < 0 || toSeqNo < 0) {
        throw new IllegalArgumentException("Invalid sequence range; fromSeqNo [" + fromSeqNo + "] toSeqNo [" + toSeqNo + "]");
    }
    final IndexSearcher indexSearcher = new IndexSearcher(Lucene.wrapAllDocsLive(searcher.getDirectoryReader()));
    // One-off internal query: keep it out of the query cache, as the snapshot constructor does for its own searcher.
    indexSearcher.setQueryCache(null);
    return indexSearcher.count(operationsRangeQuery(fromSeqNo, toSeqNo));
}

private TopDocs searchOperations(FieldDoc after, boolean accurate) throws IOException {
final Query rangeQuery = operationsRangeQuery(Math.max(fromSeqNo, lastSeenSeqNo), toSeqNo);
final Sort sortedBySeqNo = new Sort(new SortField(SeqNoFieldMapper.NAME, SortField.Type.LONG));
return indexSearcher.searchAfter(after, rangeQuery, searchBatchSize, sortedBySeqNo);
final TopFieldCollector topFieldCollector = TopFieldCollector.create(
sortedBySeqNo,
searchBatchSize,
after,
accurate ? Integer.MAX_VALUE : 0
);
indexSearcher.search(rangeQuery, topFieldCollector);
return topFieldCollector.topDocs();
}

private Translog.Operation readDocAsOp(int docIndex) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,10 +325,23 @@ public Closeable acquireHistoryRetentionLock() {
}

@Override
public Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long toSeqNo, boolean requiredFullRange) {
public Translog.Snapshot newChangesSnapshot(
String source,
long fromSeqNo,
long toSeqNo,
boolean requiredFullRange,
boolean accurateCount
) {
return newEmptySnapshot();
}

/**
 * Counts history operations by opening a changes snapshot over the requested range (full range not
 * required, accurate count requested) and reporting its total number of operations. The snapshot is
 * closed before returning.
 */
@Override
public int countNumberOfHistoryOperations(String source, long fromSeqNo, long toSeqNo) throws IOException {
    try (Translog.Snapshot changes = newChangesSnapshot(source, fromSeqNo, toSeqNo, false, true)) {
        final int operationCount = changes.totalOperations();
        return operationCount;
    }
}

public boolean hasCompleteOperationHistory(String reason, long startingSeqNo) {
// we can do operation-based recovery if we don't have to replay any operation.
return startingSeqNo > seqNoStats.getMaxSeqNo();
Expand Down
27 changes: 22 additions & 5 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -2231,13 +2231,13 @@ public Closeable acquireHistoryRetentionLock() {
}

/**
*
* Creates a new history snapshot for reading operations since
* the provided starting seqno (inclusive) and ending seqno (inclusive)
* The returned snapshot can be retrieved from either Lucene index or translog files.
*/
public Translog.Snapshot getHistoryOperations(String reason, long startingSeqNo, long endSeqNo) throws IOException {
return getEngine().newChangesSnapshot(reason, startingSeqNo, endSeqNo, true);
public Translog.Snapshot getHistoryOperations(String reason, long startingSeqNo, long endSeqNo, boolean accurateCount)
throws IOException {
return getEngine().newChangesSnapshot(reason, startingSeqNo, endSeqNo, true, accurateCount);
}

/**
Expand All @@ -2257,6 +2257,17 @@ public long getMinRetainedSeqNo() {
return getEngine().getMinRetainedSeqNo();
}

/**
 * Asks the current engine how many history operations fall within the provided sequence number range.
 *
 * @param source    source of the requester (e.g., peer-recovery)
 * @param fromSeqNo lower bound of the range, included
 * @param toSeqNo   upper bound of the range, included
 * @return number of history operations in the sequence number range
 * @throws IOException if the engine fails to count the operations
 */
public int countNumberOfHistoryOperations(String source, long fromSeqNo, long toSeqNo) throws IOException {
    final int operationCount = getEngine().countNumberOfHistoryOperations(source, fromSeqNo, toSeqNo);
    return operationCount;
}

/**
* Creates a new changes snapshot for reading operations whose seq_no are between {@code fromSeqNo}(inclusive)
* and {@code toSeqNo}(inclusive). The caller has to close the returned snapshot after finishing the reading.
Expand All @@ -2268,8 +2279,14 @@ public long getMinRetainedSeqNo() {
* if any operation between {@code fromSeqNo} and {@code toSeqNo} is missing.
* This parameter should be only enabled when the entire requesting range is below the global checkpoint.
*/
public Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
return getEngine().newChangesSnapshot(source, fromSeqNo, toSeqNo, requiredFullRange);
public Translog.Snapshot newChangesSnapshot(
String source,
long fromSeqNo,
long toSeqNo,
boolean requiredFullRange,
boolean accurateCount
) throws IOException {
return getEngine().newChangesSnapshot(source, fromSeqNo, toSeqNo, requiredFullRange, accurateCount);
}

public List<Segment> segments(boolean verbose) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public void resync(final IndexShard indexShard, final ActionListener<ResyncTask>
// Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender.
// Even though those calls are not concurrent, snapshot.next() uses non-synchronized state and is not multi-thread-compatible
// Also fail the resync early if the shard is shutting down
snapshot = indexShard.newChangesSnapshot("resync", startingSeqNo, Long.MAX_VALUE, false);
snapshot = indexShard.newChangesSnapshot("resync", startingSeqNo, Long.MAX_VALUE, false, true);
final Translog.Snapshot originalSnapshot = snapshot;
final Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public class RecoverySourceHandler {
private final CancellableThreads cancellableThreads = new CancellableThreads();
private final List<Closeable> resources = new CopyOnWriteArrayList<>();
private final ListenableFuture<RecoveryResponse> future = new ListenableFuture<>();
private static final String PEER_RECOVERY_NAME = "peer-recovery";
public static final String PEER_RECOVERY_NAME = "peer-recovery";

public RecoverySourceHandler(
IndexShard shard,
Expand Down Expand Up @@ -272,7 +272,7 @@ && isTargetSameHistory()
logger.trace("performing file-based recovery followed by history replay starting at [{}]", startingSeqNo);

try {
final int estimateNumOps = estimateNumberOfHistoryOperations(startingSeqNo);
final int estimateNumOps = countNumberOfHistoryOperations(startingSeqNo);
final Releasable releaseStore = acquireStore(shard.store());
resources.add(releaseStore);
sendFileStep.whenComplete(r -> IOUtils.close(wrappedSafeCommit, releaseStore), e -> {
Expand Down Expand Up @@ -319,7 +319,7 @@ && isTargetSameHistory()
sendFileStep.whenComplete(r -> {
assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[prepareTargetForTranslog]");
// For a sequence based recovery, the target can keep its local translog
prepareTargetForTranslog(estimateNumberOfHistoryOperations(startingSeqNo), prepareEngineStep);
prepareTargetForTranslog(countNumberOfHistoryOperations(startingSeqNo), prepareEngineStep);
}, onFailure);

prepareEngineStep.whenComplete(prepareEngineTime -> {
Expand All @@ -340,9 +340,15 @@ && isTargetSameHistory()

final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
if (logger.isTraceEnabled()) {
logger.trace("snapshot translog for recovery; current size is [{}]", estimateNumberOfHistoryOperations(startingSeqNo));
logger.trace("snapshot translog for recovery; current size is [{}]", countNumberOfHistoryOperations(startingSeqNo));
}
final Translog.Snapshot phase2Snapshot = shard.newChangesSnapshot(PEER_RECOVERY_NAME, startingSeqNo, Long.MAX_VALUE, false);
final Translog.Snapshot phase2Snapshot = shard.newChangesSnapshot(
PEER_RECOVERY_NAME,
startingSeqNo,
Long.MAX_VALUE,
false,
true
);
resources.add(phase2Snapshot);
retentionLock.close();

Expand Down Expand Up @@ -403,10 +409,13 @@ private boolean isTargetSameHistory() {
return targetHistoryUUID.equals(shard.getHistoryUUID());
}

private int estimateNumberOfHistoryOperations(long startingSeqNo) throws IOException {
try (Translog.Snapshot snapshot = shard.newChangesSnapshot(PEER_RECOVERY_NAME, startingSeqNo, Long.MAX_VALUE, false)) {
return snapshot.totalOperations();
}
/**
 * Counts the shard's history operations from the starting sequence number onwards, with
 * {@code Long.MAX_VALUE} as the upper bound, on behalf of peer recovery.
 *
 * @param startingSeqNo the first sequence number to count; included
 * @return number of history operations
 * @throws IOException if the shard fails to count the operations
 */
private int countNumberOfHistoryOperations(long startingSeqNo) throws IOException {
    final long noUpperBound = Long.MAX_VALUE;
    return shard.countNumberOfHistoryOperations(PEER_RECOVERY_NAME, startingSeqNo, noUpperBound);
}

static void runUnderPrimaryPermit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,11 +344,11 @@ public void finalizeRecovery(final long globalCheckpoint, final long trimAboveSe

private boolean hasUncommittedOperations() throws IOException {
long localCheckpointOfCommit = Long.parseLong(indexShard.commitStats().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
try (
Translog.Snapshot snapshot = indexShard.newChangesSnapshot("peer-recovery", localCheckpointOfCommit + 1, Long.MAX_VALUE, false)
) {
return snapshot.totalOperations() > 0;
}
return indexShard.countNumberOfHistoryOperations(
RecoverySourceHandler.PEER_RECOVERY_NAME,
localCheckpointOfCommit + 1,
Long.MAX_VALUE
) > 0;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6362,8 +6362,12 @@ public void onFailure(Exception e) {
@Override
protected void doRun() throws Exception {
latch.await();
Translog.Snapshot changes = engine.newChangesSnapshot("test", min, max, true);
changes.close();
if (randomBoolean()) {
Translog.Snapshot changes = engine.newChangesSnapshot("test", min, max, true, randomBoolean());
changes.close();
} else {
engine.countNumberOfHistoryOperations("test", min, max);
}
}
});
snapshotThreads[i].start();
Expand Down
Loading

0 comments on commit 757abdb

Please sign in to comment.