diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/full/SearchableSnapshotsPersistentCacheIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/full/SearchableSnapshotsPersistentCacheIntegTests.java index 32737e45e5193..fe9da469e945d 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/full/SearchableSnapshotsPersistentCacheIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/full/SearchableSnapshotsPersistentCacheIntegTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.repositories.fs.FsRepository; import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.test.BackgroundIndexer; +import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.xpack.searchablesnapshots.BaseSearchableSnapshotsIntegTestCase; import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots; @@ -33,6 +34,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.HashSet; +import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Set; @@ -46,6 +48,7 @@ import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.notNullValue; +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST) public class SearchableSnapshotsPersistentCacheIntegTests extends BaseSearchableSnapshotsIntegTestCase { @Override @@ -87,6 +90,9 @@ public void testCacheSurviveRestart() throws Exception { ); ensureGreen(restoredIndexName); + assertExecutorIsIdle(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME); + assertExecutorIsIdle(SearchableSnapshots.CACHE_PREWARMING_THREAD_POOL_NAME); + final Index restoredIndex = client().admin() .cluster() .prepareState() @@ -147,20 +153,20 @@ public Settings onNodeStopped(String nodeName) { } }); + ensureGreen(restoredIndexName); + + assertExecutorIsIdle(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME); + assertExecutorIsIdle(SearchableSnapshots.CACHE_PREWARMING_THREAD_POOL_NAME); + final CacheService cacheServiceAfterRestart = internalCluster().getInstance(CacheService.class, dataNode); final PersistentCache persistentCacheAfterRestart = cacheServiceAfterRestart.getPersistentCache(); - ensureGreen(restoredIndexName); cacheFiles.forEach(cacheFile -> assertTrue(cacheFile + " should have survived node restart", Files.exists(cacheFile))); assertThat("Cache files should be loaded in cache", persistentCacheAfterRestart.getNumDocs(), equalTo((long) cacheFiles.size())); assertAcked(client().admin().indices().prepareDelete(restoredIndexName)); - - assertBusy(() -> { - cacheFiles.forEach(cacheFile -> assertFalse(cacheFile + " should have been cleaned up", Files.exists(cacheFile))); - cacheServiceAfterRestart.synchronizeCache(); - assertThat(persistentCacheAfterRestart.getNumDocs(), equalTo(0L)); - }); + assertBusy(() -> cacheFiles.forEach(cacheFile -> assertFalse(cacheFile + " should have been cleaned up", Files.exists(cacheFile)))); + assertEmptyPersistentCacheOnDataNodes(); } public void testPersistentCacheCleanUpAfterRelocation() throws Exception { @@ -186,6 +192,7 @@ public void testPersistentCacheCleanUpAfterRelocation() throws Exception { final int numDocs = scaledRandomIntBetween(1_000, 5_000); try (BackgroundIndexer indexer = new BackgroundIndexer(indexName, "_doc", client(), numDocs)) { waitForDocs(numDocs, indexer); + indexer.stopAndAwaitStopped(); } refresh(indexName); @@ -212,7 +219,6 @@ public void testPersistentCacheCleanUpAfterRelocation() throws Exception { .cluster() .prepareState() .clear() - .setRoutingTable(true) .setMetadata(true) .setIndices(mountedIndexName) .get(); @@ -246,6 +252,9 @@ public void testPersistentCacheCleanUpAfterRelocation() throws Exception { ensureGreen(mountedIndexName); + assertExecutorIsIdle(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME); + assertExecutorIsIdle(SearchableSnapshots.CACHE_PREWARMING_THREAD_POOL_NAME); + recoveryResponse = client().admin().indices().prepareRecoveries(mountedIndexName).get(); assertTrue(recoveryResponse.shardRecoveryStates().containsKey(mountedIndexName)); assertTrue( @@ -269,12 +278,26 @@ public void testPersistentCacheCleanUpAfterRelocation() throws Exception { logger.info("--> deleting mounted index {}", mountedIndex); assertAcked(client().admin().indices().prepareDelete(mountedIndexName)); + assertEmptyPersistentCacheOnDataNodes(); + } - assertBusy(() -> { - for (CacheService cacheService : internalCluster().getDataNodeInstances(CacheService.class)) { - cacheService.synchronizeCache(); - assertThat(cacheService.getPersistentCache().getNumDocs(), equalTo(0L)); - } - }); + private void assertEmptyPersistentCacheOnDataNodes() throws Exception { + final Set dataNodes = new HashSet<>(getDiscoveryNodes().getDataNodes().values()); + logger.info("--> verifying persistent caches are empty on nodes... {}", dataNodes); + try { + assertBusy(() -> { + for (DiscoveryNode node : List.copyOf(dataNodes)) { + final CacheService cacheService = internalCluster().getInstance(CacheService.class, node.getName()); + cacheService.synchronizeCache(); + assertThat(cacheService.getPersistentCache().getNumDocs(), equalTo(0L)); + logger.info("--> persistent cache is empty on node {}", node); + dataNodes.remove(node); + } + }); + logger.info("--> all persistent caches are empty"); + } catch (AssertionError ae) { + logger.error("--> persistent caches not empty on nodes: {}", dataNodes); + throw ae; + } } }