Skip to content

Commit 58eb44e

Browse files
authored
[Tiered Cache] Using a single cache manager for all ehcache disk caches (#17513)
* Using a single cache manager for all ehcache disk caches Signed-off-by: Sagar Upadhyaya <[email protected]> * Added changelog Signed-off-by: Sagar Upadhyaya <[email protected]> * Fixing cache manager UT Signed-off-by: Sagar Upadhyaya <[email protected]> * Addressing comments Signed-off-by: Sagar Upadhyaya <[email protected]> * Removing commented out code Signed-off-by: Sagar Upadhyaya <[email protected]> * Adding changelog Signed-off-by: Sagar Upadhyaya <[email protected]> * Changes to perform mutable changes for cache manager under a lock Signed-off-by: Sagar Upadhyaya <[email protected]> * Changes to fix UT Signed-off-by: Sagar Upadhyaya <[email protected]> * Addressing minor comments Signed-off-by: Sagar Upadhyaya <[email protected]> --------- Signed-off-by: Sagar Upadhyaya <[email protected]> Signed-off-by: Sagar <[email protected]>
1 parent 18b0d1c commit 58eb44e

File tree

7 files changed

+579
-150
lines changed

7 files changed

+579
-150
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2424
- [Security Manager Replacement] Phase off SecurityManager usage in favor of Java Agent ([#17861](https://github.com/opensearch-project/OpenSearch/pull/17861))
2525
- Support AutoExpand for SearchReplica ([#17741](https://github.com/opensearch-project/OpenSearch/pull/17741))
2626
- Implement fixed interval refresh task scheduling ([#17777](https://github.com/opensearch-project/OpenSearch/pull/17777))
27+
- [Tiered caching] Create a single cache manager for all the disk caches. ([#17513](https://github.com/opensearch-project/OpenSearch/pull/17513))
2728
- Add GRPC DocumentService and Bulk endpoint ([#17727](https://github.com/opensearch-project/OpenSearch/pull/17727))
2829
- Added scale to zero (`search_only` mode) support for OpenSearch reader writer separation ([#17299](https://github.com/opensearch-project/OpenSearch/pull/17299)
2930
- [Star Tree] [Search] Resolving numeric range aggregation with metric aggregation using star-tree ([#17273](https://github.com/opensearch-project/OpenSearch/pull/17273))

modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ static class TieredSpilloverCacheSegment<K, V> implements ICache<K, V> {
213213
.setSegmentCount(1) // We don't need to make underlying caches multi-segmented
214214
.setStatsTrackingEnabled(false)
215215
.setMaxSizeInBytes(diskCacheSizeInBytes)
216-
.setStoragePath(builder.cacheConfig.getStoragePath() + "/" + segmentNumber)
216+
.setStoragePath(builder.cacheConfig.getStoragePath())
217217
.setCacheAlias("tiered_disk_cache#" + segmentNumber)
218218
.build(),
219219
builder.cacheType,

plugins/cache-ehcache/src/main/java/org/opensearch/cache/EhcacheDiskCacheSettings.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.opensearch.common.cache.CacheType;
1313
import org.opensearch.common.settings.Setting;
1414
import org.opensearch.common.unit.TimeValue;
15+
import org.opensearch.threadpool.ThreadPool;
1516

1617
import java.util.HashMap;
1718
import java.util.Map;
@@ -36,17 +37,26 @@ public class EhcacheDiskCacheSettings {
3637

3738
public static final Setting.AffixSetting<Integer> DISK_WRITE_MINIMUM_THREADS_SETTING = Setting.suffixKeySetting(
3839
EhcacheDiskCache.EhcacheDiskCacheFactory.EHCACHE_DISK_CACHE_NAME + ".min_threads",
39-
(key) -> Setting.intSetting(key, 2, 1, 5, NodeScope)
40+
(key) -> Setting.intSetting(key, 2, 1, ThreadPool.searchThreadPoolSize(Runtime.getRuntime().availableProcessors()), NodeScope)
4041
);
4142

4243
/**
43-
* Ehcache disk write maximum threads for its pool
44+
* Ehcache disk write maximum threads for its pool. The default value is 1.5 * CPU_CORES ie equal to number of
45+
* search threads. Disk operations are typically I/O bound rather than CPU bound, so setting it greater than the
46+
* number of cpu cores should ideally be fine.
4447
*
4548
* Setting pattern: {cache_type}.ehcache_disk.max_threads
4649
*/
4750
public static final Setting.AffixSetting<Integer> DISK_WRITE_MAXIMUM_THREADS_SETTING = Setting.suffixKeySetting(
4851
EhcacheDiskCache.EhcacheDiskCacheFactory.EHCACHE_DISK_CACHE_NAME + ".max_threads",
49-
(key) -> Setting.intSetting(key, 2, 1, 20, NodeScope)
52+
(key) -> Setting.intSetting(
53+
key,
54+
ThreadPool.searchThreadPoolSize(Runtime.getRuntime().availableProcessors()),
55+
1,
56+
Runtime.getRuntime().availableProcessors() * 10, // The max one can configure this in setting is 10 times
57+
// CPU cores. Ideally won't be required, but in case one way use it.
58+
NodeScope
59+
)
5060
);
5161

5262
/**

plugins/cache-ehcache/src/main/java/org/opensearch/cache/store/disk/EhcacheDiskCache.java

Lines changed: 87 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010

1111
import org.apache.logging.log4j.LogManager;
1212
import org.apache.logging.log4j.Logger;
13-
import org.apache.logging.log4j.message.ParameterizedMessage;
1413
import org.opensearch.OpenSearchException;
1514
import org.opensearch.cache.EhcacheDiskCacheSettings;
1615
import org.opensearch.common.SuppressForbidden;
@@ -153,16 +152,19 @@ public class EhcacheDiskCache<K, V> implements ICache<K, V> {
153152
if (this.storagePath == null || this.storagePath.isBlank()) {
154153
throw new IllegalArgumentException("Storage path shouldn't be null or empty");
155154
}
156-
// Delete all the previous disk cache related files/data. We don't persist data between process restart for
157-
// now which is why need to do this. Clean up in case there was a non graceful restart and we had older disk
158-
// cache data still lying around.
159-
Path ehcacheDirectory = Paths.get(this.storagePath);
160-
if (Files.exists(ehcacheDirectory)) {
161-
try {
162-
logger.info("Found older disk cache data lying around during initialization under path: {}", this.storagePath);
163-
IOUtils.rm(ehcacheDirectory);
164-
} catch (IOException e) {
165-
throw new OpenSearchException(String.format(CACHE_DATA_CLEANUP_DURING_INITIALIZATION_EXCEPTION, this.storagePath), e);
155+
// Delete all the previous disk cache related files/data only if cache manager doesn't exist. As we can
156+
// create multiple caches via single cache manager for a cache type. We don't persist data between process
157+
// restart for now which is why need to do this. Clean up in case there was a non graceful restart and we had
158+
// older disk cache data still lying around.
159+
if (!EhcacheDiskCacheManager.doesCacheManagerExist(cacheType)) {
160+
Path ehcacheDirectory = Paths.get(this.storagePath);
161+
if (Files.exists(ehcacheDirectory)) {
162+
try {
163+
logger.info("Found older disk cache data lying around during initialization under path: {}", this.storagePath);
164+
IOUtils.rm(ehcacheDirectory);
165+
} catch (IOException e) {
166+
throw new OpenSearchException(String.format(CACHE_DATA_CLEANUP_DURING_INITIALIZATION_EXCEPTION, this.storagePath), e);
167+
}
166168
}
167169
}
168170
if (builder.threadPoolAlias == null || builder.threadPoolAlias.isBlank()) {
@@ -173,7 +175,7 @@ public class EhcacheDiskCache<K, V> implements ICache<K, V> {
173175
this.settings = Objects.requireNonNull(builder.getSettings(), "Settings objects shouldn't be null");
174176
this.keySerializer = Objects.requireNonNull(builder.keySerializer, "Key serializer shouldn't be null");
175177
this.valueSerializer = Objects.requireNonNull(builder.valueSerializer, "Value serializer shouldn't be null");
176-
this.cacheManager = buildCacheManager();
178+
this.cacheManager = EhcacheDiskCacheManager.getCacheManager(cacheType, this.storagePath, settings, this.threadPoolAlias);
177179
Objects.requireNonNull(builder.getRemovalListener(), "Removal listener can't be null");
178180
this.removalListener = builder.getRemovalListener();
179181
Objects.requireNonNull(builder.getWeigher(), "Weigher can't be null");
@@ -189,73 +191,54 @@ public class EhcacheDiskCache<K, V> implements ICache<K, V> {
189191
}
190192
}
191193

192-
// Package private for testing
193-
PersistentCacheManager getCacheManager() {
194-
return this.cacheManager;
195-
}
196-
197194
@SuppressWarnings({ "rawtypes", "removal" })
198195
private Cache<ICacheKey, ByteArrayWrapper> buildCache(Duration expireAfterAccess, Builder<K, V> builder) {
199196
// Creating the cache requires permissions specified in plugin-security.policy
200-
return AccessController.doPrivileged((PrivilegedAction<Cache<ICacheKey, ByteArrayWrapper>>) () -> {
201-
try {
202-
int segmentCount = (Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType)
203-
.get(DISK_SEGMENT_KEY)
204-
.get(settings);
205-
if (builder.getNumberOfSegments() > 0) {
206-
segmentCount = builder.getNumberOfSegments();
197+
int segmentCount = (Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType).get(DISK_SEGMENT_KEY).get(settings);
198+
if (builder.getNumberOfSegments() > 0) {
199+
segmentCount = builder.getNumberOfSegments();
200+
}
201+
CacheConfigurationBuilder<ICacheKey, ByteArrayWrapper> cacheConfigurationBuilder = CacheConfigurationBuilder
202+
.newCacheConfigurationBuilder(
203+
ICacheKey.class,
204+
ByteArrayWrapper.class,
205+
ResourcePoolsBuilder.newResourcePoolsBuilder().disk(maxWeightInBytes, MemoryUnit.B)
206+
).withExpiry(new ExpiryPolicy<>() {
207+
@Override
208+
public Duration getExpiryForCreation(ICacheKey key, ByteArrayWrapper value) {
209+
return INFINITE;
207210
}
208-
return this.cacheManager.createCache(
209-
this.diskCacheAlias,
210-
CacheConfigurationBuilder.newCacheConfigurationBuilder(
211-
ICacheKey.class,
212-
ByteArrayWrapper.class,
213-
ResourcePoolsBuilder.newResourcePoolsBuilder().disk(maxWeightInBytes, MemoryUnit.B)
214-
).withExpiry(new ExpiryPolicy<>() {
215-
@Override
216-
public Duration getExpiryForCreation(ICacheKey key, ByteArrayWrapper value) {
217-
return INFINITE;
218-
}
219-
220-
@Override
221-
public Duration getExpiryForAccess(ICacheKey key, Supplier<? extends ByteArrayWrapper> value) {
222-
return expireAfterAccess;
223-
}
224-
225-
@Override
226-
public Duration getExpiryForUpdate(
227-
ICacheKey key,
228-
Supplier<? extends ByteArrayWrapper> oldValue,
229-
ByteArrayWrapper newValue
230-
) {
231-
return INFINITE;
232-
}
233-
})
234-
.withService(getListenerConfiguration(builder))
235-
.withService(
236-
new OffHeapDiskStoreConfiguration(
237-
this.threadPoolAlias,
238-
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType)
239-
.get(DISK_WRITE_CONCURRENCY_KEY)
240-
.get(settings),
241-
segmentCount
242-
)
243-
)
244-
.withKeySerializer(new KeySerializerWrapper(keySerializer))
245-
.withValueSerializer(new ByteArrayWrapperSerializer())
246-
// We pass ByteArrayWrapperSerializer as ehcache's value serializer. If V is an interface, and we pass its
247-
// serializer directly to ehcache, ehcache requires the classes match exactly before/after serialization.
248-
// This is not always feasible or necessary, like for BytesReference. So, we handle the value serialization
249-
// before V hits ehcache.
250-
);
251-
} catch (IllegalArgumentException ex) {
252-
logger.error("Ehcache disk cache initialization failed due to illegal argument: {}", ex.getMessage());
253-
throw ex;
254-
} catch (IllegalStateException ex) {
255-
logger.error("Ehcache disk cache initialization failed: {}", ex.getMessage());
256-
throw ex;
257-
}
258-
});
211+
212+
@Override
213+
public Duration getExpiryForAccess(ICacheKey key, Supplier<? extends ByteArrayWrapper> value) {
214+
return expireAfterAccess;
215+
}
216+
217+
@Override
218+
public Duration getExpiryForUpdate(
219+
ICacheKey key,
220+
Supplier<? extends ByteArrayWrapper> oldValue,
221+
ByteArrayWrapper newValue
222+
) {
223+
return INFINITE;
224+
}
225+
})
226+
.withService(getListenerConfiguration(builder))
227+
.withService(
228+
new OffHeapDiskStoreConfiguration(
229+
this.threadPoolAlias,
230+
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType).get(DISK_WRITE_CONCURRENCY_KEY).get(settings),
231+
segmentCount
232+
)
233+
)
234+
.withKeySerializer(new KeySerializerWrapper(keySerializer))
235+
.withValueSerializer(new ByteArrayWrapperSerializer()); // We pass ByteArrayWrapperSerializer as ehcache's value serializer. If
236+
// V is an interface, and we pass its
237+
// serializer directly to ehcache, ehcache requires the classes match exactly before/after serialization.
238+
// This is not always feasible or necessary, like for BytesReference. So, we handle the value serialization
239+
// before V hits ehcache.
240+
241+
return EhcacheDiskCacheManager.createCache(cacheType, this.diskCacheAlias, cacheConfigurationBuilder);
259242
}
260243

261244
private CacheEventListenerConfigurationBuilder getListenerConfiguration(Builder<K, V> builder) {
@@ -470,21 +453,7 @@ public void refresh() {
470453
@Override
471454
@SuppressForbidden(reason = "Ehcache uses File.io")
472455
public void close() {
473-
try {
474-
cacheManager.close();
475-
} catch (Exception e) {
476-
logger.error(() -> new ParameterizedMessage("Exception occurred while trying to close ehcache manager"), e);
477-
}
478-
// Delete all the disk cache related files/data in case it is present
479-
Path ehcacheDirectory = Paths.get(this.storagePath);
480-
if (Files.exists(ehcacheDirectory)) {
481-
try {
482-
IOUtils.rm(ehcacheDirectory);
483-
} catch (IOException e) {
484-
logger.error(() -> new ParameterizedMessage("Failed to delete ehcache disk cache data under path: {}", this.storagePath));
485-
}
486-
}
487-
456+
EhcacheDiskCacheManager.closeCache(cacheType, diskCacheAlias, storagePath);
488457
}
489458

490459
/**
@@ -597,16 +566,24 @@ public void onEvent(CacheEvent<? extends ICacheKey<K>, ? extends ByteArrayWrappe
597566
* Wrapper over ICacheKeySerializer which is compatible with ehcache's serializer requirements.
598567
*/
599568
@SuppressWarnings({ "rawtypes", "unchecked" })
600-
private class KeySerializerWrapper implements org.ehcache.spi.serialization.Serializer<ICacheKey> {
569+
public class KeySerializerWrapper implements org.ehcache.spi.serialization.Serializer<ICacheKey> {
601570
private ICacheKeySerializer<K> serializer;
602571

572+
/**
573+
* Constructor for key serializer
574+
* @param internalKeySerializer serializer for internal key
575+
*/
603576
public KeySerializerWrapper(Serializer<K, byte[]> internalKeySerializer) {
604577
this.serializer = new ICacheKeySerializer<>(internalKeySerializer);
605578
}
606579

607-
// This constructor must be present, but does not have to work as we are not actually persisting the disk
608-
// cache after a restart.
609-
// See https://www.ehcache.org/documentation/3.0/serializers-copiers.html#persistent-vs-transient-caches
580+
/**
581+
* This constructor must be present, but does not have to work as we are not actually persisting the disk
582+
* cache after a restart. See https://www.ehcache.org/documentation/3.0/serializers-copiers
583+
* .html#persistent-vs-transient-caches
584+
* @param classLoader
585+
* @param persistenceContext
586+
*/
610587
public KeySerializerWrapper(ClassLoader classLoader, FileBasedPersistenceContext persistenceContext) {}
611588

612589
@Override
@@ -632,12 +609,19 @@ public boolean equals(ICacheKey object, ByteBuffer binary) throws ClassNotFoundE
632609
/**
633610
* Wrapper allowing Ehcache to serialize ByteArrayWrapper.
634611
*/
635-
private static class ByteArrayWrapperSerializer implements org.ehcache.spi.serialization.Serializer<ByteArrayWrapper> {
612+
public static class ByteArrayWrapperSerializer implements org.ehcache.spi.serialization.Serializer<ByteArrayWrapper> {
613+
/**
614+
* Default constructor
615+
*/
636616
public ByteArrayWrapperSerializer() {}
637617

638-
// This constructor must be present, but does not have to work as we are not actually persisting the disk
639-
// cache after a restart.
640-
// See https://www.ehcache.org/documentation/3.0/serializers-copiers.html#persistent-vs-transient-caches
618+
/**
619+
* This constructor must be present, but does not have to work as we are not actually persisting the disk
620+
* cache after a restart. See https://www.ehcache.org/documentation/3.0/serializers-copiers
621+
* .html#persistent-vs-transient-caches
622+
* @param classLoader
623+
* @param persistenceContext
624+
*/
641625
public ByteArrayWrapperSerializer(ClassLoader classLoader, FileBasedPersistenceContext persistenceContext) {}
642626

643627
@Override
@@ -906,9 +890,13 @@ public EhcacheDiskCache<K, V> build() {
906890
* A wrapper over byte[], with equals() that works using Arrays.equals().
907891
* Necessary due to a limitation in how Ehcache compares byte[].
908892
*/
909-
static class ByteArrayWrapper {
893+
public static class ByteArrayWrapper {
910894
private final byte[] value;
911895

896+
/**
897+
* Constructor for byte array wrapper.
898+
* @param value value to wrap.
899+
*/
912900
public ByteArrayWrapper(byte[] value) {
913901
this.value = value;
914902
}

0 commit comments

Comments
 (0)