Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -875,7 +875,7 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO
* requests, we can assert the replica have not seen the document of that append-only request, thus we can apply optimization.
*/
assert index.version() == 1L : "can optimize on replicas but incoming version is [" + index.version() + "]";
plan = IndexingStrategy.optimizedAppendOnly(index.seqNo());
plan = IndexingStrategy.optimizedAppendOnly(index.seqNo(), 1L);
} else {
if (appendOnlyRequest == false) {
maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(index.seqNo(), curr));
Expand Down Expand Up @@ -927,7 +927,7 @@ protected final IndexingStrategy planIndexingAsPrimary(Index index) throws IOExc
plan = IndexingStrategy.overrideExistingAsIfNotThere(generateSeqNoForOperation(index), 1L);
versionMap.enforceSafeAccess();
} else {
plan = IndexingStrategy.optimizedAppendOnly(generateSeqNoForOperation(index));
plan = IndexingStrategy.optimizedAppendOnly(generateSeqNoForOperation(index), 1L);
}
} else {
versionMap.enforceSafeAccess();
Expand Down Expand Up @@ -1082,8 +1082,8 @@ private IndexingStrategy(boolean currentNotFoundOrDeleted, boolean useLuceneUpda
Optional.of(earlyResultOnPreFlightError);
}

static IndexingStrategy optimizedAppendOnly(long seqNoForIndexing) {
return new IndexingStrategy(true, false, true, false, seqNoForIndexing, 1, null);
public static IndexingStrategy optimizedAppendOnly(long seqNoForIndexing, long versionForIndexing) {
return new IndexingStrategy(true, false, true, false, seqNoForIndexing, versionForIndexing, null);
}

static IndexingStrategy skipDueToVersionConflict(
Expand All @@ -1104,7 +1104,8 @@ static IndexingStrategy overrideExistingAsIfNotThere(
return new IndexingStrategy(true, true, true, false, seqNoForIndexing, versionForIndexing, null);
}

static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted, long seqNoForIndexing, long versionForIndexing) {
public static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted, long seqNoForIndexing,
long versionForIndexing) {
return new IndexingStrategy(currentNotFoundOrDeleted, false, false, false, seqNoForIndexing, versionForIndexing, null);
}

Expand Down Expand Up @@ -2331,6 +2332,16 @@ public void waitForOpsToComplete(long seqNo) throws InterruptedException {
localCheckpointTracker.waitForOpsToComplete(seqNo);
}

/**
* Checks if the given operation has been processed in this engine or not.
* @return true if the given operation was processed; otherwise false.
*/
protected boolean containsOperation(Operation op) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: call this "hasBeenProcessedBefore"?

assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "operation is not assigned seq_no";
assert versionMap.assertKeyedLockHeldByCurrentThread(op.uid().bytes());
return localCheckpointTracker.contains(op.seqNo());
}

@Override
public SeqNoStats getSeqNoStats(long globalCheckpoint) {
return localCheckpointTracker.getStats(globalCheckpoint);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ Releasable acquireLock(BytesRef uid) {
return keyedLock.acquire(uid);
}

private boolean assertKeyedLockHeldByCurrentThread(BytesRef uid) {
boolean assertKeyedLockHeldByCurrentThread(BytesRef uid) {
assert keyedLock.isHeldByCurrentThread(uid) : "Thread [" + Thread.currentThread().getName() + "], uid [" + uid.utf8ToString() + "]";
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,11 +575,11 @@ protected static BytesArray bytesArray(String string) {
return new BytesArray(string.getBytes(Charset.defaultCharset()));
}

protected static Term newUid(String id) {
public static Term newUid(String id) {
return new Term("_id", Uid.encodeId(id));
}

protected Term newUid(ParsedDocument doc) {
public static Term newUid(ParsedDocument doc) {
return newUid(doc.id());
}

Expand Down Expand Up @@ -643,7 +643,7 @@ public static List<Engine.Operation> generateSingleDocHistory(boolean forReplica
throw new UnsupportedOperationException("unknown version type: " + versionType);
}
if (randomBoolean()) {
op = new Engine.Index(id, testParsedDocument(docId, null, testDocumentWithTextField(valuePrefix + i), B_1, null),
op = new Engine.Index(id, testParsedDocument(docId, null, testDocumentWithTextField(valuePrefix + i), SOURCE, null),
forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO,
forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm,
version,
Expand Down Expand Up @@ -734,7 +734,7 @@ public static void assertOpsOnReplica(
}
}

protected void concurrentlyApplyOps(List<Engine.Operation> ops, InternalEngine engine) throws InterruptedException {
public static void concurrentlyApplyOps(List<Engine.Operation> ops, InternalEngine engine) throws InterruptedException {
Thread[] thread = new Thread[randomIntBetween(3, 5)];
CountDownLatch startGun = new CountDownLatch(thread.length);
AtomicInteger offset = new AtomicInteger(-1);
Expand Down Expand Up @@ -877,7 +877,7 @@ public static void assertConsistentHistoryBetweenTranslogAndLuceneIndex(Engine e
}
}

protected MapperService createMapperService(String type) throws IOException {
public static MapperService createMapperService(String type) throws IOException {
IndexMetaData indexMetaData = IndexMetaData.builder("test")
.settings(Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ccr.index.engine;

import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.InternalEngine;
Expand All @@ -18,6 +19,8 @@
*/
public final class FollowingEngine extends InternalEngine {

private final CounterMetric numOfOptimizedIndexing = new CounterMetric();

/**
* Construct a new following engine with the specified engine configuration.
*
Expand Down Expand Up @@ -51,7 +54,45 @@ private void preFlight(final Operation operation) {
@Override
protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Index index) throws IOException {
preFlight(index);
return planIndexingAsNonPrimary(index);
/*
* A note about optimization using sequence numbers:
*
* 1. Indexing operations are processed concurrently in an engine. However, operations of the same docID are processed
* one by one under the docID lock.
*
* 2. An engine itself can resolve correctly if an operation is delivered multiple times. However, if an operation is
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this note is correct? we don't execute if we see we did before?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you forget to push something? this statement is not correct. There is no notion of an "optimized op" (for replicas) just an op with a seq# about the MSU. Also "However, if an operation is optimized and delivered multiple times, it will be appended into Lucene more than once." reads weird. Maybe as simple as "Operations that are optimized using the MSU optimization may not be processed twice as this will create duplicates in lucene. To avoid it we check the local checkpoint tracker to see if an operation was already processed".

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've applied your suggestion.

* optimized and delivered multiple times, it will be appended into Lucene more than once. We void this issue by
* never optimizing an operation if it was processed in the engine (using LocalCheckpointTracker).
*
* 3. When replicating operations to replicas or followers, we also carry the max seq_no_of_updates_or_deletes on the
* leader to followers. This transfer guarantees the MUS on a follower when operation O is processed at least the
* MUS on the leader when it was executed.
*
* 4. The following proves that docID(O) does not exist on a follower when operation O is applied if MSU(O) <= LCP < seqno(O):
*
* 4.1) If such operation O' with docID(O’) = docID(O), and LCP < seqno(O’), then MSU(O) >= MSU(O') because O' was
* delivered to the follower before O. MUS(0') on the leader is at least seqno(O) or seqno(0') and both > LCP.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0' (zero) -> O'

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is MUS(0') on the leader is at least seqno(O) or seqno(0') ?

* This contradicts the assumption [MSU(O) <= LCP].
*
* 4.2) MSU(O) < seqno(O) then docID(O) does not exist when O is applied on a leader. This means docID(O) does not exist
* after we apply every operation with docID = docID(O) and seqno < seqno(O). On the follower, we have applied every
* operation with seqno <= LCP, and there is no such O' with docID(O’) = docID(O) and LCP < seqno(O’)[4.1].
* These mean the follower has applied every operation with docID = docID(O) and seqno < seqno(O).
* Thus docID(O) does not exist on the follower.
*/
final long maxSeqNoOfUpdatesOrDeletes = getMaxSeqNoOfUpdatesOrDeletes();
assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized";
if (containsOperation(index)) {
return IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version());

} else if (maxSeqNoOfUpdatesOrDeletes <= getLocalCheckpoint()) {
assert maxSeqNoOfUpdatesOrDeletes < index.seqNo() : "seq_no[" + index.seqNo() + "] <= msu[" + maxSeqNoOfUpdatesOrDeletes + "]";
numOfOptimizedIndexing.inc();
return InternalEngine.IndexingStrategy.optimizedAppendOnly(index.seqNo(), index.version());

} else {
return planIndexingAsNonPrimary(index);
}
}

@Override
Expand Down Expand Up @@ -85,4 +126,11 @@ protected boolean assertPrimaryCanOptimizeAddDocument(final Index index) {
return true;
}

/**
* Returns the number of indexing operations that have been optimized (bypass version lookup) using sequence numbers in this engine.
* This metric is not persisted, and started from 0 when the engine is opened.
*/
public long getNumberOfOptimizedIndexing() {
return numOfOptimizedIndexing.count();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
Expand All @@ -49,6 +50,7 @@
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngine;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
import org.elasticsearch.xpack.core.ccr.action.CreateAndFollowIndexAction;
Expand Down Expand Up @@ -202,7 +204,7 @@ public void testFollowIndex() throws Exception {
for (int i = 0; i < firstBatchNumDocs; i++) {
assertBusy(assertExpectedDocumentRunnable(i));
}

assertTotalNumberOfOptimizedIndexing(resolveIndex("index2"), numberOfPrimaryShards, firstBatchNumDocs);
unfollowIndex("index2");
client().execute(FollowIndexAction.INSTANCE, followRequest).get();
final int secondBatchNumDocs = randomIntBetween(2, 64);
Expand All @@ -226,6 +228,7 @@ public void testFollowIndex() throws Exception {
for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) {
assertBusy(assertExpectedDocumentRunnable(i));
}
assertTotalNumberOfOptimizedIndexing(resolveIndex("index2"), numberOfPrimaryShards, firstBatchNumDocs + secondBatchNumDocs);
unfollowIndex("index2");
assertMaxSeqNoOfUpdatesIsTransferred(resolveIndex("index1"), resolveIndex("index2"), numberOfPrimaryShards);
}
Expand Down Expand Up @@ -342,6 +345,8 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
assertThat(bulkProcessor.awaitClose(1L, TimeUnit.MINUTES), is(true));

assertSameDocCount("index1", "index2");
assertTotalNumberOfOptimizedIndexing(resolveIndex("index2"), numberOfShards,
client().prepareSearch("index2").get().getHits().totalHits);
unfollowIndex("index2");
assertMaxSeqNoOfUpdatesIsTransferred(resolveIndex("index1"), resolveIndex("index2"), numberOfShards);
}
Expand Down Expand Up @@ -766,6 +771,27 @@ private void assertMaxSeqNoOfUpdatesIsTransferred(Index leaderIndex, Index follo
});
}

private void assertTotalNumberOfOptimizedIndexing(Index followerIndex, int numberOfShards, long expectedTotal) throws Exception {
assertBusy(() -> {
long[] numOfOptimizedOps = new long[numberOfShards];
for (int shardId = 0; shardId < numberOfShards; shardId++) {
for (String node : internalCluster().nodesInclude(followerIndex.getName())) {
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
IndexShard shard = indicesService.getShardOrNull(new ShardId(followerIndex, shardId));
if (shard != null) {
try {
FollowingEngine engine = ((FollowingEngine) IndexShardTestCase.getEngine(shard));
numOfOptimizedOps[shardId] = engine.getNumberOfOptimizedIndexing();
} catch (AlreadyClosedException e) {
throw new AssertionError(e); // causes assertBusy to retry
}
}
}
}
assertThat(Arrays.stream(numOfOptimizedOps).sum(), equalTo(expectedTotal));
});
}

public static FollowIndexAction.Request createFollowRequest(String leaderIndex, String followerIndex) {
FollowIndexAction.Request request = new FollowIndexAction.Request();
request.setLeaderIndex(leaderIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction;
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngine;
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory;

import java.io.IOException;
Expand Down Expand Up @@ -72,6 +73,9 @@ public void testSimpleCcrReplication() throws Exception {
assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint()));
followerGroup.assertAllEqual(indexedDocIds.size());
});
for (IndexShard shard : followerGroup) {
assertThat(((FollowingEngine) (getEngine(shard))).getNumberOfOptimizedIndexing(), equalTo((long) docCount));
}
// Deletes should be replicated to the follower
List<String> deleteDocIds = randomSubsetOf(indexedDocIds);
for (String deleteId : deleteDocIds) {
Expand Down
Loading