Skip to content

Commit

Permalink
Integrating IndicesRequestCache with CacheService controlled by a fea…
Browse files Browse the repository at this point in the history
…ture flag

Signed-off-by: Sagar Upadhyaya <[email protected]>
  • Loading branch information
sgup432 committed Mar 5, 2024
1 parent 1d97c3d commit 32f83c3
Show file tree
Hide file tree
Showing 14 changed files with 276 additions and 28 deletions.
2 changes: 2 additions & 0 deletions modules/cache-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
* compatible open source license.
*/

apply plugin: 'opensearch.internal-cluster-test'

opensearchplugin {
description 'Module for caches which are optional and do not require additional security permission'
classname 'org.opensearch.cache.common.tier.TieredSpilloverCachePlugin'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cache.common.tier;

import org.opensearch.action.admin.cluster.node.info.NodeInfo;
import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchType;
import org.opensearch.client.Client;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.ICache;
import org.opensearch.common.cache.settings.CacheSettings;
import org.opensearch.common.cache.store.OpenSearchOnHeapCache;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.indices.IndicesRequestCache;
import org.opensearch.plugins.CachePlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.PluginInfo;
import org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Assert;

import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.search.aggregations.AggregationBuilders.dateHistogram;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;
import static org.hamcrest.Matchers.greaterThan;

public class TieredSpilloverCacheIT extends OpenSearchIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(TieredSpilloverCachePlugin.class, MockDiskCachePlugin.class);
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.TIERED_CACHING, "true").build();
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(
CacheSettings.getConcreteSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(),
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME
)
.put(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_NAME.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME
)
.put(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
MockOnDiskCache.MockDiskCacheFactory.NAME
)
.build();
}

public void testPluginsAreInstalled() {
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
nodesInfoRequest.addMetric(NodesInfoRequest.Metric.PLUGINS.metricName());
NodesInfoResponse nodesInfoResponse = OpenSearchIntegTestCase.client().admin().cluster().nodesInfo(nodesInfoRequest).actionGet();
List<PluginInfo> pluginInfos = nodesInfoResponse.getNodes()
.stream()
.flatMap(
(Function<NodeInfo, Stream<PluginInfo>>) nodeInfo -> nodeInfo.getInfo(PluginsAndModules.class).getPluginInfos().stream()
)
.collect(Collectors.toList());
Assert.assertTrue(
pluginInfos.stream()
.anyMatch(pluginInfo -> pluginInfo.getName().equals("org.opensearch.cache.common" + ".tier.TieredSpilloverCachePlugin"))
);
}

public void testSanityChecksWithIndicesRequestCache() throws InterruptedException {
Client client = client();
assertAcked(
client.admin()
.indices()
.prepareCreate("index")
.setMapping("f", "type=date")
.setSettings(Settings.builder().put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true).build())
.get()
);
indexRandom(
true,
client.prepareIndex("index").setSource("f", "2014-03-10T00:00:00.000Z"),
client.prepareIndex("index").setSource("f", "2014-05-13T00:00:00.000Z")
);
ensureSearchable("index");

// This is not a random example: serialization with time zones writes shared strings
// which used to not work well with the query cache because of the handles stream output
// see #9500
final SearchResponse r1 = client.prepareSearch("index")
.setSize(0)
.setSearchType(SearchType.QUERY_THEN_FETCH)
.addAggregation(
dateHistogram("histo").field("f")
.timeZone(ZoneId.of("+01:00"))
.minDocCount(0)
.dateHistogramInterval(DateHistogramInterval.MONTH)
)
.get();
assertSearchResponse(r1);

// The cached is actually used
assertThat(
client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getMemorySizeInBytes(),
greaterThan(0L)
);
}

