Skip to content

Commit 95cdf12

Browse files
committed
Truncate tlog cli should assign global checkpoint (#28192)
We aim to always have a safe index once recovery is done. This invariant does not hold if the translog is manually truncated by users, because the truncate translog CLI resets the global checkpoint to unassigned. This commit assigns the global checkpoint to the max_seqno of the last commit when truncating the translog. We can only do this safely because the truncate translog command generates a new history UUID for that shard. With a new history UUID, sequence-based recovery between that shard and other old shards will be disabled. Relates #28181
1 parent cdc6085 commit 95cdf12

File tree

2 files changed

+31
-4
lines changed

2 files changed

+31
-4
lines changed

server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -132,9 +132,19 @@ protected void execute(Terminal terminal, OptionSet options, Environment env) th
132132
}
133133

134134
// Retrieve the generation and UUID from the existing data
135-
commitData = commits.get(commits.size() - 1).getUserData();
135+
commitData = new HashMap<>(commits.get(commits.size() - 1).getUserData());
136136
String translogGeneration = commitData.get(Translog.TRANSLOG_GENERATION_KEY);
137137
String translogUUID = commitData.get(Translog.TRANSLOG_UUID_KEY);
138+
final long globalCheckpoint;
139+
// In order to have a safe commit invariant, we have to assign the global checkpoint to the max_seqno of the last commit.
140+
// We can only safely do it because we will generate a new history uuid for this shard.
141+
if (commitData.containsKey(SequenceNumbers.MAX_SEQ_NO)) {
142+
globalCheckpoint = Long.parseLong(commitData.get(SequenceNumbers.MAX_SEQ_NO));
143+
// Also advances the local checkpoint of the last commit to its max_seqno.
144+
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(globalCheckpoint));
145+
} else {
146+
globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
147+
}
138148
if (translogGeneration == null || translogUUID == null) {
139149
throw new ElasticsearchException("shard must have a valid translog generation and UUID but got: [{}] and: [{}]",
140150
translogGeneration, translogUUID);
@@ -153,7 +163,7 @@ protected void execute(Terminal terminal, OptionSet options, Environment env) th
153163
// Write empty checkpoint and translog to empty files
154164
long gen = Long.parseLong(translogGeneration);
155165
int translogLen = writeEmptyTranslog(tempEmptyTranslog, translogUUID);
156-
writeEmptyCheckpoint(tempEmptyCheckpoint, translogLen, gen);
166+
writeEmptyCheckpoint(tempEmptyCheckpoint, translogLen, gen, globalCheckpoint);
157167

158168
terminal.println("Removing existing translog files");
159169
IOUtils.rm(translogFiles.toArray(new Path[]{}));
@@ -190,9 +200,9 @@ protected void execute(Terminal terminal, OptionSet options, Environment env) th
190200
}
191201

192202
/** Write a checkpoint file to the given location with the given generation */
193-
public static void writeEmptyCheckpoint(Path filename, int translogLength, long translogGeneration) throws IOException {
203+
static void writeEmptyCheckpoint(Path filename, int translogLength, long translogGeneration, long globalCheckpoint) throws IOException {
194204
Checkpoint emptyCheckpoint = Checkpoint.emptyTranslogCheckpoint(translogLength, translogGeneration,
195-
SequenceNumbers.UNASSIGNED_SEQ_NO, translogGeneration);
205+
globalCheckpoint, translogGeneration);
196206
Checkpoint.write(FileChannel::open, filename, emptyCheckpoint,
197207
StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW);
198208
// fsync with metadata here to make sure.

server/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
3232
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
3333
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
34+
import org.elasticsearch.action.admin.indices.stats.ShardStats;
3435
import org.elasticsearch.action.index.IndexRequestBuilder;
3536
import org.elasticsearch.action.search.SearchPhaseExecutionException;
3637
import org.elasticsearch.cli.MockTerminal;
@@ -47,6 +48,7 @@
4748
import org.elasticsearch.index.Index;
4849
import org.elasticsearch.index.IndexSettings;
4950
import org.elasticsearch.index.MockEngineFactoryPlugin;
51+
import org.elasticsearch.index.seqno.SeqNoStats;
5052
import org.elasticsearch.index.shard.IndexShard;
5153
import org.elasticsearch.index.shard.ShardId;
5254
import org.elasticsearch.indices.IndicesService;
@@ -74,6 +76,7 @@
7476
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
7577
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
7678
import static org.hamcrest.Matchers.containsString;
79+
import static org.hamcrest.Matchers.equalTo;
7780
import static org.hamcrest.Matchers.greaterThan;
7881

7982
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 0)
@@ -212,6 +215,10 @@ public void testCorruptTranslogTruncation() throws Exception {
212215
final RecoveryState replicaRecoveryState = recoveryResponse.shardRecoveryStates().get("test").stream()
213216
.filter(recoveryState -> recoveryState.getPrimary() == false).findFirst().get();
214217
assertThat(replicaRecoveryState.getIndex().toString(), replicaRecoveryState.getIndex().recoveredFileCount(), greaterThan(0));
218+
// Ensure that the global checkpoint and local checkpoint are restored from the max seqno of the last commit.
219+
final SeqNoStats seqNoStats = getSeqNoStats("test", 0);
220+
assertThat(seqNoStats.getGlobalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo()));
221+
assertThat(seqNoStats.getLocalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo()));
215222
}
216223

217224
public void testCorruptTranslogTruncationOfReplica() throws Exception {
@@ -312,6 +319,10 @@ public void testCorruptTranslogTruncationOfReplica() throws Exception {
312319
.filter(recoveryState -> recoveryState.getPrimary() == false).findFirst().get();
313320
// the replica translog was disabled so it doesn't know what the global checkpoint is and thus can't do ops based recovery
314321
assertThat(replicaRecoveryState.getIndex().toString(), replicaRecoveryState.getIndex().recoveredFileCount(), greaterThan(0));
322+
// Ensure that the global checkpoint and local checkpoint are restored from the max seqno of the last commit.
323+
final SeqNoStats seqNoStats = getSeqNoStats("test", 0);
324+
assertThat(seqNoStats.getGlobalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo()));
325+
assertThat(seqNoStats.getLocalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo()));
315326
}
316327

317328
private Set<Path> getTranslogDirs(String indexName) throws IOException {
@@ -356,4 +367,10 @@ private static void disableTranslogFlush(String index) {
356367
client().admin().indices().prepareUpdateSettings(index).setSettings(settings).get();
357368
}
358369

370+
private SeqNoStats getSeqNoStats(String index, int shardId) {
371+
final ShardStats[] shardStats = client().admin().indices()
372+
.prepareStats(index).get()
373+
.getIndices().get(index).getShards();
374+
return shardStats[shardId].getSeqNoStats();
375+
}
359376
}

0 commit comments

Comments
 (0)