Skip to content
11 changes: 11 additions & 0 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -1690,6 +1690,17 @@ public boolean isRecovering() {
*/
public abstract void maybePruneDeletes();

/**
* Returns the maximum auto-generated timestamp of append-only requests has been processed by this engine.
*/
public abstract long getMaxAutoIdTimestamp();

/**
* Sets the maximum auto-generated timestamp of append-only requests tracked by this engine to {@code newTimestamp}.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we want to speak about updating the unsafe marker here?

* The update only takes effect if the current timestamp is smaller the new given parameter.
*/
public abstract void updateMaxAutoIdTimestamp(long newTimestamp);

@FunctionalInterface
public interface TranslogRecoveryRunner {
int run(Engine engine, Translog.Snapshot snapshot) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1014,8 +1014,7 @@ private boolean mayHaveBeenIndexedBefore(Index index) {
final boolean mayHaveBeenIndexBefore;
if (index.isRetry()) {
mayHaveBeenIndexBefore = true;
maxUnsafeAutoIdTimestamp.updateAndGet(curr -> Math.max(index.getAutoGeneratedIdTimestamp(), curr));
assert maxUnsafeAutoIdTimestamp.get() >= index.getAutoGeneratedIdTimestamp();
updateMaxAutoIdTimestamp(index.getAutoGeneratedIdTimestamp());
} else {
// in this case we force
mayHaveBeenIndexBefore = maxUnsafeAutoIdTimestamp.get() >= index.getAutoGeneratedIdTimestamp();
Expand Down Expand Up @@ -2531,4 +2530,16 @@ void updateRefreshedCheckpoint(long checkpoint) {
assert refreshedCheckpoint.get() >= checkpoint : refreshedCheckpoint.get() + " < " + checkpoint;
}
}

@Override
public long getMaxAutoIdTimestamp() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe just push the maxUnsafeAutoIdTimestamp up to engine and make the methods final

return maxUnsafeAutoIdTimestamp.get();
}

@Override
public void updateMaxAutoIdTimestamp(long newTimestamp) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit - updateMaxUnsafeAutoIdTimestamp

assert newTimestamp >= -1 : "invalid timestamp [" + newTimestamp + "]";
maxUnsafeAutoIdTimestamp.updateAndGet(curr -> Math.max(curr, newTimestamp));
assert newTimestamp <= maxUnsafeAutoIdTimestamp.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.Lock;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.core.internal.io.IOUtils;
Expand Down Expand Up @@ -365,4 +366,14 @@ public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) {
@Override
public void maybePruneDeletes() {
}

@Override
public long getMaxAutoIdTimestamp() {
return IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP;
}

@Override
public void updateMaxAutoIdTimestamp(long newTimestamp) {

}
}
15 changes: 15 additions & 0 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1244,6 +1244,21 @@ public void trimOperationOfPreviousPrimaryTerms(long aboveSeqNo) {
getEngine().trimOperationsFromTranslog(operationPrimaryTerm, aboveSeqNo);
}

/**
* Sets the maximum auto-generated timestamp of append-only requests tracked by this shard to {@code newTimestamp}.
* The update only takes effect if the current timestamp is smaller the new given parameter.
*/
public void updateMaxAutoIdTimestamp(long newTimestamp) {
getEngine().updateMaxAutoIdTimestamp(newTimestamp);
}

/**
* Returns the maximum auto-generated timestamp of append-only requests has been processed by this shard.
*/
public long getMaxAutoIdTimestamp() {
return getEngine().getMaxAutoIdTimestamp();
}

public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine.Operation.Origin origin) throws IOException {
// If a translog op is replayed on the primary (eg. ccr), we need to use external instead of null for its version type.
final VersionType versionType = (origin == Engine.Operation.Origin.PRIMARY) ? VersionType.EXTERNAL : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ public void messageReceived(final RecoveryTranslogOperationsRequest request, fin
final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext());
final RecoveryTarget recoveryTarget = recoveryRef.target();
try {
recoveryTarget.indexTranslogOperations(request.operations(), request.totalTranslogOps());
recoveryTarget.indexTranslogOperations(request.operations(), request.totalTranslogOps(), request.maxAutoIdTimestamp());
channel.sendResponse(new RecoveryTranslogOperationsResponse(recoveryTarget.indexShard().getLocalCheckpoint()));
} catch (MapperException exception) {
// in very rare cases a translog replay from primary is processed before a mapping update on this node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,8 @@ public RecoveryResponse recoverToTarget() throws IOException {
runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()),
shardId + " initiating tracking of " + request.targetAllocationId(), shard, cancellableThreads, logger);

