Skip to content

Commit 96e2b6c

Browse files
committed
Propagate auto_id_timestamp in primary-replica resync (#33964)
A follow-up of #33693 to propagate max_seen_auto_id_timestamp in a primary-replica resync. Relates #33693
1 parent 2774c89 commit 96e2b6c

File tree

6 files changed

+86
-12
lines changed

6 files changed

+86
-12
lines changed

server/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.elasticsearch.action.resync;
2020

2121
import org.elasticsearch.Version;
22+
import org.elasticsearch.action.index.IndexRequest;
2223
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
2324
import org.elasticsearch.common.io.stream.StreamInput;
2425
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -28,6 +29,7 @@
2829

2930
import java.io.IOException;
3031
import java.util.Arrays;
32+
import java.util.Objects;
3133

3234
/**
3335
* Represents a batch of operations sent from the primary to its replicas during the primary-replica resync.
@@ -36,22 +38,28 @@ public final class ResyncReplicationRequest extends ReplicatedWriteRequest<Resyn
3638

3739
private long trimAboveSeqNo;
3840
private Translog.Operation[] operations;
41+
private long maxSeenAutoIdTimestampOnPrimary;
3942

4043
ResyncReplicationRequest() {
4144
super();
4245
}
4346

44-
public ResyncReplicationRequest(final ShardId shardId, final long trimAboveSeqNo,
45-
final Translog.Operation[] operations) {
47+
public ResyncReplicationRequest(final ShardId shardId, final long trimAboveSeqNo, final long maxSeenAutoIdTimestampOnPrimary,
48+
final Translog.Operation[]operations) {
4649
super(shardId);
4750
this.trimAboveSeqNo = trimAboveSeqNo;
51+
this.maxSeenAutoIdTimestampOnPrimary = maxSeenAutoIdTimestampOnPrimary;
4852
this.operations = operations;
4953
}
5054

5155
public long getTrimAboveSeqNo() {
5256
return trimAboveSeqNo;
5357
}
5458

59+
public long getMaxSeenAutoIdTimestampOnPrimary() {
60+
return maxSeenAutoIdTimestampOnPrimary;
61+
}
62+
5563
public Translog.Operation[] getOperations() {
5664
return operations;
5765
}
@@ -71,6 +79,11 @@ public void readFrom(final StreamInput in) throws IOException {
7179
} else {
7280
trimAboveSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
7381
}
82+
if (in.getVersion().onOrAfter(Version.V_6_5_0)) {
83+
maxSeenAutoIdTimestampOnPrimary = in.readZLong();
84+
} else {
85+
maxSeenAutoIdTimestampOnPrimary = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP;
86+
}
7487
operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new);
7588
}
7689

@@ -80,6 +93,9 @@ public void writeTo(final StreamOutput out) throws IOException {
8093
if (out.getVersion().onOrAfter(Version.V_6_4_0)) {
8194
out.writeZLong(trimAboveSeqNo);
8295
}
96+
if (out.getVersion().onOrAfter(Version.V_6_5_0)) {
97+
out.writeZLong(maxSeenAutoIdTimestampOnPrimary);
98+
}
8399
out.writeArray(Translog.Operation::writeOperation, operations);
84100
}
85101

@@ -88,13 +104,13 @@ public boolean equals(final Object o) {
88104
if (this == o) return true;
89105
if (o == null || getClass() != o.getClass()) return false;
90106
final ResyncReplicationRequest that = (ResyncReplicationRequest) o;
91-
return trimAboveSeqNo == that.trimAboveSeqNo
107+
return trimAboveSeqNo == that.trimAboveSeqNo && maxSeenAutoIdTimestampOnPrimary == that.maxSeenAutoIdTimestampOnPrimary
92108
&& Arrays.equals(operations, that.operations);
93109
}
94110

95111
@Override
96112
public int hashCode() {
97-
return Long.hashCode(trimAboveSeqNo) + 31 * Arrays.hashCode(operations);
113+
return Objects.hash(trimAboveSeqNo, maxSeenAutoIdTimestampOnPrimary, operations);
98114
}
99115

100116
@Override
@@ -104,6 +120,7 @@ public String toString() {
104120
", timeout=" + timeout +
105121
", index='" + index + '\'' +
106122
", trimAboveSeqNo=" + trimAboveSeqNo +
123+
", maxSeenAutoIdTimestampOnPrimary=" + maxSeenAutoIdTimestampOnPrimary +
107124
", ops=" + operations.length +
108125
"}";
109126
}

server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,12 @@ protected WriteReplicaResult shardOperationOnReplica(ResyncReplicationRequest re
119119

120120
public static Translog.Location performOnReplica(ResyncReplicationRequest request, IndexShard replica) throws Exception {
121121
Translog.Location location = null;
122+
/*
123+
* Operations received from resync do not have auto_id_timestamp individually, we need to bootstrap this max_seen_timestamp
124+
* (at least the highest timestamp from any of these operations) to make sure that we will disable optimization for the same
125+
* append-only requests with timestamp (sources of these operations) that are replicated; otherwise we may have duplicates.
126+
*/
127+
replica.updateMaxUnsafeAutoIdTimestamp(request.getMaxSeenAutoIdTimestampOnPrimary());
122128
for (Translog.Operation operation : request.getOperations()) {
123129
final Engine.Result operationResult = replica.applyTranslogOperation(operation, Engine.Operation.Origin.REPLICA);
124130
if (operationResult.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {

server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -135,9 +135,11 @@ public void onFailure(final Exception e) {
135135
}
136136
}
137137
};
138-
138+
// We must capture the timestamp after snapshotting a snapshot of operations to make sure
139+
// that the auto_id_timestamp of every operation in the snapshot is at most this value.
140+
final long maxSeenAutoIdTimestamp = indexShard.getMaxSeenAutoIdTimestamp();
139141
resync(shardId, indexShard.routingEntry().allocationId().getId(), indexShard.getPendingPrimaryTerm(), wrappedSnapshot,
140-
startingSeqNo, maxSeqNo, resyncListener);
142+
startingSeqNo, maxSeqNo, maxSeenAutoIdTimestamp, resyncListener);
141143
} catch (Exception e) {
142144
try {
143145
IOUtils.close(snapshot);
@@ -150,7 +152,7 @@ public void onFailure(final Exception e) {
150152
}
151153

152154
private void resync(final ShardId shardId, final String primaryAllocationId, final long primaryTerm, final Translog.Snapshot snapshot,
153-
long startingSeqNo, long maxSeqNo, ActionListener<ResyncTask> listener) {
155+
long startingSeqNo, long maxSeqNo, long maxSeenAutoIdTimestamp, ActionListener<ResyncTask> listener) {
154156
ResyncRequest request = new ResyncRequest(shardId, primaryAllocationId);
155157
ResyncTask resyncTask = (ResyncTask) taskManager.register("transport", "resync", request); // it's not transport :-)
156158
ActionListener<Void> wrappedListener = new ActionListener<Void>() {
@@ -170,7 +172,7 @@ public void onFailure(Exception e) {
170172
};
171173
try {
172174
new SnapshotSender(logger, syncAction, resyncTask, shardId, primaryAllocationId, primaryTerm, snapshot, chunkSize.bytesAsInt(),
173-
startingSeqNo, maxSeqNo, wrappedListener).run();
175+
startingSeqNo, maxSeqNo, maxSeenAutoIdTimestamp, wrappedListener).run();
174176
} catch (Exception e) {
175177
wrappedListener.onFailure(e);
176178
}
@@ -191,6 +193,7 @@ static class SnapshotSender extends AbstractRunnable implements ActionListener<R
191193
private final Translog.Snapshot snapshot;
192194
private final long startingSeqNo;
193195
private final long maxSeqNo;
196+
private final long maxSeenAutoIdTimestamp;
194197
private final int chunkSizeInBytes;
195198
private final ActionListener<Void> listener;
196199
private final AtomicBoolean firstMessage = new AtomicBoolean(true);
@@ -199,7 +202,8 @@ static class SnapshotSender extends AbstractRunnable implements ActionListener<R
199202
private AtomicBoolean closed = new AtomicBoolean();
200203

201204
SnapshotSender(Logger logger, SyncAction syncAction, ResyncTask task, ShardId shardId, String primaryAllocationId, long primaryTerm,
202-
Translog.Snapshot snapshot, int chunkSizeInBytes, long startingSeqNo, long maxSeqNo, ActionListener<Void> listener) {
205+
Translog.Snapshot snapshot, int chunkSizeInBytes, long startingSeqNo, long maxSeqNo,
206+
long maxSeenAutoIdTimestamp, ActionListener<Void> listener) {
203207
this.logger = logger;
204208
this.syncAction = syncAction;
205209
this.task = task;
@@ -210,6 +214,7 @@ static class SnapshotSender extends AbstractRunnable implements ActionListener<R
210214
this.chunkSizeInBytes = chunkSizeInBytes;
211215
this.startingSeqNo = startingSeqNo;
212216
this.maxSeqNo = maxSeqNo;
217+
this.maxSeenAutoIdTimestamp = maxSeenAutoIdTimestamp;
213218
this.listener = listener;
214219
task.setTotalOperations(snapshot.totalOperations());
215220
}
@@ -260,7 +265,7 @@ protected void doRun() throws Exception {
260265
if (!operations.isEmpty() || trimmedAboveSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
261266
task.setPhase("sending_ops");
262267
ResyncReplicationRequest request =
263-
new ResyncReplicationRequest(shardId, trimmedAboveSeqNo, operations.toArray(EMPTY_ARRAY));
268+
new ResyncReplicationRequest(shardId, trimmedAboveSeqNo, maxSeenAutoIdTimestamp, operations.toArray(EMPTY_ARRAY));
264269
logger.trace("{} sending batch of [{}][{}] (total sent: [{}], skipped: [{}])", shardId, operations.size(),
265270
new ByteSizeValue(size), totalSentOps.get(), totalSkippedOps.get());
266271
firstMessage.set(false);

server/src/test/java/org/elasticsearch/action/resync/ResyncReplicationRequestTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public void testSerialization() throws IOException {
4040
final Translog.Index index = new Translog.Index("type", "id", 0, randomNonNegativeLong(),
4141
Versions.MATCH_ANY, VersionType.INTERNAL, bytes, null, null, -1);
4242
final ShardId shardId = new ShardId(new Index("index", "uuid"), 0);
43-
final ResyncReplicationRequest before = new ResyncReplicationRequest(shardId, 42L, new Translog.Operation[]{index});
43+
final ResyncReplicationRequest before = new ResyncReplicationRequest(shardId, 42L, 100, new Translog.Operation[]{index});
4444

4545
final BytesStreamOutput out = new BytesStreamOutput();
4646
before.writeTo(out);

server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.logging.log4j.Logger;
2323
import org.apache.lucene.index.IndexWriter;
2424
import org.apache.lucene.index.IndexableField;
25+
import org.elasticsearch.Version;
2526
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
2627
import org.elasticsearch.action.bulk.BulkShardRequest;
2728
import org.elasticsearch.action.index.IndexRequest;
@@ -634,6 +635,49 @@ public long indexTranslogOperations(final List<Translog.Operation> operations, f
634635
}
635636
}
636637

638+
public void testTransferMaxSeenAutoIdTimestampOnResync() throws Exception {
639+
try (ReplicationGroup shards = createGroup(2)) {
640+
shards.startAll();
641+
IndexShard primary = shards.getPrimary();
642+
IndexShard replica1 = shards.getReplicas().get(0);
643+
IndexShard replica2 = shards.getReplicas().get(1);
644+
long maxTimestampOnReplica1 = -1;
645+
long maxTimestampOnReplica2 = -1;
646+
List<IndexRequest> replicationRequests = new ArrayList<>();
647+
for (int numDocs = between(1, 10), i = 0; i < numDocs; i++) {
648+
final IndexRequest indexRequest = new IndexRequest(index.getName(), "type").source("{}", XContentType.JSON);
649+
indexRequest.process(Version.CURRENT, null, index.getName());
650+
final IndexRequest copyRequest;
651+
if (randomBoolean()) {
652+
copyRequest = copyIndexRequest(indexRequest);
653+
indexRequest.onRetry();
654+
} else {
655+
copyRequest = copyIndexRequest(indexRequest);
656+
copyRequest.onRetry();
657+
}
658+
replicationRequests.add(copyRequest);
659+
final BulkShardRequest bulkShardRequest = indexOnPrimary(indexRequest, primary);
660+
if (randomBoolean()) {
661+
indexOnReplica(bulkShardRequest, shards, replica1);
662+
maxTimestampOnReplica1 = Math.max(maxTimestampOnReplica1, indexRequest.getAutoGeneratedTimestamp());
663+
} else {
664+
indexOnReplica(bulkShardRequest, shards, replica2);
665+
maxTimestampOnReplica2 = Math.max(maxTimestampOnReplica2, indexRequest.getAutoGeneratedTimestamp());
666+
}
667+
}
668+
assertThat(replica1.getMaxSeenAutoIdTimestamp(), equalTo(maxTimestampOnReplica1));
669+
assertThat(replica2.getMaxSeenAutoIdTimestamp(), equalTo(maxTimestampOnReplica2));
670+
shards.promoteReplicaToPrimary(replica1).get();
671+
assertThat(replica2.getMaxSeenAutoIdTimestamp(), equalTo(maxTimestampOnReplica1));
672+
for (IndexRequest request : replicationRequests) {
673+
shards.index(request); // deliver via normal replication
674+
}
675+
for (IndexShard shard : shards) {
676+
assertThat(shard.getMaxSeenAutoIdTimestamp(), equalTo(Math.max(maxTimestampOnReplica1, maxTimestampOnReplica2)));
677+
}
678+
}
679+
}
680+
637681
public static class BlockingTarget extends RecoveryTarget {
638682

639683
private final CountDownLatch recoveryBlocked;

server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception {
7676
// Index doc but not advance local checkpoint.
7777
shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL,
7878
SourceToParse.source(shard.shardId().getIndexName(), "_doc", Integer.toString(i), new BytesArray("{}"), XContentType.JSON),
79-
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
79+
randomBoolean() ? IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP : randomNonNegativeLong(), true);
8080
}
8181

8282
long globalCheckPoint = numDocs > 0 ? randomIntBetween(0, numDocs - 1) : 0;
@@ -105,6 +105,8 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception {
105105
.findFirst()
106106
.isPresent(),
107107
is(false));
108+
109+
assertThat(resyncRequest.getMaxSeenAutoIdTimestampOnPrimary(), equalTo(shard.getMaxSeenAutoIdTimestamp()));
108110
}
109111
if (syncNeeded && globalCheckPoint < numDocs - 1) {
110112
if (shard.indexSettings.isSoftDeleteEnabled()) {

0 commit comments

Comments
 (0)