Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
Expand Down Expand Up @@ -150,6 +151,7 @@ private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexSh
final long version = indexResult.getVersion();
indexRequest.version(version);
indexRequest.versionType(indexRequest.versionType().versionTypeForReplicationAndRecovery());
indexRequest.seqNo(indexResult.getSeqNo());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe assert at the end of this method that the seqNo is set on the replica request?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed 1c71393.

assert indexRequest.versionType().validateVersionForWrites(indexRequest.version());
response = new IndexResponse(primary.shardId(), indexRequest.type(), indexRequest.id(), indexResult.getSeqNo(),
indexResult.getVersion(), indexResult.isCreated());
Expand All @@ -173,6 +175,7 @@ private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexSh
// update the request with the version so it will go to the replicas
deleteRequest.versionType(deleteRequest.versionType().versionTypeForReplicationAndRecovery());
deleteRequest.version(deleteResult.getVersion());
deleteRequest.seqNo(deleteResult.getSeqNo());
assert deleteRequest.versionType().validateVersionForWrites(deleteRequest.version());
response = new DeleteResponse(request.shardId(), deleteRequest.type(), deleteRequest.id(), deleteResult.getSeqNo(),
deleteResult.getVersion(), deleteResult.isFound());
Expand All @@ -182,6 +185,7 @@ private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexSh
break;
default: throw new IllegalStateException("unexpected opType [" + itemRequest.opType() + "] found");
}

