Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
BitsetFilterCache.INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING,
IndexModule.INDEX_STORE_TYPE_SETTING,
IndexModule.INDEX_COMPOSITE_STORE_TYPE_SETTING,
IndexModule.INDEX_STORE_FACTORY_SETTING,
IndexModule.INDEX_STORE_PRE_LOAD_SETTING,
IndexModule.INDEX_STORE_HYBRID_NIO_EXTENSIONS,
IndexModule.INDEX_RECOVERY_TYPE_SETTING,
Expand Down
32 changes: 32 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.DefaultCompositeDirectoryFactory;
import org.opensearch.index.store.FsDirectoryFactory;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.translog.TranslogFactory;
Expand Down Expand Up @@ -156,6 +157,17 @@ public final class IndexModule {
Property.NodeScope
);

/**
* Index setting that selects a custom StoreFactory provided by plugins. If empty, the default store is used
*/
public static final Setting<String> INDEX_STORE_FACTORY_SETTING = new Setting<>(
"index.store.factory",
"",
Function.identity(),
Property.IndexScope,
Property.NodeScope
);

/**
* Index setting which used to determine how the data is cached locally fully or partially.
*/
Expand Down Expand Up @@ -260,6 +272,7 @@ public final class IndexModule {
private final AtomicBoolean frozen = new AtomicBoolean(false);
private final BooleanSupplier allowExpensiveQueries;
private final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories;
private final Map<String, IndexStorePlugin.StoreFactory> storeFactories;
private final FileCache fileCache;
private final CompositeIndexSettings compositeIndexSettings;

Expand All @@ -283,6 +296,7 @@ public IndexModule(
final BooleanSupplier allowExpensiveQueries,
final IndexNameExpressionResolver expressionResolver,
final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories,
final Map<String, IndexStorePlugin.StoreFactory> storeFactories,
final FileCache fileCache,
final CompositeIndexSettings compositeIndexSettings
) {
Expand All @@ -297,6 +311,7 @@ public IndexModule(
this.allowExpensiveQueries = allowExpensiveQueries;
this.expressionResolver = expressionResolver;
this.recoveryStateFactories = recoveryStateFactories;
this.storeFactories = storeFactories == null ? Collections.emptyMap() : Collections.unmodifiableMap(storeFactories);
this.fileCache = fileCache;
this.compositeIndexSettings = compositeIndexSettings;
}
Expand All @@ -322,6 +337,7 @@ public IndexModule(
allowExpensiveQueries,
expressionResolver,
recoveryStateFactories,
Collections.emptyMap(),
null,
null
);
Expand Down Expand Up @@ -759,6 +775,7 @@ public IndexService newIndexService(
directoryFactory,
compositeDirectoryFactory,
remoteDirectoryFactory,
resolveStoreFactory(indexSettings, storeFactories),
eventListener,
readerWrapperFactory,
mapperRegistry,
Expand Down Expand Up @@ -858,6 +875,21 @@ private static IndexStorePlugin.RecoveryStateFactory getRecoveryStateFactory(
return factory;
}

private static IndexStorePlugin.StoreFactory resolveStoreFactory(
final IndexSettings indexSettings,
final Map<String, IndexStorePlugin.StoreFactory> storeFactories
) {
final String key = indexSettings.getValue(INDEX_STORE_FACTORY_SETTING);
if (key != null && key.isEmpty() == false) {
final IndexStorePlugin.StoreFactory factory = storeFactories.get(key);
if (factory == null) {
throw new IllegalArgumentException("Unknown store factory [" + key + "]");
}
return factory;
}
return Store::new;
}

/**
* creates a new mapper service to do administrative work like mapping updates. This *should not* be used for document parsing.
* doing so will result in an exception.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final Object refreshMutex = new Object();
private volatile TimeValue refreshInterval;
private volatile boolean shardLevelRefreshEnabled;
private final IndexStorePlugin.StoreFactory storeFactory;

@InternalApi
public IndexService(
Expand All @@ -228,6 +229,7 @@ public IndexService(
IndexStorePlugin.DirectoryFactory directoryFactory,
IndexStorePlugin.CompositeDirectoryFactory compositeDirectoryFactory,
IndexStorePlugin.DirectoryFactory remoteDirectoryFactory,
IndexStorePlugin.StoreFactory storeFactory,
IndexEventListener eventListener,
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> wrapperFactory,
MapperRegistry mapperRegistry,
Expand All @@ -253,6 +255,7 @@ public IndexService(
Supplier<Integer> clusterDefaultMaxMergeAtOnceSupplier
) {
super(indexSettings);
this.storeFactory = storeFactory;
this.allowExpensiveQueries = allowExpensiveQueries;
this.indexSettings = indexSettings;
this.xContentRegistry = xContentRegistry;
Expand Down Expand Up @@ -378,6 +381,7 @@ public IndexService(
QueryCache queryCache,
IndexStorePlugin.DirectoryFactory directoryFactory,
IndexStorePlugin.DirectoryFactory remoteDirectoryFactory,
IndexStorePlugin.StoreFactory storeFactory,
IndexEventListener eventListener,
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> wrapperFactory,
MapperRegistry mapperRegistry,
Expand Down Expand Up @@ -418,6 +422,7 @@ public IndexService(
directoryFactory,
null,
remoteDirectoryFactory,
storeFactory,
eventListener,
wrapperFactory,
mapperRegistry,
Expand Down Expand Up @@ -742,7 +747,7 @@ protected void closeInternal() {
} else {
directory = directoryFactory.newDirectory(this.indexSettings, path);
}
store = new Store(
store = storeFactory.newStore(
shardId,
this.indexSettings,
directory,
Expand Down
22 changes: 12 additions & 10 deletions server/src/main/java/org/opensearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -1080,21 +1080,22 @@ public long getNumDocs() {
/**
* Metadata that is currently loaded
*
* @opensearch.internal
* @opensearch.api
*/
static class LoadedMetadata {
final Map<String, StoreFileMetadata> fileMetadata;
final Map<String, String> userData;
final long numDocs;
@PublicApi(since = "3.1.0")
public static class LoadedMetadata {
public final Map<String, StoreFileMetadata> fileMetadata;
public final Map<String, String> userData;
public final long numDocs;

LoadedMetadata(Map<String, StoreFileMetadata> fileMetadata, Map<String, String> userData, long numDocs) {
public LoadedMetadata(Map<String, StoreFileMetadata> fileMetadata, Map<String, String> userData, long numDocs) {
this.fileMetadata = fileMetadata;
this.userData = userData;
this.numDocs = numDocs;
}
}

static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logger logger) throws IOException {
public static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logger logger) throws IOException {
try {
final SegmentInfos segmentCommitInfos = Store.readSegmentsInfo(commit, directory);
return loadMetadata(segmentCommitInfos, directory, logger);
Expand Down Expand Up @@ -1125,11 +1126,11 @@ static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logg
}
}

static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory directory, Logger logger) throws IOException {
public static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory directory, Logger logger) throws IOException {
return loadMetadata(segmentInfos, directory, logger, false);
}

static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory directory, Logger logger, boolean ignoreSegmentsFile)
public static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory directory, Logger logger, boolean ignoreSegmentsFile)
throws IOException {
long numDocs = Lucene.getNumDocs(segmentInfos);
Map<String, String> commitUserDataBuilder = new HashMap<>();
Expand Down Expand Up @@ -1732,8 +1733,9 @@ public void markStoreCorrupted(IOException exception) throws IOException {
/**
* A listener that is executed once the store is closed and all references to it are released
*
* @opensearch.internal
* @opensearch.api
*/
@PublicApi(since = "3.1.0")
public interface OnClose extends Consumer<ShardLock> {
OnClose EMPTY = new OnClose() {
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ public class IndicesService extends AbstractLifecycleComponent
private final Map<String, IndexStorePlugin.CompositeDirectoryFactory> compositeDirectoryFactories;
private final Map<String, IngestionConsumerFactory> ingestionConsumerFactories;
private final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories;
private final Map<String, IndexStorePlugin.StoreFactory> storeFactories;
final AbstractRefCounted indicesRefCount; // pkg-private for testing
private final CountDownLatch closeLatch = new CountDownLatch(1);
private volatile boolean idFieldDataEnabled;
Expand Down Expand Up @@ -445,6 +446,7 @@ public IndicesService(
Map<String, IndexStorePlugin.CompositeDirectoryFactory> compositeDirectoryFactories,
ValuesSourceRegistry valuesSourceRegistry,
Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories,
Map<String, IndexStorePlugin.StoreFactory> storeFactories,
IndexStorePlugin.DirectoryFactory remoteDirectoryFactory,
Supplier<RepositoriesService> repositoriesServiceSupplier,
SearchRequestStats searchRequestStats,
Expand Down Expand Up @@ -509,6 +511,7 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon
this.directoryFactories = directoryFactories;
this.compositeDirectoryFactories = compositeDirectoryFactories;
this.recoveryStateFactories = recoveryStateFactories;
this.storeFactories = storeFactories;
this.ingestionConsumerFactories = ingestionConsumerFactories;
// doClose() is called when shutting down a node, yet there might still be ongoing requests
// that we need to wait for before closing some resources such as the caches. In order to
Expand Down Expand Up @@ -643,6 +646,7 @@ public IndicesService(
Collections.emptyMap(),
valuesSourceRegistry,
recoveryStateFactories,
Collections.emptyMap(),
remoteDirectoryFactory,
repositoriesServiceSupplier,
searchRequestStats,
Expand Down Expand Up @@ -1060,6 +1064,7 @@ private synchronized IndexService createIndexService(
() -> allowExpensiveQueries,
indexNameExpressionResolver,
recoveryStateFactories,
storeFactories,
fileCache,
compositeIndexSettings
);
Expand Down Expand Up @@ -1179,6 +1184,7 @@ public synchronized MapperService createIndexMapperService(IndexMetadata indexMe
() -> allowExpensiveQueries,
indexNameExpressionResolver,
recoveryStateFactories,
storeFactories,
fileCache,
compositeIndexSettings
);
Expand Down
7 changes: 7 additions & 0 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -938,6 +938,12 @@ protected Node(final Environment initialEnvironment, Collection<PluginInfo> clas
.flatMap(m -> m.entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

final Map<String, IndexStorePlugin.StoreFactory> storeFactories = pluginsService.filterPlugins(IndexStorePlugin.class)
.stream()
.map(IndexStorePlugin::getStoreFactories)
.flatMap(m -> m.entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

final RerouteService rerouteService = new BatchedRerouteService(clusterService, clusterModule.getAllocationService()::reroute);
rerouteServiceReference.set(rerouteService);
clusterService.setRerouteService(rerouteService);
Expand Down Expand Up @@ -991,6 +997,7 @@ protected Node(final Environment initialEnvironment, Collection<PluginInfo> clas
Map.copyOf(compositeDirectoryFactories),
searchModule.getValuesSourceRegistry(),
recoveryStateFactories,
storeFactories,
remoteDirectoryFactory,
repositoriesServiceReference::get,
searchRequestStats,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,12 @@
import org.opensearch.common.Nullable;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.env.ShardLock;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.IndexStoreListener;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -83,7 +86,9 @@ interface DirectoryFactory {
*
* @return a map from store type to an directory factory
*/
Map<String, DirectoryFactory> getDirectoryFactories();
default Map<String, DirectoryFactory> getDirectoryFactories() {
return Collections.emptyMap();
}

/**
* An interface that describes how to create a new composite directory instance per shard.
Expand Down Expand Up @@ -153,4 +158,43 @@ default Map<String, RecoveryStateFactory> getRecoveryStateFactories() {
default Optional<IndexStoreListener> getIndexStoreListener() {
return Optional.empty();
}

/**
* An interface that describes how to create a new Store instance per shard.
*
* @opensearch.api
*/
@FunctionalInterface
@ExperimentalApi
interface StoreFactory {
/**
* Creates a new Store per shard. This method is called once per shard on shard creation.
* @param shardId the shard id
* @param indexSettings the shard's index settings
* @param directory the Lucene directory selected for this shard
* @param shardLock the shard lock to associate with the store
* @param onClose listener invoked on store close
* @param shardPath the shard path
* @return a new Store instance
*/
Store newStore(
ShardId shardId,
IndexSettings indexSettings,
Directory directory,
ShardLock shardLock,
Store.OnClose onClose,
ShardPath shardPath
) throws IOException;
}

/**
* The {@link StoreFactory} mappings for this plugin. When an index is created a custom store factory can be selected via
* {@code index.store.factory}. If not set, the default store is used.
*
* @return a map from store type key to a store factory
*/
@ExperimentalApi
default Map<String, StoreFactory> getStoreFactories() {
return Collections.emptyMap();
}
}
Loading
Loading