Skip to content

Commit 5e6f3ee

Browse files
authored
Add StoreFactory plugin interface for custom Store implementations (#19091)
* Add StoreFactory plugin interface for custom Store implementations Signed-off-by: xuxiong1 <[email protected]> * Resolve comments Signed-off-by: xuxiong1 <[email protected]> --------- Signed-off-by: xuxiong1 <[email protected]>
1 parent 484454c commit 5e6f3ee

File tree

10 files changed

+244
-12
lines changed

10 files changed

+244
-12
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
99
- Add temporal routing processors for time-based document routing ([#18920](https://github.com/opensearch-project/OpenSearch/issues/18920))
1010
- The dynamic mapping parameter supports false_allow_templates ([#19065](https://github.com/opensearch-project/OpenSearch/pull/19065))
1111
- Add a toBuilder method in EngineConfig to support easy modification of configs([#19054](https://github.com/opensearch-project/OpenSearch/pull/19054))
12+
- Add StoreFactory plugin interface for custom Store implementations([#19091](https://github.com/opensearch-project/OpenSearch/pull/19091))
1213

1314
### Changed
1415
- Add CompletionStage variants to methods in the Client Interface and default to ActionListener impl ([#18998](https://github.com/opensearch-project/OpenSearch/pull/18998))

server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
199199
BitsetFilterCache.INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING,
200200
IndexModule.INDEX_STORE_TYPE_SETTING,
201201
IndexModule.INDEX_COMPOSITE_STORE_TYPE_SETTING,
202+
IndexModule.INDEX_STORE_FACTORY_SETTING,
202203
IndexModule.INDEX_STORE_PRE_LOAD_SETTING,
203204
IndexModule.INDEX_STORE_HYBRID_NIO_EXTENSIONS,
204205
IndexModule.INDEX_RECOVERY_TYPE_SETTING,

server/src/main/java/org/opensearch/index/IndexModule.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
import org.opensearch.index.similarity.SimilarityService;
8080
import org.opensearch.index.store.DefaultCompositeDirectoryFactory;
8181
import org.opensearch.index.store.FsDirectoryFactory;
82+
import org.opensearch.index.store.Store;
8283
import org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory;
8384
import org.opensearch.index.store.remote.filecache.FileCache;
8485
import org.opensearch.index.translog.TranslogFactory;
@@ -156,6 +157,17 @@ public final class IndexModule {
156157
Property.NodeScope
157158
);
158159

160+
/**
161+
* Index setting that selects a custom StoreFactory provided by plugins. If empty, the default store is used
162+
*/
163+
public static final Setting<String> INDEX_STORE_FACTORY_SETTING = new Setting<>(
164+
"index.store.factory",
165+
"",
166+
Function.identity(),
167+
Property.IndexScope,
168+
Property.NodeScope
169+
);
170+
159171
/**
160172
* Index setting which used to determine how the data is cached locally fully or partially.
161173
*/
@@ -260,6 +272,7 @@ public final class IndexModule {
260272
private final AtomicBoolean frozen = new AtomicBoolean(false);
261273
private final BooleanSupplier allowExpensiveQueries;
262274
private final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories;
275+
private final Map<String, IndexStorePlugin.StoreFactory> storeFactories;
263276
private final FileCache fileCache;
264277
private final CompositeIndexSettings compositeIndexSettings;
265278

@@ -283,6 +296,7 @@ public IndexModule(
283296
final BooleanSupplier allowExpensiveQueries,
284297
final IndexNameExpressionResolver expressionResolver,
285298
final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories,
299+
final Map<String, IndexStorePlugin.StoreFactory> storeFactories,
286300
final FileCache fileCache,
287301
final CompositeIndexSettings compositeIndexSettings
288302
) {
@@ -297,6 +311,7 @@ public IndexModule(
297311
this.allowExpensiveQueries = allowExpensiveQueries;
298312
this.expressionResolver = expressionResolver;
299313
this.recoveryStateFactories = recoveryStateFactories;
314+
this.storeFactories = storeFactories;
300315
this.fileCache = fileCache;
301316
this.compositeIndexSettings = compositeIndexSettings;
302317
}
@@ -322,6 +337,7 @@ public IndexModule(
322337
allowExpensiveQueries,
323338
expressionResolver,
324339
recoveryStateFactories,
340+
Collections.emptyMap(),
325341
null,
326342
null
327343
);
@@ -759,6 +775,7 @@ public IndexService newIndexService(
759775
directoryFactory,
760776
compositeDirectoryFactory,
761777
remoteDirectoryFactory,
778+
resolveStoreFactory(indexSettings, storeFactories),
762779
eventListener,
763780
readerWrapperFactory,
764781
mapperRegistry,
@@ -858,6 +875,21 @@ private static IndexStorePlugin.RecoveryStateFactory getRecoveryStateFactory(
858875
return factory;
859876
}
860877

878+
private static IndexStorePlugin.StoreFactory resolveStoreFactory(
879+
final IndexSettings indexSettings,
880+
final Map<String, IndexStorePlugin.StoreFactory> storeFactories
881+
) {
882+
final String key = indexSettings.getValue(INDEX_STORE_FACTORY_SETTING);
883+
if (key == null || key.isEmpty()) {
884+
return Store::new;
885+
}
886+
final IndexStorePlugin.StoreFactory factory = storeFactories.get(key);
887+
if (factory == null) {
888+
throw new IllegalArgumentException("Unknown store factory [" + key + "]");
889+
}
890+
return factory;
891+
}
892+
861893
/**
862894
* creates a new mapper service to do administrative work like mapping updates. This *should not* be used for document parsing.
863895
* doing so will result in an exception.

server/src/main/java/org/opensearch/index/IndexService.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
206206
private final Object refreshMutex = new Object();
207207
private volatile TimeValue refreshInterval;
208208
private volatile boolean shardLevelRefreshEnabled;
209+
private final IndexStorePlugin.StoreFactory storeFactory;
209210

210211
@InternalApi
211212
public IndexService(
@@ -228,6 +229,7 @@ public IndexService(
228229
IndexStorePlugin.DirectoryFactory directoryFactory,
229230
IndexStorePlugin.CompositeDirectoryFactory compositeDirectoryFactory,
230231
IndexStorePlugin.DirectoryFactory remoteDirectoryFactory,
232+
IndexStorePlugin.StoreFactory storeFactory,
231233
IndexEventListener eventListener,
232234
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> wrapperFactory,
233235
MapperRegistry mapperRegistry,
@@ -253,6 +255,7 @@ public IndexService(
253255
Supplier<Integer> clusterDefaultMaxMergeAtOnceSupplier
254256
) {
255257
super(indexSettings);
258+
this.storeFactory = storeFactory;
256259
this.allowExpensiveQueries = allowExpensiveQueries;
257260
this.indexSettings = indexSettings;
258261
this.xContentRegistry = xContentRegistry;
@@ -378,6 +381,7 @@ public IndexService(
378381
QueryCache queryCache,
379382
IndexStorePlugin.DirectoryFactory directoryFactory,
380383
IndexStorePlugin.DirectoryFactory remoteDirectoryFactory,
384+
IndexStorePlugin.StoreFactory storeFactory,
381385
IndexEventListener eventListener,
382386
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> wrapperFactory,
383387
MapperRegistry mapperRegistry,
@@ -418,6 +422,7 @@ public IndexService(
418422
directoryFactory,
419423
null,
420424
remoteDirectoryFactory,
425+
storeFactory,
421426
eventListener,
422427
wrapperFactory,
423428
mapperRegistry,
@@ -742,7 +747,7 @@ protected void closeInternal() {
742747
} else {
743748
directory = directoryFactory.newDirectory(this.indexSettings, path);
744749
}
745-
store = new Store(
750+
store = storeFactory.newStore(
746751
shardId,
747752
this.indexSettings,
748753
directory,

server/src/main/java/org/opensearch/index/store/Store.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1080,21 +1080,22 @@ public long getNumDocs() {
10801080
/**
10811081
* Metadata that is currently loaded
10821082
*
1083-
* @opensearch.internal
1083+
* @opensearch.api
10841084
*/
1085-
static class LoadedMetadata {
1086-
final Map<String, StoreFileMetadata> fileMetadata;
1087-
final Map<String, String> userData;
1088-
final long numDocs;
1085+
@PublicApi(since = "3.2.0")
1086+
public static class LoadedMetadata {
1087+
public final Map<String, StoreFileMetadata> fileMetadata;
1088+
public final Map<String, String> userData;
1089+
public final long numDocs;
10891090

1090-
LoadedMetadata(Map<String, StoreFileMetadata> fileMetadata, Map<String, String> userData, long numDocs) {
1091+
public LoadedMetadata(Map<String, StoreFileMetadata> fileMetadata, Map<String, String> userData, long numDocs) {
10911092
this.fileMetadata = fileMetadata;
10921093
this.userData = userData;
10931094
this.numDocs = numDocs;
10941095
}
10951096
}
10961097

1097-
static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logger logger) throws IOException {
1098+
public static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logger logger) throws IOException {
10981099
try {
10991100
final SegmentInfos segmentCommitInfos = Store.readSegmentsInfo(commit, directory);
11001101
return loadMetadata(segmentCommitInfos, directory, logger);
@@ -1125,11 +1126,11 @@ static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logg
11251126
}
11261127
}
11271128

1128-
static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory directory, Logger logger) throws IOException {
1129+
public static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory directory, Logger logger) throws IOException {
11291130
return loadMetadata(segmentInfos, directory, logger, false);
11301131
}
11311132

1132-
static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory directory, Logger logger, boolean ignoreSegmentsFile)
1133+
public static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory directory, Logger logger, boolean ignoreSegmentsFile)
11331134
throws IOException {
11341135
long numDocs = Lucene.getNumDocs(segmentInfos);
11351136
Map<String, String> commitUserDataBuilder = new HashMap<>();
@@ -1732,8 +1733,9 @@ public void markStoreCorrupted(IOException exception) throws IOException {
17321733
/**
17331734
* A listener that is executed once the store is closed and all references to it are released
17341735
*
1735-
* @opensearch.internal
1736+
* @opensearch.api
17361737
*/
1738+
@PublicApi(since = "3.2.0")
17371739
public interface OnClose extends Consumer<ShardLock> {
17381740
OnClose EMPTY = new OnClose() {
17391741
/**

server/src/main/java/org/opensearch/indices/IndicesService.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,7 @@ public class IndicesService extends AbstractLifecycleComponent
392392
private final Map<String, IndexStorePlugin.CompositeDirectoryFactory> compositeDirectoryFactories;
393393
private final Map<String, IngestionConsumerFactory> ingestionConsumerFactories;
394394
private final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories;
395+
private final Map<String, IndexStorePlugin.StoreFactory> storeFactories;
395396
final AbstractRefCounted indicesRefCount; // pkg-private for testing
396397
private final CountDownLatch closeLatch = new CountDownLatch(1);
397398
private volatile boolean idFieldDataEnabled;
@@ -445,6 +446,7 @@ public IndicesService(
445446
Map<String, IndexStorePlugin.CompositeDirectoryFactory> compositeDirectoryFactories,
446447
ValuesSourceRegistry valuesSourceRegistry,
447448
Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories,
449+
Map<String, IndexStorePlugin.StoreFactory> storeFactories,
448450
IndexStorePlugin.DirectoryFactory remoteDirectoryFactory,
449451
Supplier<RepositoriesService> repositoriesServiceSupplier,
450452
SearchRequestStats searchRequestStats,
@@ -509,6 +511,7 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon
509511
this.directoryFactories = directoryFactories;
510512
this.compositeDirectoryFactories = compositeDirectoryFactories;
511513
this.recoveryStateFactories = recoveryStateFactories;
514+
this.storeFactories = storeFactories;
512515
this.ingestionConsumerFactories = ingestionConsumerFactories;
513516
// doClose() is called when shutting down a node, yet there might still be ongoing requests
514517
// that we need to wait for before closing some resources such as the caches. In order to
@@ -643,6 +646,7 @@ public IndicesService(
643646
Collections.emptyMap(),
644647
valuesSourceRegistry,
645648
recoveryStateFactories,
649+
Collections.emptyMap(),
646650
remoteDirectoryFactory,
647651
repositoriesServiceSupplier,
648652
searchRequestStats,
@@ -1060,6 +1064,7 @@ private synchronized IndexService createIndexService(
10601064
() -> allowExpensiveQueries,
10611065
indexNameExpressionResolver,
10621066
recoveryStateFactories,
1067+
storeFactories,
10631068
fileCache,
10641069
compositeIndexSettings
10651070
);
@@ -1179,6 +1184,7 @@ public synchronized MapperService createIndexMapperService(IndexMetadata indexMe
11791184
() -> allowExpensiveQueries,
11801185
indexNameExpressionResolver,
11811186
recoveryStateFactories,
1187+
storeFactories,
11821188
fileCache,
11831189
compositeIndexSettings
11841190
);

server/src/main/java/org/opensearch/node/Node.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -938,6 +938,12 @@ protected Node(final Environment initialEnvironment, Collection<PluginInfo> clas
938938
.flatMap(m -> m.entrySet().stream())
939939
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
940940

941+
final Map<String, IndexStorePlugin.StoreFactory> storeFactories = pluginsService.filterPlugins(IndexStorePlugin.class)
942+
.stream()
943+
.map(IndexStorePlugin::getStoreFactories)
944+
.flatMap(m -> m.entrySet().stream())
945+
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
946+
941947
final RerouteService rerouteService = new BatchedRerouteService(clusterService, clusterModule.getAllocationService()::reroute);
942948
rerouteServiceReference.set(rerouteService);
943949
clusterService.setRerouteService(rerouteService);
@@ -991,6 +997,7 @@ protected Node(final Environment initialEnvironment, Collection<PluginInfo> clas
991997
Map.copyOf(compositeDirectoryFactories),
992998
searchModule.getValuesSourceRegistry(),
993999
recoveryStateFactories,
1000+
storeFactories,
9941001
remoteDirectoryFactory,
9951002
repositoriesServiceReference::get,
9961003
searchRequestStats,

server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,12 @@
3838
import org.opensearch.common.Nullable;
3939
import org.opensearch.common.annotation.ExperimentalApi;
4040
import org.opensearch.common.annotation.PublicApi;
41+
import org.opensearch.core.index.shard.ShardId;
42+
import org.opensearch.env.ShardLock;
4143
import org.opensearch.index.IndexSettings;
4244
import org.opensearch.index.shard.ShardPath;
4345
import org.opensearch.index.store.IndexStoreListener;
46+
import org.opensearch.index.store.Store;
4447
import org.opensearch.index.store.remote.filecache.FileCache;
4548
import org.opensearch.indices.recovery.RecoveryState;
4649
import org.opensearch.threadpool.ThreadPool;
@@ -83,7 +86,9 @@ interface DirectoryFactory {
8386
*
8487
* @return a map from store type to an directory factory
8588
*/
86-
Map<String, DirectoryFactory> getDirectoryFactories();
89+
default Map<String, DirectoryFactory> getDirectoryFactories() {
90+
return Collections.emptyMap();
91+
}
8792

8893
/**
8994
* An interface that describes how to create a new composite directory instance per shard.
@@ -153,4 +158,43 @@ default Map<String, RecoveryStateFactory> getRecoveryStateFactories() {
153158
default Optional<IndexStoreListener> getIndexStoreListener() {
154159
return Optional.empty();
155160
}
161+
162+
/**
163+
* An interface that describes how to create a new Store instance per shard.
164+
*
165+
* @opensearch.api
166+
*/
167+
@FunctionalInterface
168+
@ExperimentalApi
169+
interface StoreFactory {
170+
/**
171+
* Creates a new Store per shard. This method is called once per shard on shard creation.
172+
* @param shardId the shard id
173+
* @param indexSettings the shard's index settings
174+
* @param directory the Lucene directory selected for this shard
175+
* @param shardLock the shard lock to associate with the store
176+
* @param onClose listener invoked on store close
177+
* @param shardPath the shard path
178+
* @return a new Store instance
179+
*/
180+
Store newStore(
181+
ShardId shardId,
182+
IndexSettings indexSettings,
183+
Directory directory,
184+
ShardLock shardLock,
185+
Store.OnClose onClose,
186+
ShardPath shardPath
187+
) throws IOException;
188+
}
189+
190+
/**
191+
* The {@link StoreFactory} mappings for this plugin. When an index is created a custom store factory can be selected via
192+
* {@code index.store.factory}. If not set, the default store is used.
193+
*
194+
* @return a map from store type key to a store factory
195+
*/
196+
@ExperimentalApi
197+
default Map<String, StoreFactory> getStoreFactories() {
198+
return Collections.emptyMap();
199+
}
156200
}

0 commit comments

Comments
 (0)