// update the bulk item request because update request execution can mutate the bulk item request
request.items()[requestIndex] = replicaRequest;
if (operationResult == null) { // in case of noop update operation
Expand Down Expand Up @@ -282,6 +286,7 @@ private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, Ind
final long version = updateOperationResult.getVersion();
indexRequest.version(version);
indexRequest.versionType(indexRequest.versionType().versionTypeForReplicationAndRecovery());
indexRequest.seqNo(updateOperationResult.getSeqNo());
assert indexRequest.versionType().validateVersionForWrites(indexRequest.version());
}
break;
Expand All @@ -292,6 +297,7 @@ private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, Ind
// update the request with the version so it will go to the replicas
deleteRequest.versionType(deleteRequest.versionType().versionTypeForReplicationAndRecovery());
deleteRequest.version(updateOperationResult.getVersion());
deleteRequest.seqNo(updateOperationResult.getSeqNo());
assert deleteRequest.versionType().validateVersionForWrites(deleteRequest.version());
}
break;
Expand Down Expand Up @@ -342,6 +348,10 @@ private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, Ind
replicaRequest = new BulkItemRequest(request.items()[requestIndex].id(), updateDeleteRequest);
break;
}
assert (replicaRequest.request() instanceof IndexRequest
&& ((IndexRequest) replicaRequest.request()).seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) ||
(replicaRequest.request() instanceof DeleteRequest
&& ((DeleteRequest) replicaRequest.request()).seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO);
// successful operation
break; // out of retry loop
} else if (updateOperationResult.getFailure() instanceof VersionConflictEngineException == false) {
Expand All @@ -364,10 +374,10 @@ protected WriteReplicaResult shardOperationOnReplica(BulkShardRequest request, I
switch (docWriteRequest.opType()) {
case CREATE:
case INDEX:
operationResult = executeIndexRequestOnReplica(((IndexRequest) docWriteRequest), replica);
operationResult = executeIndexRequestOnReplica((IndexRequest) docWriteRequest, replica);
break;
case DELETE:
operationResult = executeDeleteRequestOnReplica(((DeleteRequest) docWriteRequest), replica);
operationResult = executeDeleteRequestOnReplica((DeleteRequest) docWriteRequest, replica);
break;
default:
throw new IllegalStateException("Unexpected request operation type on replica: "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ protected WritePrimaryResult shardOperationOnPrimary(DeleteRequest request, Inde
// update the request with the version so it will go to the replicas
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
request.version(result.getVersion());
request.seqNo(result.getSeqNo());
assert request.versionType().validateVersionForWrites(request.version());
response = new DeleteResponse(
primary.shardId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,18 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
throw new IllegalArgumentException(openMode.toString());
}
logger.trace("recovered [{}]", seqNoStats);
indexWriter = writer;
seqNoService = sequenceNumberService(shardId, engineConfig.getIndexSettings(), seqNoStats);
// norelease
/*
* We have no guarantees that all operations above the local checkpoint are in the Lucene commit or the translog. This means
* that we there might be operations greater than the local checkpoint that will not be replayed. Here we force the local
* checkpoint to the maximum sequence number in the commit (at the potential expense of correctness).
*/
while (seqNoService.getLocalCheckpoint() < seqNoService.getMaxSeqNo()) {
final long next = seqNoService.getLocalCheckpoint() + 1;
seqNoService.markSeqNoAsCompleted(next);
}
indexWriter = writer;
translog = openTranslog(engineConfig, writer, seqNoService::getGlobalCheckpoint);
assert translog.getGeneration() != null;
} catch (IOException | TranslogCorruptedException e) {
Expand Down Expand Up @@ -638,16 +648,23 @@ private IndexResult innerIndex(Index index) throws IOException {
}
}
final long expectedVersion = index.version();
if (checkVersionConflict(index, currentVersion, expectedVersion, deleted)) {
// skip index operation because of version conflict on recovery
indexResult = new IndexResult(expectedVersion, SequenceNumbersService.UNASSIGNED_SEQ_NO, false);
} else {
final long seqNo;
if (index.origin() == Operation.Origin.PRIMARY) {
final boolean conflict = checkVersionConflict(index, currentVersion, expectedVersion, deleted);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if this is a replica and we have a version conflict and throws an exception? I think we still end up not marking the seq no as completed?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about something like this?

diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index e75dc47..d16c11e 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -86,6 +86,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Function;
 import java.util.function.LongSupplier;
+import java.util.function.Supplier;
 
 public class InternalEngine extends Engine {
 
@@ -444,11 +445,18 @@ public class InternalEngine extends Engine {
         }
     }
 
-    private boolean checkVersionConflict(
+    /**
+     * checks for version conflicts and returns the right result object if conflict was detected. returns `null`
+     * if no conflicts was found and indexing should proceed as normal
+     */
+    private <T extends Result> T checkVersionConflict(
             final Operation op,
             final long currentVersion,
             final long expectedVersion,
-            final boolean deleted) {
+            final boolean deleted,
+            final Supplier<T> resultOnSuccess,
+            final Function<Exception, T> resultOnFailure) {
+        final T result;
         if (op.versionType() == VersionType.FORCE) {
             if (engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
                 // If index was created in 5.0 or later, 'force' is not allowed at all
@@ -461,15 +469,17 @@ public class InternalEngine extends Engine {
 
         if (op.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted)) {
             if (op.origin().isRecovery()) {
-                // version conflict, but okay
-                return true;
+                // version conflict, but okay, mark as success
+                result = resultOnSuccess.get();
             } else {
                 // fatal version conflict
-                throw new VersionConflictEngineException(shardId, op.type(), op.id(),
-                        op.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted));
+                result = resultOnFailure.apply(new VersionConflictEngineException(shardId, op.type(), op.id(),
+                    op.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted));
             }
+        } else {
+            result = null;
         }
-        return false;
+        return result;
     }
 
     private long checkDeletedAndGCed(VersionValue versionValue) {
@@ -584,6 +594,7 @@ public class InternalEngine extends Engine {
         final Translog.Location location;
         final long updatedVersion;
         IndexResult indexResult = null;
+        long seqNo = index.seqNo();
         try (Releasable ignored = acquireLock(index.uid())) {
             lastWriteNanos = index.startTime();
             /* if we have an autoGeneratedID that comes into the engine we can potentially optimize
@@ -648,23 +659,18 @@ public class InternalEngine extends Engine {
                 }
             }
             final long expectedVersion = index.version();
-            final boolean conflict = checkVersionConflict(index, currentVersion, expectedVersion, deleted);
+            IndexResult result = checkVersionConflict(index, currentVersion, expectedVersion, deleted,
+                () -> new IndexResult(currentVersion, index.seqNo(), false),
+                exception -> new IndexResult(exception, currentVersion, index.seqNo()));
 
-            final long seqNo;
-            if (index.origin() == Operation.Origin.PRIMARY) {
-                if (!conflict) {
+            if (result == null) {
+
+                if (index.origin() == Operation.Origin.PRIMARY) {
                     seqNo = seqNoService.generateSeqNo();
-                } else {
-                    seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
                 }
-            } else {
-                seqNo = index.seqNo();
-            }
 
-            if (conflict) {
-                // skip index operation because of version conflict on recovery
-                indexResult = new IndexResult(expectedVersion, seqNo, false);
-            } else {
+                index.parsedDoc().updateSeqID(seqNo, index.primaryTerm());
+
                 updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
                 index.parsedDoc().version().setLongValue(updatedVersion);
 
@@ -686,8 +692,8 @@ public class InternalEngine extends Engine {
                 }
                 indexResult = new IndexResult(updatedVersion, seqNo, deleted);
                 location = index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY
-                        ? translog.add(new Translog.Index(index, indexResult))
-                        : null;
+                    ? translog.add(new Translog.Index(index, indexResult))
+                    : null;
                 versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion));
                 indexResult.setTranslogLocation(location);
             }
@@ -695,7 +701,7 @@ public class InternalEngine extends Engine {
             indexResult.freeze();
             return indexResult;
         } finally {
-            if (indexResult != null && indexResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
+            if (seqNo != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
                 seqNoService.markSeqNoAsCompleted(indexResult.getSeqNo());
             }
         }

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I strengthened the test to include the replica case (it caught the issue), and incorporated your suggestion.


final long seqNo;
if (index.origin() == Operation.Origin.PRIMARY) {
if (!conflict) {
seqNo = seqNoService.generateSeqNo();
} else {
seqNo = index.seqNo();
seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
}
} else {
seqNo = index.seqNo();
}

if (conflict) {
// skip index operation because of version conflict on recovery
indexResult = new IndexResult(expectedVersion, seqNo, false);
} else {
updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
index.parsedDoc().version().setLongValue(updatedVersion);

Expand Down Expand Up @@ -764,16 +781,24 @@ private DeleteResult innerDelete(Delete delete) throws IOException {
}

final long expectedVersion = delete.version();
if (checkVersionConflict(delete, currentVersion, expectedVersion, deleted)) {
// skip executing delete because of version conflict on recovery
deleteResult = new DeleteResult(expectedVersion, SequenceNumbersService.UNASSIGNED_SEQ_NO, true);
} else {
final long seqNo;
if (delete.origin() == Operation.Origin.PRIMARY) {

final boolean conflict = checkVersionConflict(delete, currentVersion, expectedVersion, deleted);

final long seqNo;
if (delete.origin() == Operation.Origin.PRIMARY) {
if (!conflict) {
seqNo = seqNoService.generateSeqNo();
} else {
seqNo = delete.seqNo();
seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
}
} else {
seqNo = delete.seqNo();
}

if (conflict) {
// skip executing delete because of version conflict on recovery
deleteResult = new DeleteResult(expectedVersion, seqNo, true);
} else {
updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion);
found = deleteIfFound(delete.uid(), currentVersion, deleted, versionValue);
deleteResult = new DeleteResult(updatedVersion, seqNo, found);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,14 @@ public synchronized long getCheckpoint() {
* updates the global checkpoint on a replica shard (after it has been updated by the primary).
*/
synchronized void updateCheckpointOnReplica(long globalCheckpoint) {
/*
* The global checkpoint here is a local knowledge which is updated under the mandate of the primary. It can happen that the primary
* information is lagging compared to a replica (e.g., if a replica is promoted to primary but has stale info relative to other
* replica shards). In these cases, the local knowledge of the global checkpoint could be higher than sync from the lagging primary.
*/
if (this.globalCheckpoint <= globalCheckpoint) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add a comment about when the current global checkpoint can be higher? here is what I wrote in #10708

Note that the global checkpoint is a local knowledge of that is update under the mandate of the primary. It may be that the primary information is lagging compared to a replica. This can happen when a replica is promoted to a primary (but still has stale info).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed 1c71393.

this.globalCheckpoint = globalCheckpoint;
logger.trace("global checkpoint updated from primary to [{}]", globalCheckpoint);
} else {
throw new IllegalArgumentException("global checkpoint from primary should never decrease. current [" +
this.globalCheckpoint + "], got [" + globalCheckpoint + "]");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,9 +312,9 @@ class FinalizeRecoveryRequestHandler implements TransportRequestHandler<Recovery

@Override
public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel) throws Exception {
try (RecoveriesCollection.RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()))
{
recoveryRef.status().finalizeRecovery();
try (RecoveriesCollection.RecoveryRef recoveryRef =
onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
recoveryRef.status().finalizeRecovery(request.globalCheckpoint());
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

package org.elasticsearch.indices.recovery;

import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.transport.TransportRequest;

Expand All @@ -29,15 +31,16 @@
public class RecoveryFinalizeRecoveryRequest extends TransportRequest {

private long recoveryId;

private ShardId shardId;
private long globalCheckpoint;

public RecoveryFinalizeRecoveryRequest() {
}

RecoveryFinalizeRecoveryRequest(long recoveryId, ShardId shardId) {
RecoveryFinalizeRecoveryRequest(final long recoveryId, final ShardId shardId, final long globalCheckpoint) {
this.recoveryId = recoveryId;
this.shardId = shardId;
this.globalCheckpoint = globalCheckpoint;
}

public long recoveryId() {
Expand All @@ -48,17 +51,30 @@ public ShardId shardId() {
return shardId;
}

public long globalCheckpoint() {
return globalCheckpoint;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
recoveryId = in.readLong();
shardId = ShardId.readShardId(in);
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
globalCheckpoint = in.readZLong();
} else {
globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(recoveryId);
shardId.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
out.writeZLong(globalCheckpoint);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -391,8 +391,8 @@ public void finalizeRecovery() {
StopWatch stopWatch = new StopWatch().start();
logger.trace("[{}][{}] finalizing recovery to {}", indexName, shardId, request.targetNode());
cancellableThreads.execute(() -> {
recoveryTarget.finalizeRecovery();
shard.markAllocationIdAsInSync(recoveryTarget.getTargetAllocationId());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

paranoia - can we flip this around and mark the target allocation as "in sync" before we give it the global checkpoint? it at least reads better as "we know you are in sync and therefore every global checkpoint advances will take you into account"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed 1c71393.

recoveryTarget.finalizeRecovery(shard.getGlobalCheckpoint());
});

if (request.isPrimaryRelocation()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,8 @@ public void prepareForTranslogOperations(int totalTranslogOps, long maxUnsafeAut
}

@Override
public void finalizeRecovery() {
public void finalizeRecovery(final long globalCheckpoint) {
indexShard().updateGlobalCheckpointOnReplica(globalCheckpoint);
final IndexShard indexShard = indexShard();
indexShard.finalizeRecovery();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,12 @@ public interface RecoveryTargetHandler {
void prepareForTranslogOperations(int totalTranslogOps, long maxUnsafeAutoIdTimestamp) throws IOException;

/**
* The finalize request clears unreferenced translog files, refreshes the engine now that
* new segments are available, and enables garbage collection of
* tombstone files.
**/
void finalizeRecovery();
* The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, and
* updates the global checkpoint.
*
* @param globalCheckpoint the global checkpoint on the recovery source
*/
void finalizeRecovery(long globalCheckpoint);

/**
* Blockingly waits for cluster state with at least clusterStateVersion to be available
Expand Down Expand Up @@ -82,4 +83,5 @@ void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReferenc
* @return the allocation id of the target shard.
*/
String getTargetAllocationId();

}
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ public void prepareForTranslogOperations(int totalTranslogOps, long maxUnsafeAut
}

@Override
public void finalizeRecovery() {
public void finalizeRecovery(final long globalCheckpoint) {
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.FINALIZE,
new RecoveryFinalizeRecoveryRequest(recoveryId, shardId),
new RecoveryFinalizeRecoveryRequest(recoveryId, shardId, globalCheckpoint),
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
Expand Down
Loading