7878import java .util .HashMap ;
7979import java .util .List ;
8080import java .util .Map ;
81+ import java .util .Optional ;
8182import java .util .Set ;
8283import java .util .concurrent .atomic .AtomicBoolean ;
8384import java .util .concurrent .atomic .AtomicInteger ;
8687import java .util .concurrent .locks .ReentrantLock ;
8788import java .util .function .Function ;
8889import java .util .function .LongSupplier ;
90+ import java .util .function .Supplier ;
8991
9092public class InternalEngine extends Engine {
9193
@@ -175,9 +177,18 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
175177 throw new IllegalArgumentException (openMode .toString ());
176178 }
177179 logger .trace ("recovered [{}]" , seqNoStats );
178- indexWriter = writer ;
179180 seqNoService = sequenceNumberService (shardId , engineConfig .getIndexSettings (), seqNoStats );
180- translog = openTranslog (engineConfig , writer , seqNoService ::getGlobalCheckpoint );
181+ // norelease
182+ /*
183+ * We have no guarantees that all operations above the local checkpoint are in the Lucene commit or the translog. This means
184+ * that we there might be operations greater than the local checkpoint that will not be replayed. Here we force the local
185+ * checkpoint to the maximum sequence number in the commit (at the potential expense of correctness).
186+ */
187+ while (seqNoService ().getLocalCheckpoint () < seqNoService ().getMaxSeqNo ()) {
188+ seqNoService ().markSeqNoAsCompleted (seqNoService ().getLocalCheckpoint () + 1 );
189+ }
190+ indexWriter = writer ;
191+ translog = openTranslog (engineConfig , writer , () -> seqNoService ().getLocalCheckpoint ());
181192 assert translog .getGeneration () != null ;
182193 } catch (IOException | TranslogCorruptedException e ) {
183194 throw new EngineCreationFailureException (shardId , "failed to create engine" , e );
@@ -412,7 +423,7 @@ private SearcherManager createSearcherManager() throws EngineException {
412423
413424 @ Override
414425 public GetResult get (Get get , Function <String , Searcher > searcherFactory ) throws EngineException {
415- try (ReleasableLock lock = readLock .acquire ()) {
426+ try (ReleasableLock ignored = readLock .acquire ()) {
416427 ensureOpen ();
417428 if (get .realtime ()) {
418429 VersionValue versionValue = versionMap .getUnderLock (get .uid ());
@@ -434,11 +445,28 @@ public GetResult get(Get get, Function<String, Searcher> searcherFactory) throws
434445 }
435446 }
436447
437- private boolean checkVersionConflict (
438- final Operation op ,
439- final long currentVersion ,
440- final long expectedVersion ,
441- final boolean deleted ) {
448+ /**
449+ * Checks for version conflicts. If a version conflict exists, the optional return value represents the operation result. Otherwise, if
450+ * no conflicts are found, the optional return value is not present.
451+ *
452+ * @param <T> the result type
453+ * @param op the operation
454+ * @param currentVersion the current version
455+ * @param expectedVersion the expected version
456+ * @param deleted {@code true} if the current version is not found or represents a delete
457+ * @param onSuccess if there is a version conflict that can be ignored, the result of the operation
458+ * @param onFailure if there is a version conflict that can not be ignored, the result of the operation
459+ * @return if there is a version conflict, the optional value is present and represents the operation result, otherwise the return value
460+ * is not present
461+ */
462+ private <T extends Result > Optional <T > checkVersionConflict (
463+ final Operation op ,
464+ final long currentVersion ,
465+ final long expectedVersion ,
466+ final boolean deleted ,
467+ final Supplier <T > onSuccess ,
468+ final Function <VersionConflictEngineException , T > onFailure ) {
469+ final T result ;
442470 if (op .versionType () == VersionType .FORCE ) {
443471 if (engineConfig .getIndexSettings ().getIndexVersionCreated ().onOrAfter (Version .V_6_0_0_alpha1_UNRELEASED )) {
444472 // If index was created in 5.0 or later, 'force' is not allowed at all
@@ -452,14 +480,22 @@ private boolean checkVersionConflict(
452480 if (op .versionType ().isVersionConflictForWrites (currentVersion , expectedVersion , deleted )) {
453481 if (op .origin ().isRecovery ()) {
454482 // version conflict, but okay
455- return true ;
483+ result = onSuccess . get () ;
456484 } else {
457485 // fatal version conflict
458- throw new VersionConflictEngineException (shardId , op .type (), op .id (),
486+ final VersionConflictEngineException e =
487+ new VersionConflictEngineException (
488+ shardId ,
489+ op .type (),
490+ op .id (),
459491 op .versionType ().explainConflictForWrites (currentVersion , expectedVersion , deleted ));
492+ result = onFailure .apply (e );
460493 }
494+
495+ return Optional .of (result );
496+ } else {
497+ return Optional .empty ();
461498 }
462- return false ;
463499 }
464500
465501 private long checkDeletedAndGCed (VersionValue versionValue ) {
@@ -475,7 +511,7 @@ private long checkDeletedAndGCed(VersionValue versionValue) {
475511 @ Override
476512 public IndexResult index (Index index ) {
477513 IndexResult result ;
478- try (ReleasableLock lock = readLock .acquire ()) {
514+ try (ReleasableLock ignored = readLock .acquire ()) {
479515 ensureOpen ();
480516 if (index .origin ().isRecovery ()) {
481517 // Don't throttle recovery operations
@@ -573,7 +609,7 @@ private IndexResult innerIndex(Index index) throws IOException {
573609 assert assertSequenceNumber (index .origin (), index .seqNo ());
574610 final Translog .Location location ;
575611 final long updatedVersion ;
576- IndexResult indexResult = null ;
612+ long seqNo = index . seqNo () ;
577613 try (Releasable ignored = acquireLock (index .uid ())) {
578614 lastWriteNanos = index .startTime ();
579615 /* if we have an autoGeneratedID that comes into the engine we can potentially optimize
@@ -638,27 +674,32 @@ private IndexResult innerIndex(Index index) throws IOException {
638674 }
639675 }
640676 final long expectedVersion = index .version ();
641- if (checkVersionConflict (index , currentVersion , expectedVersion , deleted )) {
642- // skip index operation because of version conflict on recovery
643- indexResult = new IndexResult (expectedVersion , SequenceNumbersService .UNASSIGNED_SEQ_NO , false );
677+ final Optional <IndexResult > checkVersionConflictResult =
678+ checkVersionConflict (
679+ index ,
680+ currentVersion ,
681+ expectedVersion ,
682+ deleted ,
683+ () -> new IndexResult (currentVersion , index .seqNo (), false ),
684+ e -> new IndexResult (e , currentVersion , index .seqNo ()));
685+
686+ final IndexResult indexResult ;
687+ if (checkVersionConflictResult .isPresent ()) {
688+ indexResult = checkVersionConflictResult .get ();
644689 } else {
645- final long seqNo ;
690+ // no version conflict
646691 if (index .origin () == Operation .Origin .PRIMARY ) {
647- seqNo = seqNoService .generateSeqNo ();
648- } else {
649- seqNo = index .seqNo ();
692+ seqNo = seqNoService ().generateSeqNo ();
650693 }
651- updatedVersion = index .versionType ().updateVersion (currentVersion , expectedVersion );
652- index .parsedDoc ().version ().setLongValue (updatedVersion );
653694
654- // Update the document's sequence number and primary term, the
655- // sequence number here is derived here from either the sequence
656- // number service if this is on the primary, or the existing
657- // document's sequence number if this is on the replica. The
658- // primary term here has already been set, see
659- // IndexShard.prepareIndex where the Engine.Index operation is
660- // created
695+ /**
696+ * Update the document's sequence number and primary term; the sequence number here is derived here from either the sequence
697+ * number service if this is on the primary, or the existing document's sequence number if this is on the replica. The
698+ * primary term here has already been set, see IndexShard#prepareIndex where the Engine$Index operation is created.
699+ */
661700 index .parsedDoc ().updateSeqID (seqNo , index .primaryTerm ());
701+ updatedVersion = index .versionType ().updateVersion (currentVersion , expectedVersion );
702+ index .parsedDoc ().version ().setLongValue (updatedVersion );
662703
663704 if (currentVersion == Versions .NOT_FOUND && forceUpdateDocument == false ) {
664705 // document does not exists, we can optimize for create, but double check if assertions are running
@@ -669,17 +710,17 @@ private IndexResult innerIndex(Index index) throws IOException {
669710 }
670711 indexResult = new IndexResult (updatedVersion , seqNo , deleted );
671712 location = index .origin () != Operation .Origin .LOCAL_TRANSLOG_RECOVERY
672- ? translog .add (new Translog .Index (index , indexResult ))
673- : null ;
713+ ? translog .add (new Translog .Index (index , indexResult ))
714+ : null ;
674715 versionMap .putUnderLock (index .uid ().bytes (), new VersionValue (updatedVersion ));
675716 indexResult .setTranslogLocation (location );
676717 }
677718 indexResult .setTook (System .nanoTime () - index .startTime ());
678719 indexResult .freeze ();
679720 return indexResult ;
680721 } finally {
681- if (indexResult != null && indexResult . getSeqNo () != SequenceNumbersService .UNASSIGNED_SEQ_NO ) {
682- seqNoService .markSeqNoAsCompleted (indexResult . getSeqNo () );
722+ if (seqNo != SequenceNumbersService .UNASSIGNED_SEQ_NO ) {
723+ seqNoService () .markSeqNoAsCompleted (seqNo );
683724 }
684725 }
685726
@@ -724,7 +765,7 @@ private static void update(final Term uid, final List<ParseContext.Document> doc
724765 @ Override
725766 public DeleteResult delete (Delete delete ) {
726767 DeleteResult result ;
727- try (ReleasableLock lock = readLock .acquire ()) {
768+ try (ReleasableLock ignored = readLock .acquire ()) {
728769 ensureOpen ();
729770 // NOTE: we don't throttle this when merges fall behind because delete-by-id does not create new segments:
730771 result = innerDelete (delete );
@@ -748,7 +789,7 @@ private DeleteResult innerDelete(Delete delete) throws IOException {
748789 final Translog .Location location ;
749790 final long updatedVersion ;
750791 final boolean found ;
751- DeleteResult deleteResult = null ;
792+ long seqNo = delete . seqNo () ;
752793 try (Releasable ignored = acquireLock (delete .uid ())) {
753794 lastWriteNanos = delete .startTime ();
754795 final long currentVersion ;
@@ -764,32 +805,40 @@ private DeleteResult innerDelete(Delete delete) throws IOException {
764805 }
765806
766807 final long expectedVersion = delete .version ();
767- if (checkVersionConflict (delete , currentVersion , expectedVersion , deleted )) {
768- // skip executing delete because of version conflict on recovery
769- deleteResult = new DeleteResult (expectedVersion , SequenceNumbersService .UNASSIGNED_SEQ_NO , true );
808+
809+ final Optional <DeleteResult > result =
810+ checkVersionConflict (
811+ delete ,
812+ currentVersion ,
813+ expectedVersion ,
814+ deleted ,
815+ () -> new DeleteResult (expectedVersion , delete .seqNo (), true ),
816+ e -> new DeleteResult (e , expectedVersion , delete .seqNo ()));
817+
818+ final DeleteResult deleteResult ;
819+ if (result .isPresent ()) {
820+ deleteResult = result .get ();
770821 } else {
771- final long seqNo ;
772822 if (delete .origin () == Operation .Origin .PRIMARY ) {
773- seqNo = seqNoService .generateSeqNo ();
774- } else {
775- seqNo = delete .seqNo ();
823+ seqNo = seqNoService ().generateSeqNo ();
776824 }
825+
777826 updatedVersion = delete .versionType ().updateVersion (currentVersion , expectedVersion );
778827 found = deleteIfFound (delete .uid (), currentVersion , deleted , versionValue );
779828 deleteResult = new DeleteResult (updatedVersion , seqNo , found );
780829 location = delete .origin () != Operation .Origin .LOCAL_TRANSLOG_RECOVERY
781- ? translog .add (new Translog .Delete (delete , deleteResult ))
782- : null ;
830+ ? translog .add (new Translog .Delete (delete , deleteResult ))
831+ : null ;
783832 versionMap .putUnderLock (delete .uid ().bytes (),
784- new DeleteVersionValue (updatedVersion , engineConfig .getThreadPool ().estimatedTimeInMillis ()));
833+ new DeleteVersionValue (updatedVersion , engineConfig .getThreadPool ().estimatedTimeInMillis ()));
785834 deleteResult .setTranslogLocation (location );
786835 }
787836 deleteResult .setTook (System .nanoTime () - delete .startTime ());
788837 deleteResult .freeze ();
789838 return deleteResult ;
790839 } finally {
791- if (deleteResult != null && deleteResult . getSeqNo () != SequenceNumbersService .UNASSIGNED_SEQ_NO ) {
792- seqNoService .markSeqNoAsCompleted (deleteResult . getSeqNo () );
840+ if (seqNo != SequenceNumbersService .UNASSIGNED_SEQ_NO ) {
841+ seqNoService () .markSeqNoAsCompleted (seqNo );
793842 }
794843 }
795844 }
0 commit comments