Skip to content

Commit f3d21a2

Browse files
committed
Propagate auto_id_timestamp in primary-replica resync
A follow-up of elastic#33693 to propagate max_seen_auto_id_timestamp in a primary-replica resync.
1 parent ea3f3e4 commit f3d21a2

File tree

6 files changed

+82
-12
lines changed

6 files changed

+82
-12
lines changed

server/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
import java.io.IOException;
3030
import java.util.Arrays;
31+
import java.util.Objects;
3132

3233
/**
3334
* Represents a batch of operations sent from the primary to its replicas during the primary-replica resync.
@@ -36,22 +37,28 @@ public final class ResyncReplicationRequest extends ReplicatedWriteRequest<Resyn
3637

3738
private long trimAboveSeqNo;
3839
private Translog.Operation[] operations;
40+
private long maxSeenAutoIdTimestampOnPrimary;
3941

4042
ResyncReplicationRequest() {
4143
super();
4244
}
4345

44-
public ResyncReplicationRequest(final ShardId shardId, final long trimAboveSeqNo,
45-
final Translog.Operation[] operations) {
46+
public ResyncReplicationRequest(final ShardId shardId, final long trimAboveSeqNo, final long maxSeenAutoIdTimestampOnPrimary,
47+
final Translog.Operation[]operations) {
4648
super(shardId);
4749
this.trimAboveSeqNo = trimAboveSeqNo;
50+
this.maxSeenAutoIdTimestampOnPrimary = maxSeenAutoIdTimestampOnPrimary;
4851
this.operations = operations;
4952
}
5053

5154
public long getTrimAboveSeqNo() {
5255
return trimAboveSeqNo;
5356
}
5457

58+
public long getMaxSeenAutoIdTimestampOnPrimary() {
59+
return maxSeenAutoIdTimestampOnPrimary;
60+
}
61+
5562
public Translog.Operation[] getOperations() {
5663
return operations;
5764
}
@@ -73,6 +80,11 @@ public void readFrom(final StreamInput in) throws IOException {
7380
} else {
7481
trimAboveSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
7582
}
83+
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
84+
maxSeenAutoIdTimestampOnPrimary = in.readZLong();
85+
} else {
86+
maxSeenAutoIdTimestampOnPrimary = -1;
87+
}
7688
operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new);
7789
}
7890

@@ -82,6 +94,9 @@ public void writeTo(final StreamOutput out) throws IOException {
8294
if (out.getVersion().onOrAfter(Version.V_6_4_0)) {
8395
out.writeZLong(trimAboveSeqNo);
8496
}
97+
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
98+
out.writeZLong(maxSeenAutoIdTimestampOnPrimary);
99+
}
85100
out.writeArray(Translog.Operation::writeOperation, operations);
86101
}
87102

@@ -90,13 +105,13 @@ public boolean equals(final Object o) {
90105
if (this == o) return true;
91106
if (o == null || getClass() != o.getClass()) return false;
92107
final ResyncReplicationRequest that = (ResyncReplicationRequest) o;
93-
return trimAboveSeqNo == that.trimAboveSeqNo
108+
return trimAboveSeqNo == that.trimAboveSeqNo && maxSeenAutoIdTimestampOnPrimary == that.maxSeenAutoIdTimestampOnPrimary
94109
&& Arrays.equals(operations, that.operations);
95110
}
96111

97112
@Override
98113
public int hashCode() {
99-
return Long.hashCode(trimAboveSeqNo) + 31 * Arrays.hashCode(operations);
114+
// Hash the operations array by content so hashCode() agrees with equals(), which uses Arrays.equals;
// passing the array itself to Objects.hash would use its identity hash.
return Objects.hash(trimAboveSeqNo, maxSeenAutoIdTimestampOnPrimary, Arrays.hashCode(operations));
100115
}
101116

102117
@Override
@@ -106,6 +121,7 @@ public String toString() {
106121
", timeout=" + timeout +
107122
", index='" + index + '\'' +
108123
", trimAboveSeqNo=" + trimAboveSeqNo +
124+
", maxSeenAutoIdTimestampOnPrimary=" + maxSeenAutoIdTimestampOnPrimary +
109125
", ops=" + operations.length +
110126
"}";
111127
}

server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,12 @@ protected WriteReplicaResult shardOperationOnReplica(ResyncReplicationRequest re
119119

120120
public static Translog.Location performOnReplica(ResyncReplicationRequest request, IndexShard replica) throws Exception {
121121
Translog.Location location = null;
122+
/*
123+
* Operations received from resync do not carry an individual auto_id_timestamp, so we need to bootstrap this max_seen_timestamp
124+
* (at least the highest timestamp from any of these operations) to make sure that we will disable optimization for the same
125+
* append-only requests with timestamp (sources of these operations) that are replicated; otherwise we may have duplicates.
126+
*/
127+
replica.updateMaxUnsafeAutoIdTimestamp(request.getMaxSeenAutoIdTimestampOnPrimary());
122128
for (Translog.Operation operation : request.getOperations()) {
123129
final Engine.Result operationResult = replica.applyTranslogOperation(operation, Engine.Operation.Origin.REPLICA);
124130
if (operationResult.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {

server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -135,9 +135,11 @@ public void onFailure(final Exception e) {
135135
}
136136
}
137137
};
138-
138+
// We must capture the timestamp after taking the snapshot of operations to make sure
139+
// that the auto_id_timestamp of every operation in the snapshot is at most this value.
140+
final long maxSeenAutoIdTimestamp = indexShard.getMaxSeenAutoIdTimestamp();
139141
resync(shardId, indexShard.routingEntry().allocationId().getId(), indexShard.getPendingPrimaryTerm(), wrappedSnapshot,
140-
startingSeqNo, maxSeqNo, resyncListener);
142+
startingSeqNo, maxSeqNo, maxSeenAutoIdTimestamp, resyncListener);
141143
} catch (Exception e) {
142144
try {
143145
IOUtils.close(snapshot);
@@ -150,7 +152,7 @@ public void onFailure(final Exception e) {
150152
}
151153

152154
private void resync(final ShardId shardId, final String primaryAllocationId, final long primaryTerm, final Translog.Snapshot snapshot,
153-
long startingSeqNo, long maxSeqNo, ActionListener<ResyncTask> listener) {
155+
long startingSeqNo, long maxSeqNo, long maxSeenAutoIdTimestamp, ActionListener<ResyncTask> listener) {
154156
ResyncRequest request = new ResyncRequest(shardId, primaryAllocationId);
155157
ResyncTask resyncTask = (ResyncTask) taskManager.register("transport", "resync", request); // it's not transport :-)
156158
ActionListener<Void> wrappedListener = new ActionListener<Void>() {
@@ -170,7 +172,7 @@ public void onFailure(Exception e) {
170172
};
171173
try {
172174
new SnapshotSender(logger, syncAction, resyncTask, shardId, primaryAllocationId, primaryTerm, snapshot, chunkSize.bytesAsInt(),
173-
startingSeqNo, maxSeqNo, wrappedListener).run();
175+
startingSeqNo, maxSeqNo, maxSeenAutoIdTimestamp, wrappedListener).run();
174176
} catch (Exception e) {
175177
wrappedListener.onFailure(e);
176178
}
@@ -191,6 +193,7 @@ static class SnapshotSender extends AbstractRunnable implements ActionListener<R
191193
private final Translog.Snapshot snapshot;
192194
private final long startingSeqNo;
193195
private final long maxSeqNo;
196+
private final long maxSeenAutoIdTimestamp;
194197
private final int chunkSizeInBytes;
195198
private final ActionListener<Void> listener;
196199
private final AtomicBoolean firstMessage = new AtomicBoolean(true);
@@ -199,7 +202,8 @@ static class SnapshotSender extends AbstractRunnable implements ActionListener<R
199202
private AtomicBoolean closed = new AtomicBoolean();
200203

201204
SnapshotSender(Logger logger, SyncAction syncAction, ResyncTask task, ShardId shardId, String primaryAllocationId, long primaryTerm,
202-
Translog.Snapshot snapshot, int chunkSizeInBytes, long startingSeqNo, long maxSeqNo, ActionListener<Void> listener) {
205+
Translog.Snapshot snapshot, int chunkSizeInBytes, long startingSeqNo, long maxSeqNo,
206+
long maxSeenAutoIdTimestamp, ActionListener<Void> listener) {
203207
this.logger = logger;
204208
this.syncAction = syncAction;
205209
this.task = task;
@@ -210,6 +214,7 @@ static class SnapshotSender extends AbstractRunnable implements ActionListener<R
210214
this.chunkSizeInBytes = chunkSizeInBytes;
211215
this.startingSeqNo = startingSeqNo;
212216
this.maxSeqNo = maxSeqNo;
217+
this.maxSeenAutoIdTimestamp = maxSeenAutoIdTimestamp;
213218
this.listener = listener;
214219
task.setTotalOperations(snapshot.totalOperations());
215220
}
@@ -260,7 +265,7 @@ protected void doRun() throws Exception {
260265
if (!operations.isEmpty() || trimmedAboveSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
261266
task.setPhase("sending_ops");
262267
ResyncReplicationRequest request =
263-
new ResyncReplicationRequest(shardId, trimmedAboveSeqNo, operations.toArray(EMPTY_ARRAY));
268+
new ResyncReplicationRequest(shardId, trimmedAboveSeqNo, maxSeenAutoIdTimestamp, operations.toArray(EMPTY_ARRAY));
264269
logger.trace("{} sending batch of [{}][{}] (total sent: [{}], skipped: [{}])", shardId, operations.size(),
265270
new ByteSizeValue(size), totalSentOps.get(), totalSkippedOps.get());
266271
firstMessage.set(false);

server/src/test/java/org/elasticsearch/action/resync/ResyncReplicationRequestTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public void testSerialization() throws IOException {
3838
final Translog.Index index = new Translog.Index("type", "id", 0, randomNonNegativeLong(),
3939
randomNonNegativeLong(), bytes, null, -1);
4040
final ShardId shardId = new ShardId(new Index("index", "uuid"), 0);
41-
final ResyncReplicationRequest before = new ResyncReplicationRequest(shardId, 42L, new Translog.Operation[]{index});
41+
final ResyncReplicationRequest before = new ResyncReplicationRequest(shardId, 42L, 100, new Translog.Operation[]{index});
4242

4343
final BytesStreamOutput out = new BytesStreamOutput();
4444
before.writeTo(out);

server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.logging.log4j.Logger;
2323
import org.apache.lucene.index.IndexWriter;
2424
import org.apache.lucene.index.IndexableField;
25+
import org.elasticsearch.Version;
2526
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
2627
import org.elasticsearch.action.bulk.BulkShardRequest;
2728
import org.elasticsearch.action.index.IndexRequest;
@@ -633,6 +634,46 @@ public long indexTranslogOperations(final List<Translog.Operation> operations, f
633634
}
634635
}
635636

637+
public void testTransferMaxSeenAutoIdTimestampOnResync() throws Exception {
638+
try (ReplicationGroup shards = createGroup(2)) {
639+
shards.startAll();
640+
IndexShard primary = shards.getPrimary();
641+
IndexShard replica1 = shards.getReplicas().get(0);
642+
IndexShard replica2 = shards.getReplicas().get(1);
643+
long maxTimestampOnReplica1 = -1;
644+
long maxTimestampOnReplica2 = -1;
645+
List<IndexRequest> replicationRequests = new ArrayList<>();
646+
for (int numDocs = between(1, 10), i = 0; i < numDocs; i++) {
647+
final IndexRequest indexRequest = new IndexRequest(index.getName(), "type").source("{}", XContentType.JSON);
648+
indexRequest.process(Version.CURRENT, null, index.getName());
649+
final IndexRequest copyRequest;
650+
if (randomBoolean()) {
651+
copyRequest = copyIndexRequest(indexRequest);
652+
indexRequest.onRetry();
653+
} else {
654+
copyRequest = copyIndexRequest(indexRequest);
655+
copyRequest.onRetry();
656+
}
657+
replicationRequests.add(copyRequest);
658+
final BulkShardRequest bulkShardRequest = indexOnPrimary(indexRequest, primary);
659+
if (randomBoolean()) {
660+
indexOnReplica(bulkShardRequest, shards, replica1);
661+
maxTimestampOnReplica1 = Math.max(maxTimestampOnReplica1, indexRequest.getAutoGeneratedTimestamp());
662+
} else {
663+
indexOnReplica(bulkShardRequest, shards, replica2);
664+
maxTimestampOnReplica2 = Math.max(maxTimestampOnReplica2, indexRequest.getAutoGeneratedTimestamp());
665+
}
666+
}
667+
assertThat(replica1.getMaxSeenAutoIdTimestamp(), equalTo(maxTimestampOnReplica1));
668+
assertThat(replica2.getMaxSeenAutoIdTimestamp(), equalTo(maxTimestampOnReplica2));
669+
shards.promoteReplicaToPrimary(replica1).get();
670+
assertThat(replica2.getMaxSeenAutoIdTimestamp(), equalTo(maxTimestampOnReplica1));
671+
for (IndexRequest request : replicationRequests) {
672+
shards.index(request); // deliver via normal replication
673+
}
674+
}
675+
}
676+
636677
public static class BlockingTarget extends RecoveryTarget {
637678

638679
private final CountDownLatch recoveryBlocked;

server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception {
7676
// Index doc but not advance local checkpoint.
7777
shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL,
7878
SourceToParse.source(shard.shardId().getIndexName(), "_doc", Integer.toString(i), new BytesArray("{}"), XContentType.JSON),
79-
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
79+
randomNonNegativeLong(), randomBoolean());
8080
}
8181

8282
long globalCheckPoint = numDocs > 0 ? randomIntBetween(0, numDocs - 1) : 0;
@@ -105,6 +105,8 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception {
105105
.findFirst()
106106
.isPresent(),
107107
is(false));
108+
109+
assertThat(resyncRequest.getMaxSeenAutoIdTimestampOnPrimary(), equalTo(shard.getMaxSeenAutoIdTimestamp()));
108110
}
109111
if (syncNeeded && globalCheckPoint < numDocs - 1) {
110112
if (shard.indexSettings.isSoftDeleteEnabled()) {

0 commit comments

Comments
 (0)