// DISCUSS: Is it possible to have an operation gets delivered via recovery first, then delivered via replication?
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bleskes and @ywelsch Could you please check this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think this is possible. We add the target to the replication group, and then collect the operations from the translog (or Lucene with soft deletes) to send to the target. It's possible that an operation can arrive on the primary, enter the translog on the primary, and then an evil OS scheduler puts the thread handling the replication to the target to sleep. At this moment recovery can execute copying the operation to the target. Then, our thread can wake up and the operation arrive by replication.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @jasontedor. I had a same thought but was not sure because I failed to come up with a test. Now I have an idea to write that test.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a few sinister tests along these lines, where we latch operations in the engine to stall them for nefarious purposes. 😇

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I have a test now. However, this corner case is protected by SeqNo. We are all good now.

if (appendOnlyRequest && mayHaveBeenIndexedBefore(index) == false && index.seqNo() > maxSeqNoOfNonAppendOnlyOperations.get()) {
.

// If this is the case, we need to propagate the max_timestamp of all append-only, not only retry requests.
final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
/*
* We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all
Expand Down Expand Up @@ -551,8 +553,8 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, long require
logger.trace("no translog operations to send");
}

final CancellableThreads.IOInterruptable sendBatch =
() -> targetLocalCheckpoint.set(recoveryTarget.indexTranslogOperations(operations, expectedTotalOps));
final CancellableThreads.IOInterruptable sendBatch = () ->
targetLocalCheckpoint.set(recoveryTarget.indexTranslogOperations(operations, expectedTotalOps, shard.getMaxAutoIdTimestamp()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of using a new one for every batch that is to be sent, I would prefer to capture this after we call cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo)); in RecoverySourceHandler, and then only pass that same value. You could also add a comment then and there saying why we do it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have to do this after the snapshot was captured. That said, I'm +1 on explicitly capturing it once at the right moment and use the same value.


// send operations in batches
Translog.Operation operation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,13 +386,15 @@ public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primar
}

@Override
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) throws IOException {
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps,
long maxAutoIdTimestamp) throws IOException {
final RecoveryState.Translog translog = state().getTranslog();
translog.totalOperations(totalTranslogOps);
assert indexShard().recoveryState() == state();
if (indexShard().state() != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, indexShard().state());
}
indexShard().updateMaxAutoIdTimestamp(maxAutoIdTimestamp);
for (Translog.Operation operation : operations) {
Engine.Result result = indexShard().applyTranslogOperation(operation, Engine.Operation.Origin.PEER_RECOVERY);
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ public interface RecoveryTargetHandler {

/**
* Index a set of translog operations on the target
* @param operations operations to index
* @param totalTranslogOps current number of total operations expected to be indexed
*
* @param operations operations to index
* @param totalTranslogOps current number of total operations expected to be indexed
* @param maxAutoIdTimestamp the maximum auto-generated timestamp from the primary shard
* @return the local checkpoint on the target shard
*/
long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) throws IOException;
long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long maxAutoIdTimestamp) throws IOException;

