163163import java .util .stream .StreamSupport ;
164164
165165import static org .elasticsearch .index .mapper .SourceToParse .source ;
166- import static org .elasticsearch .index .seqno .SequenceNumbers .NO_OPS_PERFORMED ;
167166import static org .elasticsearch .index .seqno .SequenceNumbers .UNASSIGNED_SEQ_NO ;
168167
169168public class IndexShard extends AbstractIndexShardComponent implements IndicesClusterStateService .Shard {
@@ -1273,16 +1272,18 @@ public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine
12731272 return result ;
12741273 }
12751274
1276- // package-private for testing
1277- int runTranslogRecovery (Engine engine , Translog .Snapshot snapshot ) throws IOException {
1278- recoveryState .getTranslog ().totalOperations (snapshot .totalOperations ());
1279- recoveryState .getTranslog ().totalOperationsOnStart (snapshot .totalOperations ());
1275+ /**
1276+ * Replays translog operations from the provided translog {@code snapshot} to the current engine using the given {@code origin}.
1277+ * The callback {@code onOperationRecovered} is notified after each translog operation is replayed successfully.
1278+ */
1279+ int runTranslogRecovery (Engine engine , Translog .Snapshot snapshot , Engine .Operation .Origin origin ,
1280+ Runnable onOperationRecovered ) throws IOException {
12801281 int opsRecovered = 0 ;
12811282 Translog .Operation operation ;
12821283 while ((operation = snapshot .next ()) != null ) {
12831284 try {
12841285 logger .trace ("[translog] recover op {}" , operation );
1285- Engine .Result result = applyTranslogOperation (operation , Engine . Operation . Origin . LOCAL_TRANSLOG_RECOVERY );
1286+ Engine .Result result = applyTranslogOperation (operation , origin );
12861287 switch (result .getResultType ()) {
12871288 case FAILURE :
12881289 throw result .getFailure ();
@@ -1295,7 +1296,7 @@ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOExce
12951296 }
12961297
12971298 opsRecovered ++;
1298- recoveryState . getTranslog (). incrementRecoveredOperations ();
1299+ onOperationRecovered . run ();
12991300 } catch (Exception e ) {
13001301 if (ExceptionsHelper .status (e ) == RestStatus .BAD_REQUEST ) {
13011302 // mainly for MapperParsingException and Failure to detect xcontent
@@ -1313,8 +1314,15 @@ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOExce
13131314 * Operations from the translog will be replayed to bring lucene up to date.
13141315 **/
13151316 public void openEngineAndRecoverFromTranslog () throws IOException {
// Recovery statistics for the translog phase; the runner below reports into this object.
1317+ final RecoveryState .Translog translogRecoveryStats = recoveryState .getTranslog ();
// The runner records the snapshot's operation count as both the total and the on-start total,
// then replays the snapshot with the LOCAL_TRANSLOG_RECOVERY origin, passing
// incrementRecoveredOperations as the per-operation callback.
1318+ final Engine .TranslogRecoveryRunner translogRecoveryRunner = (engine , snapshot ) -> {
1319+ translogRecoveryStats .totalOperations (snapshot .totalOperations ());
1320+ translogRecoveryStats .totalOperationsOnStart (snapshot .totalOperations ());
1321+ return runTranslogRecovery (engine , snapshot , Engine .Operation .Origin .LOCAL_TRANSLOG_RECOVERY ,
1322+ translogRecoveryStats ::incrementRecoveredOperations );
1323+ };
// The engine must be opened before getEngine() is used for the replay below.
13161324 innerOpenEngineAndTranslog ();
// NOTE(review): Long.MAX_VALUE presumably means "no upper bound" on what is replayed -
// confirm against Engine#recoverFromTranslog.
1317- getEngine ().recoverFromTranslog (this :: runTranslogRecovery , Long .MAX_VALUE );
1325+ getEngine ().recoverFromTranslog (translogRecoveryRunner , Long .MAX_VALUE );
13181326 }
13191327
13201328 /**
@@ -1352,11 +1360,7 @@ private void innerOpenEngineAndTranslog() throws IOException {
13521360 final String translogUUID = store .readLastCommittedSegmentsInfo ().getUserData ().get (Translog .TRANSLOG_UUID_KEY );
13531361 final long globalCheckpoint = Translog .readGlobalCheckpoint (translogConfig .getTranslogPath (), translogUUID );
13541362 replicationTracker .updateGlobalCheckpointOnReplica (globalCheckpoint , "read from translog checkpoint" );
1355-
1356- assertMaxUnsafeAutoIdInCommit ();
1357-
1358- final long minRetainedTranslogGen = Translog .readMinTranslogGeneration (translogConfig .getTranslogPath (), translogUUID );
1359- store .trimUnsafeCommits (globalCheckpoint , minRetainedTranslogGen , config .getIndexSettings ().getIndexVersionCreated ());
1363+ trimUnsafeCommits ();
13601364
13611365 createNewEngine (config );
13621366 verifyNotClosed ();
@@ -1367,6 +1371,15 @@ private void innerOpenEngineAndTranslog() throws IOException {
13671371 assert recoveryState .getStage () == RecoveryState .Stage .TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState .getStage ();
13681372 }
13691373
/**
 * Reads the translog UUID from the last committed segments info, reads the global checkpoint and
 * the minimum retained translog generation from the translog path, and passes them to
 * {@code store.trimUnsafeCommits} together with the index creation version.
 * Must only be called while no engine is referenced by {@code currentEngineReference}.
 *
 * @throws IOException declared by this method; NOTE(review): presumably raised by the commit or
 *                     translog reads - confirm which calls actually throw it
 */
1374+ private void trimUnsafeCommits () throws IOException {
// Guard: this method asserts that the engine reference has already been cleared.
1375+ assert currentEngineReference .get () == null : "engine is running" ;
// The translog UUID stored in the commit's user data is used for both translog reads below.
1376+ final String translogUUID = store .readLastCommittedSegmentsInfo ().getUserData ().get (Translog .TRANSLOG_UUID_KEY );
1377+ final long globalCheckpoint = Translog .readGlobalCheckpoint (translogConfig .getTranslogPath (), translogUUID );
1378+ final long minRetainedTranslogGen = Translog .readMinTranslogGeneration (translogConfig .getTranslogPath (), translogUUID );
// assertMaxUnsafeAutoIdInCommit() is invoked before the store is trimmed.
1379+ assertMaxUnsafeAutoIdInCommit ();
1380+ store .trimUnsafeCommits (globalCheckpoint , minRetainedTranslogGen , indexSettings .getIndexVersionCreated ());
1381+ }
1382+
13701383 private boolean assertSequenceNumbersInCommit () throws IOException {
13711384 final Map <String , String > userData = SegmentInfos .readLatestCommit (store .directory ()).getUserData ();
13721385 assert userData .containsKey (SequenceNumbers .LOCAL_CHECKPOINT_KEY ) : "commit point doesn't contains a local checkpoint" ;
@@ -1463,7 +1476,7 @@ private void ensureWriteAllowed(Engine.Operation.Origin origin) throws IllegalIn
14631476 if (origin == Engine .Operation .Origin .PRIMARY ) {
14641477 assert assertPrimaryMode ();
14651478 } else {
1466- assert origin == Engine .Operation .Origin .REPLICA ;
1479+ assert origin == Engine .Operation .Origin .REPLICA || origin == Engine . Operation . Origin . LOCAL_RESET ;
14671480 assert assertReplicationTarget ();
14681481 }
14691482 if (writeAllowedStates .contains (state ) == false ) {
@@ -2166,9 +2179,7 @@ public void onFailedEngine(String reason, @Nullable Exception failure) {
21662179
21672180 private Engine createNewEngine (EngineConfig config ) {
21682181 synchronized (mutex ) {
2169- if (state == IndexShardState .CLOSED ) {
2170- throw new AlreadyClosedException (shardId + " can't create engine - shard is closed" );
2171- }
2182+ verifyNotClosed ();
21722183 assert this .currentEngineReference .get () == null ;
21732184 Engine engine = newEngine (config );
21742185 onNewEngine (engine ); // call this before we pass the memory barrier otherwise actions that happen
@@ -2314,19 +2325,14 @@ public void acquireReplicaOperationPermit(final long opPrimaryTerm, final long g
23142325 bumpPrimaryTerm (opPrimaryTerm , () -> {
23152326 updateGlobalCheckpointOnReplica (globalCheckpoint , "primary term transition" );
23162327 final long currentGlobalCheckpoint = getGlobalCheckpoint ();
2317- final long localCheckpoint ;
2318- if (currentGlobalCheckpoint == UNASSIGNED_SEQ_NO ) {
2319- localCheckpoint = NO_OPS_PERFORMED ;
2328+ final long maxSeqNo = seqNoStats ().getMaxSeqNo ();
2329+ logger .info ("detected new primary with primary term [{}], global checkpoint [{}], max_seq_no [{}]" ,
2330+ opPrimaryTerm , currentGlobalCheckpoint , maxSeqNo );
2331+ if (currentGlobalCheckpoint < maxSeqNo ) {
2332+ resetEngineToGlobalCheckpoint ();
23202333 } else {
2321- localCheckpoint = currentGlobalCheckpoint ;
2334+ getEngine (). rollTranslogGeneration () ;
23222335 }
2323- logger .trace (
2324- "detected new primary with primary term [{}], resetting local checkpoint from [{}] to [{}]" ,
2325- opPrimaryTerm ,
2326- getLocalCheckpoint (),
2327- localCheckpoint );
2328- getEngine ().resetLocalCheckpoint (localCheckpoint );
2329- getEngine ().rollTranslogGeneration ();
23302336 });
23312337 }
23322338 }
@@ -2687,4 +2693,26 @@ public ParsedDocument newNoopTombstoneDoc(String reason) {
26872693 }
26882694 };
26892695 }
2696+
2697+ /**
2698+ * Rolls back the current engine to the safe commit, then replays the local translog up to the global checkpoint.
 * <p>
 * Steps, in order: sync, capture the global checkpoint, then under {@code mutex} close the current
 * engine, trim unsafe commits and create a new engine; finally replay the translog on the new
 * engine with the {@code LOCAL_RESET} origin. Asserts that there are no active operations on entry.
 *
 * @throws IOException declared by this method; NOTE(review): presumably raised while closing the
 *                     engine, trimming commits or replaying the translog - confirm the sources
2699+ */
2700+ void resetEngineToGlobalCheckpoint () throws IOException {
2701+ assert getActiveOperationsCount () == 0 : "Ongoing writes [" + getActiveOperations () + "]" ;
2702+ sync (); // persist the global checkpoint to disk
// Captured after the sync; this value is the upper bound handed to recoverFromTranslog below.
2703+ final long globalCheckpoint = getGlobalCheckpoint ();
2704+ final Engine newEngine ;
2705+ synchronized (mutex ) {
2706+ verifyNotClosed ();
// Close the old engine and clear the reference first: trimUnsafeCommits() asserts the reference is null.
2707+ IOUtils .close (currentEngineReference .getAndSet (null ));
2708+ trimUnsafeCommits ();
// createNewEngine also synchronizes on mutex; Java monitors are reentrant, so this nested acquisition is fine.
2709+ newEngine = createNewEngine (newEngineConfig ());
2710+ active .set (true );
2711+ }
// The replay runs outside the mutex, with a no-op per-operation callback (no recovery stats are updated).
2712+ final Engine .TranslogRecoveryRunner translogRunner = (engine , snapshot ) -> runTranslogRecovery (
2713+ engine , snapshot , Engine .Operation .Origin .LOCAL_RESET , () -> {
2714+ // TODO: add a dedicated recovery stats for the reset translog
2715+ });
2716+ newEngine .recoverFromTranslog (translogRunner , globalCheckpoint );
2717+ }
26902718}
0 commit comments