public static class MockDiskCachePlugin extends Plugin implements CachePlugin {

public MockDiskCachePlugin() {}

@Override
public Map<String, ICache.Factory> getCacheFactoryMap() {
return Map.of(MockOnDiskCache.MockDiskCacheFactory.NAME, new MockOnDiskCache.MockDiskCacheFactory(0, 1000));
}

@Override
public String getName() {
return "mock_disk_plugin";
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cache;

import org.opensearch.action.admin.cluster.node.info.NodeInfo;
import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.PluginInfo;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Assert;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class EhcacheDiskCacheIT extends OpenSearchIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(EhcacheCachePlugin.class);
}

public void testPluginsAreInstalled() {
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
nodesInfoRequest.addMetric(NodesInfoRequest.Metric.PLUGINS.metricName());
NodesInfoResponse nodesInfoResponse = OpenSearchIntegTestCase.client().admin().cluster().nodesInfo(nodesInfoRequest).actionGet();
List<PluginInfo> pluginInfos = nodesInfoResponse.getNodes()
.stream()
.flatMap(
(Function<NodeInfo, Stream<PluginInfo>>) nodeInfo -> nodeInfo.getInfo(PluginsAndModules.class).getPluginInfos().stream()
)
.collect(Collectors.toList());
Assert.assertTrue(
pluginInfos.stream().anyMatch(pluginInfo -> pluginInfo.getName().equals("org.opensearch.cache.EhcacheCachePlugin"))
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.time.DateFormatter;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.cache.request.RequestCacheStats;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.aggregations.bucket.global.GlobalAggregationBuilder;
Expand Down Expand Up @@ -81,6 +82,11 @@ public static Collection<Object[]> parameters() {
);
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.TIERED_CACHING, "true").build();
}

@Override
protected boolean useRandomReplicationStrategy() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.common.cache.service;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.ICache;
import org.opensearch.common.cache.settings.CacheSettings;
Expand All @@ -21,6 +22,7 @@
/**
* Service responsible to create caches.
*/
@ExperimentalApi
public class CacheService {

private final Map<String, ICache.Factory> cacheStoreTypeFactories;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.store.OpenSearchOnHeapCache;
import org.opensearch.common.settings.Setting;

/**
Expand All @@ -25,7 +26,7 @@ public class CacheSettings {
*/
public static final Setting.AffixSetting<String> CACHE_TYPE_STORE_NAME = Setting.suffixKeySetting(
"store.name",
(key) -> Setting.simpleString(key, "", Setting.Property.NodeScope)
(key) -> Setting.simpleString(key, OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME, Setting.Property.NodeScope)
);

public static Setting<String> getConcreteSettingForCacheType(CacheType cacheType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@
import org.opensearch.cluster.service.ClusterManagerTaskThrottler;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.settings.CacheSettings;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.network.NetworkModule;
import org.opensearch.common.network.NetworkService;
Expand Down Expand Up @@ -729,6 +731,8 @@ public void apply(Settings value, Settings current, Settings previous) {
TelemetrySettings.METRICS_PUBLISH_INTERVAL_SETTING,
TelemetrySettings.TRACER_FEATURE_ENABLED_SETTING,
TelemetrySettings.METRICS_FEATURE_ENABLED_SETTING
)
),
List.of(FeatureFlags.TIERED_CACHING),
List.of(CacheSettings.getConcreteSettingForCacheType(CacheType.INDICES_REQUEST_CACHE))
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ protected FeatureFlagSettings(
FeatureFlags.DATETIME_FORMATTER_CACHING_SETTING,
FeatureFlags.WRITEABLE_REMOTE_INDEX_SETTING,
FeatureFlags.DOC_ID_FUZZY_SET_SETTING,
FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING
FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING,
FeatureFlags.TIERED_CACHING_SETTING
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ public class FeatureFlags {
*/
public static final String DOC_ID_FUZZY_SET = "opensearch.experimental.optimize_doc_id_lookup.fuzzy_set.enabled";

/**
* Gates the functionality of tiered caching
*/
public static final String TIERED_CACHING = "opensearch.experimental.feature.tiered.caching.enabled";

/**
* Should store the settings from opensearch.yml.
*/
Expand Down Expand Up @@ -128,4 +133,6 @@ public static boolean isEnabled(Setting<Boolean> featureFlag) {
);

public static final Setting<Boolean> DOC_ID_FUZZY_SET_SETTING = Setting.boolSetting(DOC_ID_FUZZY_SET, false, Property.NodeScope);

public static final Setting<Boolean> TIERED_CACHING_SETTING = Setting.boolSetting(TIERED_CACHING, false, Property.NodeScope);
}
Loading

0 comments on commit 32f83c3

Please sign in to comment.