Skip to content

Commit 4553942

Browse files
committed
Add composite directory factory (opensearch-project#17988)
Signed-off-by: Shreyansh Ray <[email protected]>
1 parent bc68a12 commit 4553942

File tree

10 files changed

+237
-9
lines changed

10 files changed

+237
-9
lines changed

CHANGELOG.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,25 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
55

66
## [Unreleased 2.x]
77
### Added
8+
<<<<<<< HEAD
89
- Renaming the node role search to warm ([#17573](https://github.com/opensearch-project/OpenSearch/pull/17573))
10+
=======
11+
- [Rule based auto-tagging] Add get rule API ([#17336](https://github.com/opensearch-project/OpenSearch/pull/17336))
12+
- Add multi-threaded writer support in pull-based ingestion ([#17912](https://github.com/opensearch-project/OpenSearch/pull/17912))
13+
- Unset discovery nodes for every transport node actions request ([#17682](https://github.com/opensearch-project/OpenSearch/pull/17682))
14+
- Implement parallel shard refresh behind cluster settings ([#17782](https://github.com/opensearch-project/OpenSearch/pull/17782))
15+
- Bump OpenSearch Core main branch to 3.0.0 ([#18039](https://github.com/opensearch-project/OpenSearch/pull/18039))
16+
- Update API of Message in index to add the timestamp for lag calculation in ingestion polling ([#17977](https://github.com/opensearch-project/OpenSearch/pull/17977/))
17+
- Enabled default throttling for all tasks submitted to cluster manager ([#17711](https://github.com/opensearch-project/OpenSearch/pull/17711))
18+
- Add composite directory factory ([#17988](https://github.com/opensearch-project/OpenSearch/pull/17988))
19+
20+
### Changed
21+
- Change the default max header size from 8KB to 16KB. ([#18024](https://github.com/opensearch-project/OpenSearch/pull/18024))
22+
- Avoid invalid retries in multiple replicas when querying [#17370](https://github.com/opensearch-project/OpenSearch/pull/17370)
23+
* Enable concurrent_segment_search auto mode by default[#17978](https://github.com/opensearch-project/OpenSearch/pull/17978)
24+
- Skip approximation when `track_total_hits` is set to `true` [#18017](https://github.com/opensearch-project/OpenSearch/pull/18017)
25+
26+
>>>>>>> d744b40ef78 (Add composite directory factory (#17988))
927
### Dependencies
1028
- Bump `netty` from 4.1.118.Final to 4.1.121.Final ([#18192](https://github.com/opensearch-project/OpenSearch/pull/18192))
1129

server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
191191
MapperService.INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING,
192192
BitsetFilterCache.INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING,
193193
IndexModule.INDEX_STORE_TYPE_SETTING,
194+
IndexModule.INDEX_COMPOSITE_STORE_TYPE_SETTING,
194195
IndexModule.INDEX_STORE_PRE_LOAD_SETTING,
195196
IndexModule.INDEX_STORE_HYBRID_MMAP_EXTENSIONS,
196197
IndexModule.INDEX_STORE_HYBRID_NIO_EXTENSIONS,

server/src/main/java/org/opensearch/index/IndexModule.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
import org.opensearch.index.shard.IndexingOperationListener;
7878
import org.opensearch.index.shard.SearchOperationListener;
7979
import org.opensearch.index.similarity.SimilarityService;
80+
import org.opensearch.index.store.DefaultCompositeDirectoryFactory;
8081
import org.opensearch.index.store.FsDirectoryFactory;
8182
import org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory;
8283
import org.opensearch.index.store.remote.filecache.FileCache;
@@ -134,6 +135,8 @@ public final class IndexModule {
134135
public static final Setting<Boolean> NODE_STORE_ALLOW_MMAP = Setting.boolSetting("node.store.allow_mmap", true, Property.NodeScope);
135136

136137
private static final FsDirectoryFactory DEFAULT_DIRECTORY_FACTORY = new FsDirectoryFactory();
138+
private static final IndexStorePlugin.CompositeDirectoryFactory DEFAULT_COMPOSITE_DIRECTORY_FACTORY =
139+
new DefaultCompositeDirectoryFactory();
137140

138141
private static final IndexStorePlugin.RecoveryStateFactory DEFAULT_RECOVERY_STATE_FACTORY = RecoveryState::new;
139142

@@ -145,6 +148,14 @@ public final class IndexModule {
145148
Property.NodeScope
146149
);
147150

151+
public static final Setting<String> INDEX_COMPOSITE_STORE_TYPE_SETTING = new Setting<>(
152+
"index.composite_store.type",
153+
"default",
154+
Function.identity(),
155+
Property.IndexScope,
156+
Property.NodeScope
157+
);
158+
148159
/**
149160
* Index setting which used to determine how the data is cached locally fully or partially.
150161
*/
@@ -315,6 +326,7 @@ public Iterator<Setting<?>> settings() {
315326
private final Set<IndexEventListener> indexEventListeners = new HashSet<>();
316327
private final Map<String, TriFunction<Settings, Version, ScriptService, Similarity>> similarities = new HashMap<>();
317328
private final Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories;
329+
private final Map<String, IndexStorePlugin.CompositeDirectoryFactory> compositeDirectoryFactories;
318330
private final SetOnce<BiFunction<IndexSettings, IndicesQueryCache, QueryCache>> forceQueryCacheProvider = new SetOnce<>();
319331
private final List<SearchOperationListener> searchOperationListeners = new ArrayList<>();
320332
private final List<IndexingOperationListener> indexOperationListeners = new ArrayList<>();
@@ -340,6 +352,7 @@ public IndexModule(
340352
final EngineFactory engineFactory,
341353
final EngineConfigFactory engineConfigFactory,
342354
final Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories,
355+
final Map<String, IndexStorePlugin.CompositeDirectoryFactory> compositeDirectoryFactories,
343356
final BooleanSupplier allowExpensiveQueries,
344357
final IndexNameExpressionResolver expressionResolver,
345358
final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories,
@@ -353,6 +366,7 @@ public IndexModule(
353366
this.searchOperationListeners.add(new SearchSlowLog(indexSettings));
354367
this.indexOperationListeners.add(new IndexingSlowLog(indexSettings));
355368
this.directoryFactories = Collections.unmodifiableMap(directoryFactories);
369+
this.compositeDirectoryFactories = Collections.unmodifiableMap(compositeDirectoryFactories);
356370
this.allowExpensiveQueries = allowExpensiveQueries;
357371
this.expressionResolver = expressionResolver;
358372
this.recoveryStateFactories = recoveryStateFactories;
@@ -401,6 +415,7 @@ public IndexModule(
401415
engineFactory,
402416
engineConfigFactory,
403417
directoryFactories,
418+
Collections.emptyMap(),
404419
allowExpensiveQueries,
405420
expressionResolver,
406421
recoveryStateFactories,
@@ -788,6 +803,10 @@ public IndexService newIndexService(
788803
.get() == null ? (shard) -> null : indexReaderWrapper.get();
789804
eventListener.beforeIndexCreated(indexSettings.getIndex(), indexSettings.getSettings());
790805
final IndexStorePlugin.DirectoryFactory directoryFactory = getDirectoryFactory(indexSettings, directoryFactories);
806+
final IndexStorePlugin.CompositeDirectoryFactory compositeDirectoryFactory = getCompositeDirectoryFactory(
807+
indexSettings,
808+
compositeDirectoryFactories
809+
);
791810
final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory = getRecoveryStateFactory(indexSettings, recoveryStateFactories);
792811
QueryCache queryCache = null;
793812
IndexAnalyzers indexAnalyzers = null;
@@ -824,6 +843,7 @@ public IndexService newIndexService(
824843
client,
825844
queryCache,
826845
directoryFactory,
846+
compositeDirectoryFactory,
827847
remoteDirectoryFactory,
828848
eventListener,
829849
readerWrapperFactory,
@@ -885,6 +905,23 @@ private static IndexStorePlugin.DirectoryFactory getDirectoryFactory(
885905
return factory;
886906
}
887907

908+
private static IndexStorePlugin.CompositeDirectoryFactory getCompositeDirectoryFactory(
909+
final IndexSettings indexSettings,
910+
final Map<String, IndexStorePlugin.CompositeDirectoryFactory> compositeDirectoryFactories
911+
) {
912+
final String compositeStoreType = indexSettings.getValue(INDEX_COMPOSITE_STORE_TYPE_SETTING);
913+
final IndexStorePlugin.CompositeDirectoryFactory factory;
914+
if (compositeStoreType.isEmpty() || compositeDirectoryFactories.isEmpty()) {
915+
factory = DEFAULT_COMPOSITE_DIRECTORY_FACTORY;
916+
} else {
917+
factory = compositeDirectoryFactories.get(compositeStoreType);
918+
if (factory == null) {
919+
throw new IllegalArgumentException("Unknown composite store type [" + compositeStoreType + "]");
920+
}
921+
}
922+
return factory;
923+
}
924+
888925
private static IndexStorePlugin.RecoveryStateFactory getRecoveryStateFactory(
889926
final IndexSettings indexSettings,
890927
final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories

server/src/main/java/org/opensearch/index/IndexService.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,6 @@
9393
import org.opensearch.index.shard.ShardNotInPrimaryModeException;
9494
import org.opensearch.index.shard.ShardPath;
9595
import org.opensearch.index.similarity.SimilarityService;
96-
import org.opensearch.index.store.CompositeDirectory;
9796
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
9897
import org.opensearch.index.store.Store;
9998
import org.opensearch.index.store.remote.filecache.FileCache;
@@ -153,6 +152,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
153152
private final NodeEnvironment nodeEnv;
154153
private final ShardStoreDeleter shardStoreDeleter;
155154
private final IndexStorePlugin.DirectoryFactory directoryFactory;
155+
private final IndexStorePlugin.CompositeDirectoryFactory compositeDirectoryFactory;
156156
private final IndexStorePlugin.DirectoryFactory remoteDirectoryFactory;
157157
private final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory;
158158
private final CheckedFunction<DirectoryReader, DirectoryReader, IOException> readerWrapper;
@@ -216,6 +216,7 @@ public IndexService(
216216
Client client,
217217
QueryCache queryCache,
218218
IndexStorePlugin.DirectoryFactory directoryFactory,
219+
IndexStorePlugin.CompositeDirectoryFactory compositeDirectoryFactory,
219220
IndexStorePlugin.DirectoryFactory remoteDirectoryFactory,
220221
IndexEventListener eventListener,
221222
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> wrapperFactory,
@@ -296,6 +297,7 @@ public IndexService(
296297
this.eventListener = eventListener;
297298
this.nodeEnv = nodeEnv;
298299
this.directoryFactory = directoryFactory;
300+
this.compositeDirectoryFactory = compositeDirectoryFactory;
299301
this.remoteDirectoryFactory = remoteDirectoryFactory;
300302
this.recoveryStateFactory = recoveryStateFactory;
301303
this.engineFactory = Objects.requireNonNull(engineFactory);
@@ -454,6 +456,7 @@ public IndexService(
454456
client,
455457
queryCache,
456458
directoryFactory,
459+
null,
457460
remoteDirectoryFactory,
458461
eventListener,
459462
wrapperFactory,
@@ -726,8 +729,7 @@ protected void closeInternal() {
726729
if (FeatureFlags.isEnabled(FeatureFlags.WRITABLE_WARM_INDEX_SETTING) &&
727730
// TODO : Need to remove this check after support for hot indices is added in Composite Directory
728731
this.indexSettings.isWarmIndex()) {
729-
Directory localDirectory = directoryFactory.newDirectory(this.indexSettings, path);
730-
directory = new CompositeDirectory(localDirectory, remoteDirectory, fileCache);
732+
directory = compositeDirectoryFactory.newDirectory(this.indexSettings, path, directoryFactory, remoteDirectory, fileCache);
731733
} else {
732734
directory = directoryFactory.newDirectory(this.indexSettings, path);
733735
}

server/src/main/java/org/opensearch/index/store/CompositeDirectory.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,10 @@
5353
@ExperimentalApi
5454
public class CompositeDirectory extends FilterDirectory {
5555
private static final Logger logger = LogManager.getLogger(CompositeDirectory.class);
56-
private final FSDirectory localDirectory;
57-
private final RemoteSegmentStoreDirectory remoteDirectory;
58-
private final FileCache fileCache;
59-
private final TransferManager transferManager;
56+
protected final FSDirectory localDirectory;
57+
protected final RemoteSegmentStoreDirectory remoteDirectory;
58+
protected final FileCache fileCache;
59+
protected final TransferManager transferManager;
6060

6161
/**
6262
* Constructor to initialise the composite directory
@@ -97,7 +97,7 @@ private String[] listLocalFiles() throws IOException {
9797
* @return A list of file names, including the original file (if present) and all its block files.
9898
* @throws IOException in case of I/O error while listing files.
9999
*/
100-
private List<String> listBlockFiles(String fileName) throws IOException {
100+
protected List<String> listBlockFiles(String fileName) throws IOException {
101101
return Stream.of(listLocalFiles())
102102
.filter(file -> file.equals(fileName) || file.startsWith(fileName + FileTypeUtils.BLOCK_FILE_IDENTIFIER))
103103
.collect(Collectors.toList());
@@ -384,7 +384,7 @@ private String[] getRemoteFiles() throws IOException {
384384
return remoteFiles;
385385
}
386386

387-
private void cacheFile(String name) throws IOException {
387+
protected void cacheFile(String name) throws IOException {
388388
Path filePath = getFilePath(name);
389389
// put will increase the refCount for the path, making sure it is not evicted, will decrease the ref after it is uploaded to Remote
390390
// so that it can be evicted after that
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.store;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.apache.lucene.store.Directory;
14+
import org.opensearch.index.IndexSettings;
15+
import org.opensearch.index.shard.ShardPath;
16+
import org.opensearch.index.store.remote.filecache.FileCache;
17+
import org.opensearch.plugins.IndexStorePlugin;
18+
19+
import java.io.IOException;
20+
21+
/**
22+
* Default composite directory factory
23+
*/
24+
public class DefaultCompositeDirectoryFactory implements IndexStorePlugin.CompositeDirectoryFactory {
25+
26+
private static final Logger logger = LogManager.getLogger(DefaultCompositeDirectoryFactory.class);
27+
28+
@Override
29+
public Directory newDirectory(
30+
IndexSettings indexSettings,
31+
ShardPath shardPath,
32+
IndexStorePlugin.DirectoryFactory localDirectoryFactory,
33+
Directory remoteDirectory,
34+
FileCache fileCache
35+
) throws IOException {
36+
logger.trace("Creating composite directory from core - Default CompositeDirectoryFactory");
37+
Directory localDirectory = localDirectoryFactory.newDirectory(indexSettings, shardPath);
38+
return new CompositeDirectory(localDirectory, remoteDirectory, fileCache);
39+
}
40+
}

server/src/main/java/org/opensearch/indices/IndicesService.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,7 @@ public class IndicesService extends AbstractLifecycleComponent
344344
private final MetaStateService metaStateService;
345345
private final Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders;
346346
private final Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories;
347+
private final Map<String, IndexStorePlugin.CompositeDirectoryFactory> compositeDirectoryFactories;
347348
private final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories;
348349
final AbstractRefCounted indicesRefCount; // pkg-private for testing
349350
private final CountDownLatch closeLatch = new CountDownLatch(1);
@@ -390,6 +391,7 @@ public IndicesService(
390391
MetaStateService metaStateService,
391392
Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders,
392393
Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories,
394+
Map<String, IndexStorePlugin.CompositeDirectoryFactory> compositeDirectoryFactories,
393395
ValuesSourceRegistry valuesSourceRegistry,
394396
Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories,
395397
IndexStorePlugin.DirectoryFactory remoteDirectoryFactory,
@@ -452,6 +454,7 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon
452454
this.engineFactoryProviders = engineFactoryProviders;
453455

454456
this.directoryFactories = directoryFactories;
457+
this.compositeDirectoryFactories = compositeDirectoryFactories;
455458
this.recoveryStateFactories = recoveryStateFactories;
456459
// doClose() is called when shutting down a node, yet there might still be ongoing requests
457460
// that we need to wait for before closing some resources such as the caches. In order to
@@ -628,6 +631,7 @@ public IndicesService(
628631
metaStateService,
629632
engineFactoryProviders,
630633
directoryFactories,
634+
Collections.emptyMap(),
631635
valuesSourceRegistry,
632636
recoveryStateFactories,
633637
remoteDirectoryFactory,
@@ -1022,6 +1026,7 @@ private synchronized IndexService createIndexService(
10221026
getEngineFactory(idxSettings),
10231027
getEngineConfigFactory(idxSettings),
10241028
directoryFactories,
1029+
compositeDirectoryFactories,
10251030
() -> allowExpensiveQueries,
10261031
indexNameExpressionResolver,
10271032
recoveryStateFactories,
@@ -1115,6 +1120,7 @@ public synchronized MapperService createIndexMapperService(IndexMetadata indexMe
11151120
getEngineFactory(idxSettings),
11161121
getEngineConfigFactory(idxSettings),
11171122
directoryFactories,
1123+
compositeDirectoryFactories,
11181124
() -> allowExpensiveQueries,
11191125
indexNameExpressionResolver,
11201126
recoveryStateFactories,

server/src/main/java/org/opensearch/node/Node.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@
156156
import org.opensearch.index.recovery.RemoteStoreRestoreService;
157157
import org.opensearch.index.remote.RemoteIndexPathUploader;
158158
import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory;
159+
import org.opensearch.index.store.DefaultCompositeDirectoryFactory;
159160
import org.opensearch.index.store.IndexStoreListener;
160161
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
161162
import org.opensearch.index.store.remote.filecache.FileCache;
@@ -879,6 +880,22 @@ protected Node(
879880
});
880881
directoryFactories.putAll(builtInDirectoryFactories);
881882

883+
final Map<String, IndexStorePlugin.CompositeDirectoryFactory> compositeDirectoryFactories = new HashMap<>();
884+
pluginsService.filterPlugins(IndexStorePlugin.class)
885+
.stream()
886+
.map(IndexStorePlugin::getCompositeDirectoryFactories)
887+
.flatMap(m -> m.entrySet().stream())
888+
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
889+
.forEach((k, v) -> {
890+
if (k.equals("default")) {
891+
throw new IllegalStateException(
892+
"registered composite index store type [" + k + "] conflicts with a built-in default type"
893+
);
894+
}
895+
compositeDirectoryFactories.put(k, v);
896+
});
897+
compositeDirectoryFactories.put("default", new DefaultCompositeDirectoryFactory());
898+
882899
final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories = pluginsService.filterPlugins(
883900
IndexStorePlugin.class
884901
)
@@ -937,6 +954,7 @@ protected Node(
937954
metaStateService,
938955
engineFactoryProviders,
939956
Map.copyOf(directoryFactories),
957+
Map.copyOf(compositeDirectoryFactories),
940958
searchModule.getValuesSourceRegistry(),
941959
recoveryStateFactories,
942960
remoteDirectoryFactory,

0 commit comments

Comments
 (0)