2222import org .apache .lucene .index .CorruptIndexException ;
2323import org .apache .lucene .index .DirectoryReader ;
2424import org .apache .lucene .index .IndexCommit ;
25+ import org .apache .lucene .index .IndexWriter ;
2526import org .apache .lucene .index .Term ;
2627import org .apache .lucene .search .IndexSearcher ;
2728import org .apache .lucene .search .TermQuery ;
2829import org .apache .lucene .search .TopDocs ;
2930import org .apache .lucene .store .AlreadyClosedException ;
31+ import org .apache .lucene .store .BaseDirectoryWrapper ;
3032import org .apache .lucene .store .Directory ;
3133import org .apache .lucene .store .FilterDirectory ;
3234import org .apache .lucene .store .IOContext ;
5355import org .elasticsearch .cluster .routing .ShardRoutingState ;
5456import org .elasticsearch .cluster .routing .TestShardRouting ;
5557import org .elasticsearch .cluster .routing .UnassignedInfo ;
58+ import org .elasticsearch .common .CheckedFunction ;
5659import org .elasticsearch .common .Strings ;
5760import org .elasticsearch .common .UUIDs ;
5861import org .elasticsearch .common .breaker .CircuitBreaker ;
9396import org .elasticsearch .index .seqno .SeqNoStats ;
9497import org .elasticsearch .index .seqno .SequenceNumbers ;
9598import org .elasticsearch .index .snapshots .IndexShardSnapshotStatus ;
99+ import org .elasticsearch .index .store .DirectoryService ;
96100import org .elasticsearch .index .store .Store ;
97101import org .elasticsearch .index .store .StoreStats ;
98102import org .elasticsearch .index .translog .TestTranslog ;
110114import org .elasticsearch .snapshots .SnapshotId ;
111115import org .elasticsearch .snapshots .SnapshotInfo ;
112116import org .elasticsearch .snapshots .SnapshotShardFailure ;
117+ import org .elasticsearch .test .CorruptionUtils ;
113118import org .elasticsearch .test .DummyShardLock ;
114119import org .elasticsearch .test .FieldMaskingReader ;
115120import org .elasticsearch .test .VersionUtils ;
118123
119124import java .io .IOException ;
120125import java .nio .charset .Charset ;
126+ import java .nio .file .FileVisitResult ;
127+ import java .nio .file .Files ;
121128import java .nio .file .Path ;
129+ import java .nio .file .SimpleFileVisitor ;
130+ import java .nio .file .attribute .BasicFileAttributes ;
122131import java .util .ArrayList ;
123132import java .util .Arrays ;
124133import java .util .Collections ;
@@ -1216,7 +1225,7 @@ public String[] listAll() throws IOException {
12161225 };
12171226
12181227 try (Store store = createStore (shardId , new IndexSettings (metaData , Settings .EMPTY ), directory )) {
1219- IndexShard shard = newShard (shardRouting , shardPath , metaData , store ,
1228+ IndexShard shard = newShard (shardRouting , shardPath , metaData , i -> store ,
12201229 null , new InternalEngineFactory (), () -> {
12211230 }, EMPTY_EVENT_LISTENER );
12221231 AtomicBoolean failureCallbackTriggered = new AtomicBoolean (false );
@@ -2561,6 +2570,104 @@ public void testReadSnapshotConcurrently() throws IOException, InterruptedExcept
25612570 closeShards (newShard );
25622571 }
25632572
2573+ public void testIndexCheckChecksum () throws Exception {
2574+ final boolean primary = true ;
2575+
2576+ IndexShard indexShard = newStartedShard (primary );
2577+
2578+ final long numDocs = between (10 , 100 );
2579+ for (long i = 0 ; i < numDocs ; i ++) {
2580+ indexDoc (indexShard , "_doc" , Long .toString (i ), "{}" );
2581+ }
2582+ indexShard .flush (new FlushRequest ());
2583+ closeShards (indexShard );
2584+
2585+ // start shard with checksum - it has to pass successfully
2586+
2587+ final ShardRouting shardRouting = ShardRoutingHelper .initWithSameId (indexShard .routingEntry (),
2588+ primary ? RecoverySource .StoreRecoverySource .EXISTING_STORE_INSTANCE : RecoverySource .PeerRecoverySource .INSTANCE
2589+ );
2590+ final IndexMetaData indexMetaData = IndexMetaData .builder (indexShard .indexSettings ().getIndexMetaData ())
2591+ .settings (Settings .builder ()
2592+ .put (indexShard .indexSettings .getSettings ())
2593+ .put (IndexSettings .INDEX_CHECK_ON_STARTUP .getKey (), "checksum" ))
2594+ .build ();
2595+
2596+ final ShardPath shardPath = indexShard .shardPath ();
2597+
2598+ final IndexShard newShard = newShard (shardRouting , shardPath , indexMetaData ,
2599+ null , null , indexShard .engineFactory ,
2600+ indexShard .getGlobalCheckpointSyncer (), EMPTY_EVENT_LISTENER );
2601+
2602+ closeShards (newStartedShard (p -> newShard , primary ));
2603+
2604+ // corrupt files
2605+ final Path indexPath = shardPath .getDataPath ().resolve ("index" );
2606+ final Path [] filesToCorrupt =
2607+ Files .walk (indexPath )
2608+ .filter (p -> Files .isRegularFile (p ) && IndexWriter .WRITE_LOCK_NAME .equals (p .getFileName ().toString ()) == false )
2609+ .toArray (Path []::new );
2610+ CorruptionUtils .corruptFile (random (), filesToCorrupt );
2611+
2612+ // check that corrupt marker is *NOT* there
2613+ final AtomicInteger corruptedMarkerCount = new AtomicInteger ();
2614+ final SimpleFileVisitor <Path > corruptedVisitor = new SimpleFileVisitor <Path >() {
2615+ @ Override
2616+ public FileVisitResult visitFile (Path file , BasicFileAttributes attrs ) throws IOException {
2617+ if (Files .isRegularFile (file ) && file .getFileName ().toString ().startsWith (Store .CORRUPTED )) {
2618+ corruptedMarkerCount .incrementAndGet ();
2619+ }
2620+ return FileVisitResult .CONTINUE ;
2621+ }
2622+ };
2623+ Files .walkFileTree (indexPath , corruptedVisitor );
2624+
2625+ assertThat ("store is clean" ,
2626+ corruptedMarkerCount .get (), equalTo (0 ));
2627+
2628+ // storeProvider that does not perform check index on close - it is corrupted
2629+ final CheckedFunction <IndexSettings , Store , IOException > storeProvider = indexSettings -> {
2630+ final ShardId shardId = shardPath .getShardId ();
2631+ final DirectoryService directoryService = new DirectoryService (shardId , indexSettings ) {
2632+ @ Override
2633+ public Directory newDirectory () throws IOException {
2634+ final BaseDirectoryWrapper baseDirectoryWrapper = newFSDirectory (shardPath .resolveIndex ());
2635+ // index is corrupted - don't even try to check index on close - it fails
2636+ baseDirectoryWrapper .setCheckIndexOnClose (false );
2637+ return baseDirectoryWrapper ;
2638+ }
2639+ };
2640+ return new Store (shardId , indexSettings , directoryService , new DummyShardLock (shardId ));
2641+ };
2642+
2643+ // try to start shard on corrupted files
2644+ final IndexShard corruptedShard = newShard (shardRouting , shardPath , indexMetaData ,
2645+ storeProvider , null , indexShard .engineFactory ,
2646+ indexShard .getGlobalCheckpointSyncer (), EMPTY_EVENT_LISTENER );
2647+
2648+ expectThrows (IndexShardRecoveryException .class , () -> newStartedShard (p -> corruptedShard , primary ));
2649+ closeShards (corruptedShard );
2650+
2651+ // check that corrupt marker is there
2652+ Files .walkFileTree (indexPath , corruptedVisitor );
2653+ assertThat ("store has to be marked as corrupted" ,
2654+ corruptedMarkerCount .get (), equalTo (1 ));
2655+
2656+ // try to start another time shard on corrupted files
2657+ final IndexShard corruptedShard2 = newShard (shardRouting , shardPath , indexMetaData ,
2658+ storeProvider , null , indexShard .engineFactory ,
2659+ indexShard .getGlobalCheckpointSyncer (), EMPTY_EVENT_LISTENER );
2660+
2661+ expectThrows (IndexShardRecoveryException .class , () -> newStartedShard (p -> corruptedShard2 , primary ));
2662+ closeShards (corruptedShard2 );
2663+
2664+ // check that corrupt marker is there
2665+ corruptedMarkerCount .set (0 );
2666+ Files .walkFileTree (indexPath , corruptedVisitor );
2667+ assertThat ("store still has a single corrupt marker" ,
2668+ corruptedMarkerCount .get (), equalTo (1 ));
2669+ }
2670+
25642671 /**
25652672 * Simulates a scenario that happens when we are async fetching snapshot metadata from GatewayService
25662673 * and checking index concurrently. This should always be possible without any exception.
@@ -2584,7 +2691,7 @@ public void testReadSnapshotAndCheckIndexConcurrently() throws Exception {
25842691 final IndexMetaData indexMetaData = IndexMetaData .builder (indexShard .indexSettings ().getIndexMetaData ())
25852692 .settings (Settings .builder ()
25862693 .put (indexShard .indexSettings .getSettings ())
2587- .put (IndexSettings .INDEX_CHECK_ON_STARTUP .getKey (), randomFrom ("false" , "true" , "checksum" , "fix" )))
2694+ .put (IndexSettings .INDEX_CHECK_ON_STARTUP .getKey (), randomFrom ("false" , "true" , "checksum" )))
25882695 .build ();
25892696 final IndexShard newShard = newShard (shardRouting , indexShard .shardPath (), indexMetaData ,
25902697 null , null , indexShard .engineFactory , indexShard .getGlobalCheckpointSyncer (), EMPTY_EVENT_LISTENER );
0 commit comments