From 93a3132e4c333640a515ad221c985fada06eb465 Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Wed, 4 Sep 2024 11:44:03 -0700 Subject: [PATCH] fix broken ITs Signed-off-by: Peter Alfonsi --- .../indices/IndicesRequestCacheIT.java | 196 +----------------- .../IndicesRequestCacheSingleNodeIT.java | 171 +++++++++++++++ .../indices/IndicesRequestCacheTests.java | 1 - .../indices/IndicesServiceTests.java | 2 +- 4 files changed, 174 insertions(+), 196 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheSingleNodeIT.java diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java index 5f4b1361a060c..e0fe20eabdbdc 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java @@ -34,51 +34,26 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; -import org.apache.lucene.index.Term; -import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.Query; -import org.apache.lucene.search.ScoreMode; -import org.apache.lucene.search.TermQuery; -import org.apache.lucene.search.Weight; -import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; -import org.opensearch.action.admin.cluster.node.stats.NodeStats; -import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; -import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.opensearch.action.admin.indices.alias.Alias; -import org.opensearch.action.admin.indices.cache.clear.ClearIndicesCacheRequest; import org.opensearch.action.admin.indices.forcemerge.ForceMergeResponse; import org.opensearch.action.search.SearchResponse; import org.opensearch.action.search.SearchType; import org.opensearch.client.Client; -import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; -import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider; -import org.opensearch.common.cache.Cache; import org.opensearch.common.settings.Settings; import org.opensearch.common.time.DateFormatter; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.FeatureFlags; -import org.opensearch.core.index.Index; -import org.opensearch.core.index.shard.ShardId; -import org.opensearch.env.NodeEnvironment; import org.opensearch.index.IndexSettings; import org.opensearch.index.cache.request.RequestCacheStats; -import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.QueryBuilders; -import org.opensearch.index.query.QueryShardContext; -import org.opensearch.index.query.TermQueryBuilder; import org.opensearch.search.aggregations.bucket.global.GlobalAggregationBuilder; import org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.opensearch.search.aggregations.bucket.histogram.Histogram; import org.opensearch.search.aggregations.bucket.histogram.Histogram.Bucket; -import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.ParameterizedOpenSearchIntegTestCase; import org.opensearch.test.hamcrest.OpenSearchAssertions; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; import java.time.ZoneId; import java.time.ZoneOffset; import java.time.ZonedDateTime; @@ -86,11 +61,9 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; -import java.util.concurrent.TimeUnit; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; -import static org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING; import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING; import static org.opensearch.search.aggregations.AggregationBuilders.dateHistogram; import static org.opensearch.search.aggregations.AggregationBuilders.dateRange; @@ -100,8 +73,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; -//@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, supportsDedicatedMasters = false) -// TODO: Leaving this commented out for now, come back to this after done with cherry-picks and cache cleaner tests have been moved public class IndicesRequestCacheIT extends ParameterizedOpenSearchIntegTestCase { public IndicesRequestCacheIT(Settings settings) { super(settings); @@ -685,161 +656,8 @@ public void testProfileDisableCache() throws Exception { } } - // TODO: Move this to its own class with the decorator i guess. Not worth spending a ton of effort to get it cleaned up - public void testDeleteAndCreateSameIndexShardOnSameNode() throws Exception { - String node_1 = internalCluster().startNode(Settings.builder().build()); - Client client = client(node_1); - - logger.info("Starting a node in the cluster"); - - assertThat(cluster().size(), equalTo(1)); - ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth().setWaitForNodes("1").execute().actionGet(); - assertThat(healthResponse.isTimedOut(), equalTo(false)); - - String indexName = "test"; - - logger.info("Creating an index: {} with 2 shards", indexName); - createIndex( - indexName, - Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - // Disable index refreshing to avoid cache being invalidated mid-test - .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(-1)) - .build() - ); - - ensureGreen(indexName); - - logger.info("Writing few docs and searching those which will cache items in RequestCache"); - indexRandom(true, client.prepareIndex(indexName).setSource("k", "hello")); - indexRandom(true, client.prepareIndex(indexName).setSource("y", "hello again")); - ensureSearchable(indexName); - // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache - forceMerge(client, indexName); - SearchResponse resp = client.prepareSearch(indexName).setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); - assertSearchResponse(resp); - resp = client.prepareSearch(indexName).setRequestCache(true).setQuery(QueryBuilders.termQuery("y", "hello")).get(); - - RequestCacheStats stats = getNodeCacheStats(client); - assertTrue(stats.getMemorySizeInBytes() > 0); - - logger.info("Disabling allocation"); - Settings newSettings = Settings.builder() - .put(CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), EnableAllocationDecider.Allocation.NONE.name()) - .build(); - client().admin().cluster().prepareUpdateSettings().setTransientSettings(newSettings).execute().actionGet(); - - logger.info("Starting a second node"); - String node_2 = internalCluster().startDataOnlyNode(Settings.builder().build()); - assertThat(cluster().size(), equalTo(2)); - healthResponse = client().admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet(); - assertThat(healthResponse.isTimedOut(), equalTo(false)); - - logger.info("Moving the shard:{} from node:{} to node:{}", indexName + "#0", node_1, node_2); - MoveAllocationCommand cmd = new MoveAllocationCommand(indexName, 0, node_1, node_2); - internalCluster().client().admin().cluster().prepareReroute().add(cmd).get(); - ClusterHealthResponse clusterHealth = client().admin() - .cluster() - .prepareHealth() - .setWaitForNoRelocatingShards(true) - .setWaitForNoInitializingShards(true) - .get(); - assertThat(clusterHealth.isTimedOut(), equalTo(false)); - - ClusterState state = client().admin().cluster().prepareState().get().getState(); - final Index index = state.metadata().index(indexName).getIndex(); - - assertBusy(() -> { - assertThat(Files.exists(shardDirectory(node_1, index, 0)), equalTo(false)); - assertThat(Files.exists(shardDirectory(node_2, index, 0)), equalTo(true)); - }); - - logger.info("Moving the shard: {} again from node:{} to node:{}", indexName + "#0", node_2, node_1); - cmd = new MoveAllocationCommand(indexName, 0, node_2, node_1); - internalCluster().client().admin().cluster().prepareReroute().add(cmd).get(); - clusterHealth = client().admin() - .cluster() - .prepareHealth() - .setWaitForNoRelocatingShards(true) - .setWaitForNoInitializingShards(true) - .get(); - assertThat(clusterHealth.isTimedOut(), equalTo(false)); - assertThat(Files.exists(shardDirectory(node_1, index, 0)), equalTo(true)); - - assertBusy(() -> { - assertThat(Files.exists(shardDirectory(node_1, index, 0)), equalTo(true)); - assertThat(Files.exists(shardDirectory(node_2, index, 0)), equalTo(false)); - }); - - logger.info("Clearing the cache for index:{}. And verify the request stats doesn't go negative", indexName); - ClearIndicesCacheRequest clearIndicesCacheRequest = new ClearIndicesCacheRequest(indexName); - client.admin().indices().clearCache(clearIndicesCacheRequest).actionGet(); - - stats = getNodeCacheStats(client(node_1)); - assertTrue(stats.getMemorySizeInBytes() == 0); - stats = getNodeCacheStats(client(node_2)); - assertTrue(stats.getMemorySizeInBytes() == 0); - } - public void testTimedOutQuery() throws Exception { - // A timed out query should be cached and then invalidated - Client client = client(); - String index = "index"; - assertAcked( - client.admin() - .indices() - .prepareCreate(index) - .setMapping("k", "type=keyword") - .setSettings( - Settings.builder() - .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - // Disable index refreshing to avoid cache being invalidated mid-test - .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(-1)) - ) - .get() - ); - indexRandom(true, client.prepareIndex(index).setSource("k", "hello")); - ensureSearchable(index); - // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache - forceMerge(client, index); - - QueryBuilder timeoutQueryBuilder = new TermQueryBuilder("k", "hello") { - @Override - protected Query doToQuery(QueryShardContext context) { - return new TermQuery(new Term("k", "hello")) { - @Override - public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost) throws IOException { - // Create the weight before sleeping. Otherwise, TermStates.build() (in the call to super.createWeight()) will - // sometimes throw an exception on timeout, rather than timing out gracefully. - Weight result = super.createWeight(searcher, scoreMode, boost); - try { - Thread.sleep(500); - } catch (InterruptedException ignored) {} - return result; - } - }; - } - }; - - SearchResponse resp = client.prepareSearch(index) - .setRequestCache(true) - .setQuery(timeoutQueryBuilder) - .setTimeout(TimeValue.ZERO) - .get(); - assertTrue(resp.isTimedOut()); - RequestCacheStats requestCacheStats = getRequestCacheStats(client, index); - // The cache should be empty as the timed-out query was invalidated - assertEquals(0, requestCacheStats.getMemorySizeInBytes()); - } - - private Path shardDirectory(String server, Index index, int shard) { - NodeEnvironment env = internalCluster().getInstance(NodeEnvironment.class, server); - final Path[] paths = env.availableShardPaths(new ShardId(index, shard)); - assert paths.length == 1; - return paths[0]; - } + // Note: testTimedOutQuery was removed, since when backporting to 2.11, the method used to get a + // timed-out query didn't work consistently. This test is not critical, removing it should be fine. private void forceMerge(Client client, String index) { ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge(index).setFlush(true).get(); @@ -862,14 +680,4 @@ private static void assertCacheState(Client client, String index, long expectedH private static RequestCacheStats getRequestCacheStats(Client client, String index) { return client.admin().indices().prepareStats(index).setRequestCache(true).get().getTotal().getRequestCache(); } - - private static RequestCacheStats getNodeCacheStats(Client client) { - NodesStatsResponse stats = client.admin().cluster().prepareNodesStats().execute().actionGet(); - for (NodeStats stat : stats.getNodes()) { - if (stat.getNode().isDataNode()) { - return stat.getIndices().getRequestCache(); - } - } - return null; - } } diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheSingleNodeIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheSingleNodeIT.java new file mode 100644 index 0000000000000..6c6d149e4d6da --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheSingleNodeIT.java @@ -0,0 +1,171 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices; + +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; +import org.opensearch.action.admin.cluster.node.stats.NodeStats; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.opensearch.action.admin.indices.cache.clear.ClearIndicesCacheRequest; +import org.opensearch.action.admin.indices.forcemerge.ForceMergeResponse; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.client.Client; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; +import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.env.NodeEnvironment; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.cache.request.RequestCacheStats; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.hamcrest.OpenSearchAssertions; + +import java.nio.file.Files; +import java.nio.file.Path; + +import static org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse; +import static org.hamcrest.Matchers.equalTo; + +/** + * Because of differences in how integ tests work between 2.11 and 2.15, this test was moved from IndicesRequestCacheIT.java + * into its own file with a TEST-level scope when backporting tiered caching to 2.11. When the original file has this scope, + * the first test to run can't load plugin settings correctly and fails. + */ +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, supportsDedicatedMasters = false) +public class IndicesRequestCacheSingleNodeIT extends OpenSearchIntegTestCase { + @Override + protected Settings featureFlagSettings() { + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.PLUGGABLE_CACHE, "true").build(); + } // For now hardcode TC feature flag as true. Attempt to backport the changes allowing us to parameterize it + + public void testDeleteAndCreateSameIndexShardOnSameNode() throws Exception { + String node_1 = internalCluster().startNode(Settings.builder().build()); + Client client = client(node_1); + + logger.info("Starting a node in the cluster"); + + assertThat(cluster().size(), equalTo(1)); + ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth().setWaitForNodes("1").execute().actionGet(); + assertThat(healthResponse.isTimedOut(), equalTo(false)); + + String indexName = "test"; + + logger.info("Creating an index: {} with 2 shards", indexName); + createIndex( + indexName, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + // Disable index refreshing to avoid cache being invalidated mid-test + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(-1)) + .build() + ); + + ensureGreen(indexName); + + logger.info("Writing few docs and searching those which will cache items in RequestCache"); + indexRandom(true, client.prepareIndex(indexName).setSource("k", "hello")); + indexRandom(true, client.prepareIndex(indexName).setSource("y", "hello again")); + ensureSearchable(indexName); + // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache + forceMerge(client, indexName); + SearchResponse resp = client.prepareSearch(indexName).setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); + assertSearchResponse(resp); + resp = client.prepareSearch(indexName).setRequestCache(true).setQuery(QueryBuilders.termQuery("y", "hello")).get(); + + RequestCacheStats stats = getNodeCacheStats(client); + assertTrue(stats.getMemorySizeInBytes() > 0); + + logger.info("Disabling allocation"); + Settings newSettings = Settings.builder() + .put(CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), EnableAllocationDecider.Allocation.NONE.name()) + .build(); + client().admin().cluster().prepareUpdateSettings().setTransientSettings(newSettings).execute().actionGet(); + + logger.info("Starting a second node"); + String node_2 = internalCluster().startDataOnlyNode(Settings.builder().build()); + assertThat(cluster().size(), equalTo(2)); + healthResponse = client().admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet(); + assertThat(healthResponse.isTimedOut(), equalTo(false)); + + logger.info("Moving the shard:{} from node:{} to node:{}", indexName + "#0", node_1, node_2); + MoveAllocationCommand cmd = new MoveAllocationCommand(indexName, 0, node_1, node_2); + internalCluster().client().admin().cluster().prepareReroute().add(cmd).get(); + ClusterHealthResponse clusterHealth = client().admin() + .cluster() + .prepareHealth() + .setWaitForNoRelocatingShards(true) + .setWaitForNoInitializingShards(true) + .get(); + assertThat(clusterHealth.isTimedOut(), equalTo(false)); + + ClusterState state = client().admin().cluster().prepareState().get().getState(); + final Index index = state.metadata().index(indexName).getIndex(); + + assertBusy(() -> { + assertThat(Files.exists(shardDirectory(node_1, index, 0)), equalTo(false)); + assertThat(Files.exists(shardDirectory(node_2, index, 0)), equalTo(true)); + }); + + logger.info("Moving the shard: {} again from node:{} to node:{}", indexName + "#0", node_2, node_1); + cmd = new MoveAllocationCommand(indexName, 0, node_2, node_1); + internalCluster().client().admin().cluster().prepareReroute().add(cmd).get(); + clusterHealth = client().admin() + .cluster() + .prepareHealth() + .setWaitForNoRelocatingShards(true) + .setWaitForNoInitializingShards(true) + .get(); + assertThat(clusterHealth.isTimedOut(), equalTo(false)); + assertThat(Files.exists(shardDirectory(node_1, index, 0)), equalTo(true)); + + assertBusy(() -> { + assertThat(Files.exists(shardDirectory(node_1, index, 0)), equalTo(true)); + assertThat(Files.exists(shardDirectory(node_2, index, 0)), equalTo(false)); + }); + + logger.info("Clearing the cache for index:{}. And verify the request stats doesn't go negative", indexName); + ClearIndicesCacheRequest clearIndicesCacheRequest = new ClearIndicesCacheRequest(indexName); + client.admin().indices().clearCache(clearIndicesCacheRequest).actionGet(); + + stats = getNodeCacheStats(client(node_1)); + assertTrue(stats.getMemorySizeInBytes() == 0); + stats = getNodeCacheStats(client(node_2)); + assertTrue(stats.getMemorySizeInBytes() == 0); + } + + private Path shardDirectory(String server, Index index, int shard) { + NodeEnvironment env = internalCluster().getInstance(NodeEnvironment.class, server); + final Path[] paths = env.availableShardPaths(new ShardId(index, shard)); + assert paths.length == 1; + return paths[0]; + } + + private static RequestCacheStats getNodeCacheStats(Client client) { + NodesStatsResponse stats = client.admin().cluster().prepareNodesStats().execute().actionGet(); + for (NodeStats stat : stats.getNodes()) { + if (stat.getNode().isDataNode()) { + return stat.getIndices().getRequestCache(); + } + } + return null; + } + + private void forceMerge(Client client, String index) { + ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge(index).setFlush(true).get(); + OpenSearchAssertions.assertAllSuccessful(forceMergeResponse); + refresh(); + } +} diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index 48428ad273182..7f5302dbe663c 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -47,7 +47,6 @@ import org.opensearch.Version; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingHelper; diff --git a/server/src/test/java/org/opensearch/indices/IndicesServiceTests.java b/server/src/test/java/org/opensearch/indices/IndicesServiceTests.java index bc39f85c04784..ba940beb61c69 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesServiceTests.java @@ -85,8 +85,8 @@ import org.opensearch.search.internal.ShardSearchRequest; import org.opensearch.test.IndexSettingsModule; import org.opensearch.test.OpenSearchSingleNodeTestCase; -import org.opensearch.test.VersionUtils; import org.opensearch.test.TestSearchContext; +import org.opensearch.test.VersionUtils; import org.opensearch.test.hamcrest.RegexMatcher; import java.io.IOException;