Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.test.BackgroundIndexer;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.xpack.searchablesnapshots.BaseSearchableSnapshotsIntegTestCase;
import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots;
Expand All @@ -33,6 +34,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
Expand All @@ -46,6 +48,7 @@
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.notNullValue;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
public class SearchableSnapshotsPersistentCacheIntegTests extends BaseSearchableSnapshotsIntegTestCase {

@Override
Expand Down Expand Up @@ -87,6 +90,9 @@ public void testCacheSurviveRestart() throws Exception {
);
ensureGreen(restoredIndexName);

assertExecutorIsIdle(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME);
assertExecutorIsIdle(SearchableSnapshots.CACHE_PREWARMING_THREAD_POOL_NAME);

final Index restoredIndex = client().admin()
.cluster()
.prepareState()
Expand Down Expand Up @@ -147,20 +153,20 @@ public Settings onNodeStopped(String nodeName) {
}
});

ensureGreen(restoredIndexName);

assertExecutorIsIdle(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME);
assertExecutorIsIdle(SearchableSnapshots.CACHE_PREWARMING_THREAD_POOL_NAME);

final CacheService cacheServiceAfterRestart = internalCluster().getInstance(CacheService.class, dataNode);
final PersistentCache persistentCacheAfterRestart = cacheServiceAfterRestart.getPersistentCache();
ensureGreen(restoredIndexName);

cacheFiles.forEach(cacheFile -> assertTrue(cacheFile + " should have survived node restart", Files.exists(cacheFile)));
assertThat("Cache files should be loaded in cache", persistentCacheAfterRestart.getNumDocs(), equalTo((long) cacheFiles.size()));

assertAcked(client().admin().indices().prepareDelete(restoredIndexName));

assertBusy(() -> {
cacheFiles.forEach(cacheFile -> assertFalse(cacheFile + " should have been cleaned up", Files.exists(cacheFile)));
cacheServiceAfterRestart.synchronizeCache();
assertThat(persistentCacheAfterRestart.getNumDocs(), equalTo(0L));
});
assertBusy(() -> cacheFiles.forEach(cacheFile -> assertFalse(cacheFile + " should have been cleaned up", Files.exists(cacheFile))));
assertEmptyPersistentCacheOnDataNodes();
}

public void testPersistentCacheCleanUpAfterRelocation() throws Exception {
Expand All @@ -186,6 +192,7 @@ public void testPersistentCacheCleanUpAfterRelocation() throws Exception {
final int numDocs = scaledRandomIntBetween(1_000, 5_000);
try (BackgroundIndexer indexer = new BackgroundIndexer(indexName, "_doc", client(), numDocs)) {
waitForDocs(numDocs, indexer);
indexer.stopAndAwaitStopped();
}
refresh(indexName);

Expand All @@ -212,7 +219,6 @@ public void testPersistentCacheCleanUpAfterRelocation() throws Exception {
.cluster()
.prepareState()
.clear()
.setRoutingTable(true)
.setMetadata(true)
.setIndices(mountedIndexName)
.get();
Expand Down Expand Up @@ -246,6 +252,9 @@ public void testPersistentCacheCleanUpAfterRelocation() throws Exception {

ensureGreen(mountedIndexName);

assertExecutorIsIdle(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME);
assertExecutorIsIdle(SearchableSnapshots.CACHE_PREWARMING_THREAD_POOL_NAME);

recoveryResponse = client().admin().indices().prepareRecoveries(mountedIndexName).get();
assertTrue(recoveryResponse.shardRecoveryStates().containsKey(mountedIndexName));
assertTrue(
Expand All @@ -269,12 +278,26 @@ public void testPersistentCacheCleanUpAfterRelocation() throws Exception {

logger.info("--> deleting mounted index {}", mountedIndex);
assertAcked(client().admin().indices().prepareDelete(mountedIndexName));
assertEmptyPersistentCacheOnDataNodes();
}

assertBusy(() -> {
for (CacheService cacheService : internalCluster().getDataNodeInstances(CacheService.class)) {
cacheService.synchronizeCache();
assertThat(cacheService.getPersistentCache().getNumDocs(), equalTo(0L));
}
});
private void assertEmptyPersistentCacheOnDataNodes() throws Exception {
final Set<DiscoveryNode> dataNodes = new HashSet<>(getDiscoveryNodes().getDataNodes().values());
logger.info("--> verifying persistent caches are empty on nodes... {}", dataNodes);
try {
assertBusy(() -> {
for (DiscoveryNode node : List.copyOf(dataNodes)) {
final CacheService cacheService = internalCluster().getInstance(CacheService.class, node.getName());
cacheService.synchronizeCache();
assertThat(cacheService.getPersistentCache().getNumDocs(), equalTo(0L));
logger.info("--> persistent cache is empty on node {}", node);
dataNodes.remove(node);
}
});
logger.info("--> all persistent caches are empty");
} catch (AssertionError ae) {
logger.error("--> persistent caches not empty on nodes: {}", dataNodes);
throw ae;
}
}
}