Skip to content

Commit df1380b

Browse files
authored
Remove versionType from translog (#31945)
With the introduction of sequence number, we no longer use versionType to resolve out of order collision in replication and recovery requests. This PR removes removes the versionType from translog. We can only remove it in 7.0 because it is still required in a mixed cluster between 6.x and 5.x.
1 parent 351bbb8 commit df1380b

File tree

16 files changed

+99
-156
lines changed

16 files changed

+99
-156
lines changed

server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -523,13 +523,12 @@ private static Engine.Result performOpOnReplica(DocWriteResponse primaryResponse
523523
indexRequest.type(), indexRequest.id(), indexRequest.source(), indexRequest.getContentType())
524524
.routing(indexRequest.routing());
525525
result = replica.applyIndexOperationOnReplica(primaryResponse.getSeqNo(), primaryResponse.getVersion(),
526-
indexRequest.versionType().versionTypeForReplicationAndRecovery(), indexRequest.getAutoGeneratedTimestamp(),
527-
indexRequest.isRetry(), sourceToParse);
526+
indexRequest.getAutoGeneratedTimestamp(), indexRequest.isRetry(), sourceToParse);
528527
break;
529528
case DELETE:
530529
DeleteRequest deleteRequest = (DeleteRequest) docWriteRequest;
531530
result = replica.applyDeleteOperationOnReplica(primaryResponse.getSeqNo(), primaryResponse.getVersion(),
532-
deleteRequest.type(), deleteRequest.id(), deleteRequest.versionType().versionTypeForReplicationAndRecovery());
531+
deleteRequest.type(), deleteRequest.id());
533532
break;
534533
default:
535534
throw new IllegalStateException("Unexpected request operation type on replica: "

server/src/main/java/org/elasticsearch/index/VersionType.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -85,13 +85,6 @@ public boolean validateVersionForReads(long version) {
8585
// not allowing Versions.NOT_FOUND as it is not a valid input value.
8686
return version > 0L || version == Versions.MATCH_ANY;
8787
}
88-
89-
@Override
90-
public VersionType versionTypeForReplicationAndRecovery() {
91-
// replicas get the version from the primary after increment. The same version is stored in
92-
// the transaction log. -> the should use the external semantics.
93-
return EXTERNAL;
94-
}
9588
},
9689
EXTERNAL((byte) 1) {
9790
@Override
@@ -333,14 +326,6 @@ public byte getValue() {
333326
*/
334327
public abstract boolean validateVersionForReads(long version);
335328

336-
/**
337-
* Some version types require different semantics for primary and replicas. This version allows
338-
* the type to override the default behavior.
339-
*/
340-
public VersionType versionTypeForReplicationAndRecovery() {
341-
return this;
342-
}
343-
344329
public static VersionType fromString(String versionType) {
345330
if ("internal".equals(versionType)) {
346331
return INTERNAL;

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1168,6 +1168,7 @@ public static class Index extends Operation {
11681168
public Index(Term uid, ParsedDocument doc, long seqNo, long primaryTerm, long version, VersionType versionType, Origin origin,
11691169
long startTime, long autoGeneratedIdTimestamp, boolean isRetry) {
11701170
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
1171+
assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin;
11711172
this.doc = doc;
11721173
this.isRetry = isRetry;
11731174
this.autoGeneratedIdTimestamp = autoGeneratedIdTimestamp;
@@ -1245,6 +1246,7 @@ public static class Delete extends Operation {
12451246
public Delete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, VersionType versionType,
12461247
Origin origin, long startTime) {
12471248
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
1249+
assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin;
12481250
this.type = Objects.requireNonNull(type);
12491251
this.id = Objects.requireNonNull(id);
12501252
}

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -691,7 +691,7 @@ private boolean canOptimizeAddDocument(Index index) {
691691
return true;
692692
case PEER_RECOVERY:
693693
case REPLICA:
694-
assert index.version() == 1 && index.versionType() == VersionType.EXTERNAL
694+
assert index.version() == 1 && index.versionType() == null
695695
: "version: " + index.version() + " type: " + index.versionType();
696696
return true;
697697
case LOCAL_TRANSLOG_RECOVERY:
@@ -704,20 +704,6 @@ private boolean canOptimizeAddDocument(Index index) {
704704
return false;
705705
}
706706

707-
private boolean assertVersionType(final Engine.Operation operation) {
708-
if (operation.origin() == Operation.Origin.REPLICA ||
709-
operation.origin() == Operation.Origin.PEER_RECOVERY ||
710-
operation.origin() == Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
711-
// ensure that replica operation has expected version type for replication
712-
// ensure that versionTypeForReplicationAndRecovery is idempotent
713-
assert operation.versionType() == operation.versionType().versionTypeForReplicationAndRecovery()
714-
: "unexpected version type in request from [" + operation.origin().name() + "] " +
715-
"found [" + operation.versionType().name() + "] " +
716-
"expected [" + operation.versionType().versionTypeForReplicationAndRecovery().name() + "]";
717-
}
718-
return true;
719-
}
720-
721707
private boolean assertIncomingSequenceNumber(final Engine.Operation.Origin origin, final long seqNo) {
722708
if (origin == Operation.Origin.PRIMARY) {
723709
assert assertOriginPrimarySequenceNumber(seqNo);
@@ -757,7 +743,6 @@ public IndexResult index(Index index) throws IOException {
757743
try (ReleasableLock releasableLock = readLock.acquire()) {
758744
ensureOpen();
759745
assert assertIncomingSequenceNumber(index.origin(), index.seqNo());
760-
assert assertVersionType(index);
761746
try (Releasable ignored = versionMap.acquireLock(index.uid().bytes());
762747
Releasable indexThrottle = doThrottle ? () -> {} : throttle.acquireThrottle()) {
763748
lastWriteNanos = index.startTime();
@@ -860,9 +845,6 @@ private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOExceptio
860845
"max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of index [" + index.seqNo() + "]";
861846
}
862847
versionMap.enforceSafeAccess();
863-
// drop out of order operations
864-
assert index.versionType().versionTypeForReplicationAndRecovery() == index.versionType() :
865-
"resolving out of order delivery based on versioning but version type isn't fit for it. got [" + index.versionType() + "]";
866848
// unlike the primary, replicas don't really care to about creation status of documents
867849
// this allows to ignore the case where a document was found in the live version maps in
868850
// a delete state and return false for the created flag in favor of code simplicity
@@ -1096,7 +1078,6 @@ private void updateDocs(final Term uid, final List<ParseContext.Document> docs,
10961078
public DeleteResult delete(Delete delete) throws IOException {
10971079
versionMap.enforceSafeAccess();
10981080
assert Objects.equals(delete.uid().field(), IdFieldMapper.NAME) : delete.uid().field();
1099-
assert assertVersionType(delete);
11001081
assert assertIncomingSequenceNumber(delete.origin(), delete.seqNo());
11011082
final DeleteResult deleteResult;
11021083
// NOTE: we don't throttle this when merges fall behind because delete-by-id does not create new segments:
@@ -1149,10 +1130,6 @@ public DeleteResult delete(Delete delete) throws IOException {
11491130

11501131
private DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOException {
11511132
assert delete.origin() != Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin();
1152-
// drop out of order operations
1153-
assert delete.versionType().versionTypeForReplicationAndRecovery() == delete.versionType() :
1154-
"resolving out of order delivery based on versioning but version type isn't fit for it. got ["
1155-
+ delete.versionType() + "]";
11561133
maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(delete.seqNo(), curr));
11571134
assert maxSeqNoOfNonAppendOnlyOperations.get() >= delete.seqNo() : "max_seqno of non-append-only was not updated;" +
11581135
"max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of delete [" + delete.seqNo() + "]";

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -645,22 +645,22 @@ private IndexShardState changeState(IndexShardState newState, String reason) {
645645

646646
public Engine.IndexResult applyIndexOperationOnPrimary(long version, VersionType versionType, SourceToParse sourceToParse,
647647
long autoGeneratedTimestamp, boolean isRetry) throws IOException {
648+
assert versionType.validateVersionForWrites(version);
648649
return applyIndexOperation(SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm, version, versionType, autoGeneratedTimestamp,
649650
isRetry, Engine.Operation.Origin.PRIMARY, sourceToParse);
650651
}
651652

652-
public Engine.IndexResult applyIndexOperationOnReplica(long seqNo, long version, VersionType versionType,
653-
long autoGeneratedTimeStamp, boolean isRetry, SourceToParse sourceToParse)
653+
public Engine.IndexResult applyIndexOperationOnReplica(long seqNo, long version, long autoGeneratedTimeStamp,
654+
boolean isRetry, SourceToParse sourceToParse)
654655
throws IOException {
655-
return applyIndexOperation(seqNo, primaryTerm, version, versionType, autoGeneratedTimeStamp, isRetry,
656+
return applyIndexOperation(seqNo, primaryTerm, version, null, autoGeneratedTimeStamp, isRetry,
656657
Engine.Operation.Origin.REPLICA, sourceToParse);
657658
}
658659

659-
private Engine.IndexResult applyIndexOperation(long seqNo, long opPrimaryTerm, long version, VersionType versionType,
660+
private Engine.IndexResult applyIndexOperation(long seqNo, long opPrimaryTerm, long version, @Nullable VersionType versionType,
660661
long autoGeneratedTimeStamp, boolean isRetry, Engine.Operation.Origin origin,
661662
SourceToParse sourceToParse) throws IOException {
662663
assert opPrimaryTerm <= this.primaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.primaryTerm + "]";
663-
assert versionType.validateVersionForWrites(version);
664664
ensureWriteAllowed(origin);
665665
Engine.Index operation;
666666
try {
@@ -736,19 +736,18 @@ private Engine.NoOpResult noOp(Engine engine, Engine.NoOp noOp) {
736736

737737
public Engine.DeleteResult applyDeleteOperationOnPrimary(long version, String type, String id, VersionType versionType)
738738
throws IOException {
739+
assert versionType.validateVersionForWrites(version);
739740
return applyDeleteOperation(SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm, version, type, id, versionType,
740741
Engine.Operation.Origin.PRIMARY);
741742
}
742743

743-
public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long version, String type, String id,
744-
VersionType versionType) throws IOException {
745-
return applyDeleteOperation(seqNo, primaryTerm, version, type, id, versionType, Engine.Operation.Origin.REPLICA);
744+
public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long version, String type, String id) throws IOException {
745+
return applyDeleteOperation(seqNo, primaryTerm, version, type, id, null, Engine.Operation.Origin.REPLICA);
746746
}
747747

748748
private Engine.DeleteResult applyDeleteOperation(long seqNo, long opPrimaryTerm, long version, String type, String id,
749-
VersionType versionType, Engine.Operation.Origin origin) throws IOException {
749+
@Nullable VersionType versionType, Engine.Operation.Origin origin) throws IOException {
750750
assert opPrimaryTerm <= this.primaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.primaryTerm + "]";
751-
assert versionType.validateVersionForWrites(version);
752751
ensureWriteAllowed(origin);
753752
// When there is a single type, the unique identifier is only composed of the _id,
754753
// so there is no way to differenciate foo#1 from bar#1. This is especially an issue
@@ -1211,14 +1210,14 @@ public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine
12111210
// we set canHaveDuplicates to true all the time such that we de-optimze the translog case and ensure that all
12121211
// autoGeneratedID docs that are coming from the primary are updated correctly.
12131212
result = applyIndexOperation(index.seqNo(), index.primaryTerm(), index.version(),
1214-
index.versionType().versionTypeForReplicationAndRecovery(), index.getAutoGeneratedIdTimestamp(), true, origin,
1213+
null, index.getAutoGeneratedIdTimestamp(), true, origin,
12151214
source(shardId.getIndexName(), index.type(), index.id(), index.source(),
12161215
XContentHelper.xContentType(index.source())).routing(index.routing()));
12171216
break;
12181217
case DELETE:
12191218
final Translog.Delete delete = (Translog.Delete) operation;
12201219
result = applyDeleteOperation(delete.seqNo(), delete.primaryTerm(), delete.version(), delete.type(), delete.id(),
1221-
delete.versionType().versionTypeForReplicationAndRecovery(), origin);
1220+
null, origin);
12221221
break;
12231222
case NO_OP:
12241223
final Translog.NoOp noOp = (Translog.NoOp) operation;

0 commit comments

Comments
 (0)