Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bump OpenSearch Core main branch to 3.0.0 ([#18039](https://github.com/opensearch-project/OpenSearch/pull/18039))
- Update API of Message in index to add the timestamp for lag calculation in ingestion polling ([#17977](https://github.com/opensearch-project/OpenSearch/pull/17977/))
- Enabled default throttling for all tasks submitted to cluster manager ([#17711](https://github.com/opensearch-project/OpenSearch/pull/17711))
- Add composite directory factory ([#17988](https://github.com/opensearch-project/OpenSearch/pull/17988))

### Changed
- Change the default max header size from 8KB to 16KB. ([#18024](https://github.com/opensearch-project/OpenSearch/pull/18024))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
MapperService.INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING,
BitsetFilterCache.INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING,
IndexModule.INDEX_STORE_TYPE_SETTING,
IndexModule.INDEX_COMPOSITE_STORE_TYPE_SETTING,
IndexModule.INDEX_STORE_PRE_LOAD_SETTING,
IndexModule.INDEX_STORE_HYBRID_NIO_EXTENSIONS,
IndexModule.INDEX_RECOVERY_TYPE_SETTING,
Expand Down
37 changes: 37 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.opensearch.index.shard.IndexingOperationListener;
import org.opensearch.index.shard.SearchOperationListener;
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.DefaultCompositeDirectoryFactory;
import org.opensearch.index.store.FsDirectoryFactory;
import org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory;
import org.opensearch.index.store.remote.filecache.FileCache;
Expand Down Expand Up @@ -133,6 +134,8 @@
public static final Setting<Boolean> NODE_STORE_ALLOW_MMAP = Setting.boolSetting("node.store.allow_mmap", true, Property.NodeScope);

private static final FsDirectoryFactory DEFAULT_DIRECTORY_FACTORY = new FsDirectoryFactory();
private static final IndexStorePlugin.CompositeDirectoryFactory DEFAULT_COMPOSITE_DIRECTORY_FACTORY =
new DefaultCompositeDirectoryFactory();

private static final IndexStorePlugin.RecoveryStateFactory DEFAULT_RECOVERY_STATE_FACTORY = RecoveryState::new;

Expand All @@ -144,6 +147,14 @@
Property.NodeScope
);

public static final Setting<String> INDEX_COMPOSITE_STORE_TYPE_SETTING = new Setting<>(
"index.composite_store.type",
"default",
Function.identity(),
Property.IndexScope,
Property.NodeScope
);

/**
* Index setting which used to determine how the data is cached locally fully or partially.
*/
Expand Down Expand Up @@ -240,6 +251,7 @@
private final Set<IndexEventListener> indexEventListeners = new HashSet<>();
private final Map<String, TriFunction<Settings, Version, ScriptService, Similarity>> similarities = new HashMap<>();
private final Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories;
private final Map<String, IndexStorePlugin.CompositeDirectoryFactory> compositeDirectoryFactories;
private final SetOnce<BiFunction<IndexSettings, IndicesQueryCache, QueryCache>> forceQueryCacheProvider = new SetOnce<>();
private final List<SearchOperationListener> searchOperationListeners = new ArrayList<>();
private final List<IndexingOperationListener> indexOperationListeners = new ArrayList<>();
Expand All @@ -265,6 +277,7 @@
final EngineFactory engineFactory,
final EngineConfigFactory engineConfigFactory,
final Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories,
final Map<String, IndexStorePlugin.CompositeDirectoryFactory> compositeDirectoryFactories,
final BooleanSupplier allowExpensiveQueries,
final IndexNameExpressionResolver expressionResolver,
final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories,
Expand All @@ -278,6 +291,7 @@
this.searchOperationListeners.add(new SearchSlowLog(indexSettings));
this.indexOperationListeners.add(new IndexingSlowLog(indexSettings));
this.directoryFactories = Collections.unmodifiableMap(directoryFactories);
this.compositeDirectoryFactories = Collections.unmodifiableMap(compositeDirectoryFactories);
this.allowExpensiveQueries = allowExpensiveQueries;
this.expressionResolver = expressionResolver;
this.recoveryStateFactories = recoveryStateFactories;
Expand All @@ -301,6 +315,7 @@
engineFactory,
engineConfigFactory,
directoryFactories,
Collections.emptyMap(),
allowExpensiveQueries,
expressionResolver,
recoveryStateFactories,
Expand Down Expand Up @@ -699,6 +714,10 @@
.get() == null ? (shard) -> null : indexReaderWrapper.get();
eventListener.beforeIndexCreated(indexSettings.getIndex(), indexSettings.getSettings());
final IndexStorePlugin.DirectoryFactory directoryFactory = getDirectoryFactory(indexSettings, directoryFactories);
final IndexStorePlugin.CompositeDirectoryFactory compositeDirectoryFactory = getCompositeDirectoryFactory(
indexSettings,
compositeDirectoryFactories
);
final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory = getRecoveryStateFactory(indexSettings, recoveryStateFactories);
QueryCache queryCache = null;
IndexAnalyzers indexAnalyzers = null;
Expand Down Expand Up @@ -735,6 +754,7 @@
client,
queryCache,
directoryFactory,
compositeDirectoryFactory,
remoteDirectoryFactory,
eventListener,
readerWrapperFactory,
Expand Down Expand Up @@ -800,6 +820,23 @@
return factory;
}

private static IndexStorePlugin.CompositeDirectoryFactory getCompositeDirectoryFactory(
final IndexSettings indexSettings,
final Map<String, IndexStorePlugin.CompositeDirectoryFactory> compositeDirectoryFactories
) {
final String compositeStoreType = indexSettings.getValue(INDEX_COMPOSITE_STORE_TYPE_SETTING);
final IndexStorePlugin.CompositeDirectoryFactory factory;
if (compositeStoreType.isEmpty() || compositeDirectoryFactories.isEmpty()) {
factory = DEFAULT_COMPOSITE_DIRECTORY_FACTORY;
} else {
factory = compositeDirectoryFactories.get(compositeStoreType);
if (factory == null) {
throw new IllegalArgumentException("Unknown composite store type [" + compositeStoreType + "]");

Check warning on line 834 in server/src/main/java/org/opensearch/index/IndexModule.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/IndexModule.java#L834

Added line #L834 was not covered by tests
}
}
return factory;
}

private static IndexStorePlugin.RecoveryStateFactory getRecoveryStateFactory(
final IndexSettings indexSettings,
final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories
Expand Down
8 changes: 5 additions & 3 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@
import org.opensearch.index.shard.ShardNotInPrimaryModeException;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.CompositeDirectory;
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.remote.filecache.FileCache;
Expand Down Expand Up @@ -153,6 +152,7 @@
private final NodeEnvironment nodeEnv;
private final ShardStoreDeleter shardStoreDeleter;
private final IndexStorePlugin.DirectoryFactory directoryFactory;
private final IndexStorePlugin.CompositeDirectoryFactory compositeDirectoryFactory;
private final IndexStorePlugin.DirectoryFactory remoteDirectoryFactory;
private final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory;
private final CheckedFunction<DirectoryReader, DirectoryReader, IOException> readerWrapper;
Expand Down Expand Up @@ -221,6 +221,7 @@
Client client,
QueryCache queryCache,
IndexStorePlugin.DirectoryFactory directoryFactory,
IndexStorePlugin.CompositeDirectoryFactory compositeDirectoryFactory,
IndexStorePlugin.DirectoryFactory remoteDirectoryFactory,
IndexEventListener eventListener,
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> wrapperFactory,
Expand Down Expand Up @@ -305,6 +306,7 @@
this.eventListener = eventListener;
this.nodeEnv = nodeEnv;
this.directoryFactory = directoryFactory;
this.compositeDirectoryFactory = compositeDirectoryFactory;
this.remoteDirectoryFactory = remoteDirectoryFactory;
this.recoveryStateFactory = recoveryStateFactory;
this.engineFactory = Objects.requireNonNull(engineFactory);
Expand Down Expand Up @@ -399,6 +401,7 @@
client,
queryCache,
directoryFactory,
null,
remoteDirectoryFactory,
eventListener,
wrapperFactory,
Expand Down Expand Up @@ -676,8 +679,7 @@
if (FeatureFlags.isEnabled(FeatureFlags.WRITABLE_WARM_INDEX_SETTING) &&
// TODO : Need to remove this check after support for hot indices is added in Composite Directory
this.indexSettings.isWarmIndex()) {
Directory localDirectory = directoryFactory.newDirectory(this.indexSettings, path);
directory = new CompositeDirectory(localDirectory, remoteDirectory, fileCache);
directory = compositeDirectoryFactory.newDirectory(this.indexSettings, path, directoryFactory, remoteDirectory, fileCache);

Check warning on line 682 in server/src/main/java/org/opensearch/index/IndexService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/IndexService.java#L682

Added line #L682 was not covered by tests
} else {
directory = directoryFactory.newDirectory(this.indexSettings, path);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@
@ExperimentalApi
public class CompositeDirectory extends FilterDirectory {
private static final Logger logger = LogManager.getLogger(CompositeDirectory.class);
private final FSDirectory localDirectory;
private final RemoteSegmentStoreDirectory remoteDirectory;
private final FileCache fileCache;
private final TransferManager transferManager;
protected final FSDirectory localDirectory;
protected final RemoteSegmentStoreDirectory remoteDirectory;
protected final FileCache fileCache;
protected final TransferManager transferManager;

/**
* Constructor to initialise the composite directory
Expand Down Expand Up @@ -96,7 +96,7 @@ private String[] listLocalFiles() throws IOException {
* @return A list of file names, including the original file (if present) and all its block files.
* @throws IOException in case of I/O error while listing files.
*/
private List<String> listBlockFiles(String fileName) throws IOException {
protected List<String> listBlockFiles(String fileName) throws IOException {
return Stream.of(listLocalFiles())
.filter(file -> file.equals(fileName) || file.startsWith(fileName + FileTypeUtils.BLOCK_FILE_IDENTIFIER))
.collect(Collectors.toList());
Expand Down Expand Up @@ -383,7 +383,7 @@ private String[] getRemoteFiles() throws IOException {
return remoteFiles;
}

private void cacheFile(String name) throws IOException {
protected void cacheFile(String name) throws IOException {
Path filePath = getFilePath(name);
// put will increase the refCount for the path, making sure it is not evicted, will decrease the ref after it is uploaded to Remote
// so that it can be evicted after that
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.store;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.Directory;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.plugins.IndexStorePlugin;

import java.io.IOException;

/**
* Default composite directory factory
*/
public class DefaultCompositeDirectoryFactory implements IndexStorePlugin.CompositeDirectoryFactory {

private static final Logger logger = LogManager.getLogger(DefaultCompositeDirectoryFactory.class);

@Override
public Directory newDirectory(
IndexSettings indexSettings,
ShardPath shardPath,
IndexStorePlugin.DirectoryFactory localDirectoryFactory,
Directory remoteDirectory,
FileCache fileCache
) throws IOException {
logger.trace("Creating composite directory from core - Default CompositeDirectoryFactory");
Directory localDirectory = localDirectoryFactory.newDirectory(indexSettings, shardPath);
return new CompositeDirectory(localDirectory, remoteDirectory, fileCache);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ public class IndicesService extends AbstractLifecycleComponent
private final MetaStateService metaStateService;
private final Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders;
private final Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories;
private final Map<String, IndexStorePlugin.CompositeDirectoryFactory> compositeDirectoryFactories;
private final Map<String, IngestionConsumerFactory> ingestionConsumerFactories;
private final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories;
final AbstractRefCounted indicesRefCount; // pkg-private for testing
Expand Down Expand Up @@ -438,6 +439,7 @@ public IndicesService(
MetaStateService metaStateService,
Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders,
Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories,
Map<String, IndexStorePlugin.CompositeDirectoryFactory> compositeDirectoryFactories,
ValuesSourceRegistry valuesSourceRegistry,
Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories,
IndexStorePlugin.DirectoryFactory remoteDirectoryFactory,
Expand Down Expand Up @@ -502,6 +504,7 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon
this.engineFactoryProviders = engineFactoryProviders;

this.directoryFactories = directoryFactories;
this.compositeDirectoryFactories = compositeDirectoryFactories;
this.recoveryStateFactories = recoveryStateFactories;
this.ingestionConsumerFactories = ingestionConsumerFactories;
// doClose() is called when shutting down a node, yet there might still be ongoing requests
Expand Down Expand Up @@ -633,6 +636,7 @@ public IndicesService(
metaStateService,
engineFactoryProviders,
directoryFactories,
Collections.emptyMap(),
valuesSourceRegistry,
recoveryStateFactories,
remoteDirectoryFactory,
Expand Down Expand Up @@ -1048,6 +1052,7 @@ private synchronized IndexService createIndexService(
getEngineFactory(idxSettings),
getEngineConfigFactory(idxSettings),
directoryFactories,
compositeDirectoryFactories,
() -> allowExpensiveQueries,
indexNameExpressionResolver,
recoveryStateFactories,
Expand Down Expand Up @@ -1166,6 +1171,7 @@ public synchronized MapperService createIndexMapperService(IndexMetadata indexMe
getEngineFactory(idxSettings),
getEngineConfigFactory(idxSettings),
directoryFactories,
compositeDirectoryFactories,
() -> allowExpensiveQueries,
indexNameExpressionResolver,
recoveryStateFactories,
Expand Down
18 changes: 18 additions & 0 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@
import org.opensearch.index.recovery.RemoteStoreRestoreService;
import org.opensearch.index.remote.RemoteIndexPathUploader;
import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory;
import org.opensearch.index.store.DefaultCompositeDirectoryFactory;
import org.opensearch.index.store.IndexStoreListener;
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
import org.opensearch.index.store.remote.filecache.FileCache;
Expand Down Expand Up @@ -894,6 +895,22 @@
});
directoryFactories.putAll(builtInDirectoryFactories);

final Map<String, IndexStorePlugin.CompositeDirectoryFactory> compositeDirectoryFactories = new HashMap<>();
pluginsService.filterPlugins(IndexStorePlugin.class)
.stream()
.map(IndexStorePlugin::getCompositeDirectoryFactories)
.flatMap(m -> m.entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
.forEach((k, v) -> {
if (k.equals("default")) {
throw new IllegalStateException(

Check warning on line 906 in server/src/main/java/org/opensearch/node/Node.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/node/Node.java#L906

Added line #L906 was not covered by tests
"registered composite index store type [" + k + "] conflicts with a built-in default type"
);
}
compositeDirectoryFactories.put(k, v);
});

Check warning on line 911 in server/src/main/java/org/opensearch/node/Node.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/node/Node.java#L910-L911

Added lines #L910 - L911 were not covered by tests
compositeDirectoryFactories.put("default", new DefaultCompositeDirectoryFactory());

final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories = pluginsService.filterPlugins(
IndexStorePlugin.class
)
Expand Down Expand Up @@ -952,6 +969,7 @@
metaStateService,
engineFactoryProviders,
Map.copyOf(directoryFactories),
Map.copyOf(compositeDirectoryFactories),
searchModule.getValuesSourceRegistry(),
recoveryStateFactories,
remoteDirectoryFactory,
Expand Down
Loading
Loading