/**
* Notifies the target of the files it is going to receive
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.elasticsearch.indices.recovery;

import org.elasticsearch.Version;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
Expand All @@ -34,15 +36,18 @@ public class RecoveryTranslogOperationsRequest extends TransportRequest {
private ShardId shardId;
private List<Translog.Operation> operations;
private int totalTranslogOps = RecoveryState.Translog.UNKNOWN;
private long maxAutoIdTimestamp;

public RecoveryTranslogOperationsRequest() {
}

RecoveryTranslogOperationsRequest(long recoveryId, ShardId shardId, List<Translog.Operation> operations, int totalTranslogOps) {
RecoveryTranslogOperationsRequest(long recoveryId, ShardId shardId, List<Translog.Operation> operations,
int totalTranslogOps, long maxAutoIdTimestamp) {
this.recoveryId = recoveryId;
this.shardId = shardId;
this.operations = operations;
this.totalTranslogOps = totalTranslogOps;
this.maxAutoIdTimestamp = maxAutoIdTimestamp;
}

public long recoveryId() {
Expand All @@ -61,13 +66,22 @@ public int totalTranslogOps() {
return totalTranslogOps;
}

public long maxAutoIdTimestamp() {
return maxAutoIdTimestamp;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
recoveryId = in.readLong();
shardId = ShardId.readShardId(in);
operations = Translog.readOperations(in, "recovery");
totalTranslogOps = in.readVInt();
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
maxAutoIdTimestamp = in.readZLong();
} else {
maxAutoIdTimestamp = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP;
}
}

@Override
Expand All @@ -77,5 +91,8 @@ public void writeTo(StreamOutput out) throws IOException {
shardId.writeTo(out);
Translog.writeOperations(out, operations);
out.writeVInt(totalTranslogOps);
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeZLong(maxAutoIdTimestamp);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,9 @@ public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primar
}

@Override
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) {
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long maxAutoIdTimestamp) {
final RecoveryTranslogOperationsRequest translogOperationsRequest =
new RecoveryTranslogOperationsRequest(recoveryId, shardId, operations, totalTranslogOps);
new RecoveryTranslogOperationsRequest(recoveryId, shardId, operations, totalTranslogOps, maxAutoIdTimestamp);
final TransportFuture<RecoveryTranslogOperationsResponse> future = transportService.submitRequest(
targetNode,
PeerRecoveryTargetService.Actions.TRANSLOG_OPS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.lucene.index.Term;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.elasticsearch.Version;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkShardRequest;
Expand Down Expand Up @@ -141,10 +142,81 @@ public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaDa
}
}

public void testRetryAppendOnlyWhileRecovering() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you mean after recovering here.

try (ReplicationGroup shards = createGroup(0)) {
shards.startAll();
final IndexRequest originalRequest = new IndexRequest(index.getName(), "type").source("{}", XContentType.JSON);
originalRequest.process(Version.CURRENT, null, index.getName());
final IndexRequest retryRequest = copyIndexRequest(originalRequest);
retryRequest.onRetry();
shards.index(retryRequest);
IndexShard replica = shards.addReplica();
shards.recoverReplica(replica);
shards.assertAllEqual(1);
shards.index(originalRequest);
shards.assertAllEqual(1);
assertThat(replica.getMaxAutoIdTimestamp(), equalTo(originalRequest.getAutoGeneratedTimestamp()));
assertThat(replica.getMaxAutoIdTimestamp(), equalTo(shards.getPrimary().getMaxAutoIdTimestamp()));
}
}

public void testAppendOnlyRecoveryThenReplication() throws Exception {
CountDownLatch indexedOnPrimary = new CountDownLatch(1);
CountDownLatch recoveryDone = new CountDownLatch(1);
try (ReplicationGroup shards = new ReplicationGroup(buildIndexMetaData(1)) {
@Override
protected EngineFactory getEngineFactory(ShardRouting routing) {
return config -> new InternalEngine(config) {
@Override
public IndexResult index(Index op) throws IOException {
IndexResult result = super.index(op);
if (op.origin() == Operation.Origin.PRIMARY) {
indexedOnPrimary.countDown();
// prevent the indexing on the primary from returning (it was added to Lucene and translog already)
// to make sure that this operation is replicated to the replica via recovery, then via replication.
try {
recoveryDone.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
}
return result;
}
};
}
}) {
shards.startAll();
Thread thread = new Thread(() -> {
IndexRequest indexRequest = new IndexRequest(index.getName(), "type").source("{}", XContentType.JSON);
try {
shards.index(indexRequest);
} catch (Exception e) {
throw new AssertionError(e);
}
});
thread.start();
IndexShard replica = shards.addReplica();
Future<Void> fut = shards.asyncRecoverReplica(replica,
(shard, node) -> new RecoveryTarget(shard, node, recoveryListener, v -> {}){
@Override
public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException {
try {
indexedOnPrimary.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps);
}
});
fut.get();
recoveryDone.countDown();
thread.join();
shards.assertAllEqual(1);
}
}

public void testInheritMaxValidAutoIDTimestampOnRecovery() throws Exception {
//TODO: Enables this test with soft-deletes once we have timestamp
Settings settings = Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false).build();
try (ReplicationGroup shards = createGroup(0, settings)) {
try (ReplicationGroup shards = createGroup(0)) {
shards.startAll();
final IndexRequest indexRequest = new IndexRequest(index.getName(), "type").source("{}", XContentType.JSON);
indexRequest.onRetry(); // force an update of the timestamp
Expand All @@ -161,6 +233,7 @@ public void testInheritMaxValidAutoIDTimestampOnRecovery() throws Exception {
assertNotEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, primarySegmentStats.getMaxUnsafeAutoIdTimestamp());
assertEquals(primarySegmentStats.getMaxUnsafeAutoIdTimestamp(), segmentsStats.getMaxUnsafeAutoIdTimestamp());
assertNotEquals(Long.MAX_VALUE, segmentsStats.getMaxUnsafeAutoIdTimestamp());
assertThat(replica.getMaxAutoIdTimestamp(), equalTo(shards.getPrimary().getMaxAutoIdTimestamp()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,9 +488,10 @@ protected EngineFactory getEngineFactory(ShardRouting routing) {
return new RecoveryTarget(indexShard, node, recoveryListener, l -> {
}) {
@Override
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) throws IOException {
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps,
long maxAutoIdTimestamp) throws IOException {
opsSent.set(true);
return super.indexTranslogOperations(operations, totalTranslogOps);
return super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp);
}
};
});
Expand Down Expand Up @@ -557,7 +558,8 @@ protected EngineFactory getEngineFactory(final ShardRouting routing) {
replica,
(indexShard, node) -> new RecoveryTarget(indexShard, node, recoveryListener, l -> {}) {
@Override
public long indexTranslogOperations(final List<Translog.Operation> operations, final int totalTranslogOps)
public long indexTranslogOperations(final List<Translog.Operation> operations, final int totalTranslogOps,
final long maxAutoIdTimestamp)
throws IOException {
// index a doc which is not part of the snapshot, but also does not complete on replica
replicaEngineFactory.latchIndexers(1);
Expand Down Expand Up @@ -585,7 +587,7 @@ public long indexTranslogOperations(final List<Translog.Operation> operations, f
} catch (InterruptedException e) {
throw new AssertionError(e);
}
return super.indexTranslogOperations(operations, totalTranslogOps);
return super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp);
}
});
pendingDocActiveWithExtraDocIndexed.await();
Expand Down Expand Up @@ -671,11 +673,12 @@ private void blockIfNeeded(RecoveryState.Stage currentStage) {
}

@Override
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) throws IOException {
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps,
long maxAutoIdTimestamp) throws IOException {
if (hasBlocked() == false) {
blockIfNeeded(RecoveryState.Stage.TRANSLOG);
}
return super.indexTranslogOperations(operations, totalTranslogOps);
return super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp);
}

@Override
Expand Down
Loading