Skip to content

Commit ce5f6cd

Browse files
author
Sachin Kale
committed
Add RemoteDirectory instance to Store as a secondary directory
Signed-off-by: Sachin Kale <[email protected]>
1 parent bcfd328 commit ce5f6cd

File tree

9 files changed

+194
-5
lines changed

9 files changed

+194
-5
lines changed

server/src/main/java/org/opensearch/index/IndexModule.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
import org.opensearch.index.shard.SearchOperationListener;
7171
import org.opensearch.index.similarity.SimilarityService;
7272
import org.opensearch.index.store.FsDirectoryFactory;
73+
import org.opensearch.index.store.RemoteDirectoryFactory;
7374
import org.opensearch.indices.IndicesQueryCache;
7475
import org.opensearch.indices.breaker.CircuitBreakerService;
7576
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
@@ -118,6 +119,8 @@ public final class IndexModule {
118119

119120
private static final FsDirectoryFactory DEFAULT_DIRECTORY_FACTORY = new FsDirectoryFactory();
120121

122+
private static final RemoteDirectoryFactory REMOTE_DIRECTORY_FACTORY = new RemoteDirectoryFactory();
123+
121124
private static final IndexStorePlugin.RecoveryStateFactory DEFAULT_RECOVERY_STATE_FACTORY = RecoveryState::new;
122125

123126
public static final Setting<String> INDEX_STORE_TYPE_SETTING = new Setting<>(
@@ -511,6 +514,7 @@ public IndexService newIndexService(
511514
client,
512515
queryCache,
513516
directoryFactory,
517+
REMOTE_DIRECTORY_FACTORY,
514518
eventListener,
515519
readerWrapperFactory,
516520
mapperRegistry,

server/src/main/java/org/opensearch/index/IndexService.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,9 @@
9595
import org.opensearch.indices.mapper.MapperRegistry;
9696
import org.opensearch.indices.recovery.RecoveryState;
9797
import org.opensearch.plugins.IndexStorePlugin;
98+
import org.opensearch.repositories.RepositoriesService;
99+
import org.opensearch.repositories.Repository;
100+
import org.opensearch.repositories.RepositoryMissingException;
98101
import org.opensearch.script.ScriptService;
99102
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
100103
import org.opensearch.threadpool.ThreadPool;
@@ -135,6 +138,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
135138
private final NodeEnvironment nodeEnv;
136139
private final ShardStoreDeleter shardStoreDeleter;
137140
private final IndexStorePlugin.DirectoryFactory directoryFactory;
141+
private final IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory;
138142
private final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory;
139143
private final CheckedFunction<DirectoryReader, DirectoryReader, IOException> readerWrapper;
140144
private final IndexCache indexCache;
@@ -189,6 +193,7 @@ public IndexService(
189193
Client client,
190194
QueryCache queryCache,
191195
IndexStorePlugin.DirectoryFactory directoryFactory,
196+
IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory,
192197
IndexEventListener eventListener,
193198
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> wrapperFactory,
194199
MapperRegistry mapperRegistry,
@@ -259,6 +264,7 @@ public IndexService(
259264
this.eventListener = eventListener;
260265
this.nodeEnv = nodeEnv;
261266
this.directoryFactory = directoryFactory;
267+
this.remoteDirectoryFactory = remoteDirectoryFactory;
262268
this.recoveryStateFactory = recoveryStateFactory;
263269
this.engineFactory = Objects.requireNonNull(engineFactory);
264270
this.engineConfigFactory = Objects.requireNonNull(engineConfigFactory);
@@ -423,7 +429,8 @@ private long getAvgShardSizeInBytes() throws IOException {
423429
public synchronized IndexShard createShard(
424430
final ShardRouting routing,
425431
final Consumer<ShardId> globalCheckpointSyncer,
426-
final RetentionLeaseSyncer retentionLeaseSyncer
432+
final RetentionLeaseSyncer retentionLeaseSyncer,
433+
final RepositoriesService repositoriesService
427434
) throws IOException {
428435
Objects.requireNonNull(retentionLeaseSyncer);
429436
/*
@@ -497,10 +504,23 @@ public synchronized IndexShard createShard(
497504
}
498505
};
499506
Directory directory = directoryFactory.newDirectory(this.indexSettings, path);
507+
Directory remoteDirectory = null;
508+
if (this.indexSettings.isRemoteStoreEnabled()) {
509+
try {
510+
Repository repository = repositoriesService.repository("dragon-stone");
511+
remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path, repository);
512+
} catch (RepositoryMissingException e) {
513+
throw new IllegalArgumentException(
514+
"Repository should be created before creating index with remote_store enabled setting",
515+
e
516+
);
517+
}
518+
}
500519
store = new Store(
501520
shardId,
502521
this.indexSettings,
503522
directory,
523+
remoteDirectory,
504524
lock,
505525
new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId))
506526
);
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.store;
10+
11+
import org.apache.lucene.store.Directory;
12+
import org.opensearch.common.blobstore.BlobContainer;
13+
import org.opensearch.common.blobstore.BlobPath;
14+
import org.opensearch.index.IndexSettings;
15+
import org.opensearch.index.shard.ShardPath;
16+
import org.opensearch.plugins.IndexStorePlugin;
17+
import org.opensearch.repositories.Repository;
18+
import org.opensearch.repositories.blobstore.BlobStoreRepository;
19+
20+
import java.io.IOException;
21+
22+
public class RemoteDirectoryFactory implements IndexStorePlugin.RemoteDirectoryFactory {
23+
24+
@Override
25+
public Directory newDirectory(IndexSettings indexSettings, ShardPath path, Repository repository) throws IOException {
26+
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
27+
BlobPath blobPath = new BlobPath();
28+
blobPath = blobPath.add(indexSettings.getIndex().getName()).add(String.valueOf(path.getShardId().getId()));
29+
BlobContainer blobContainer = ((BlobStoreRepository) repository).blobStore().blobContainer(blobPath);
30+
return new RemoteDirectory(blobContainer);
31+
}
32+
}

server/src/main/java/org/opensearch/index/store/Store.java

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
174174

175175
private final AtomicBoolean isClosed = new AtomicBoolean(false);
176176
private final StoreDirectory directory;
177+
private final StoreDirectory secondaryDirectory;
177178
private final ReentrantReadWriteLock metadataLock = new ReentrantReadWriteLock();
178179
private final ShardLock shardLock;
179180
private final OnClose onClose;
@@ -187,15 +188,36 @@ protected void closeInternal() {
187188
};
188189

189190
public Store(ShardId shardId, IndexSettings indexSettings, Directory directory, ShardLock shardLock) {
190-
this(shardId, indexSettings, directory, shardLock, OnClose.EMPTY);
191+
this(shardId, indexSettings, directory, null, shardLock, OnClose.EMPTY);
192+
}
193+
194+
public Store(ShardId shardId, IndexSettings indexSettings, Directory directory, Directory secondaryDirectory, ShardLock shardLock) {
195+
this(shardId, indexSettings, directory, secondaryDirectory, shardLock, OnClose.EMPTY);
191196
}
192197

193198
public Store(ShardId shardId, IndexSettings indexSettings, Directory directory, ShardLock shardLock, OnClose onClose) {
199+
this(shardId, indexSettings, directory, null, shardLock, onClose);
200+
}
201+
202+
public Store(
203+
ShardId shardId,
204+
IndexSettings indexSettings,
205+
Directory directory,
206+
Directory secondaryDirectory,
207+
ShardLock shardLock,
208+
OnClose onClose
209+
) {
194210
super(shardId, indexSettings);
195211
final TimeValue refreshInterval = indexSettings.getValue(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING);
196212
logger.debug("store stats are refreshed with refresh_interval [{}]", refreshInterval);
197213
ByteSizeCachingDirectory sizeCachingDir = new ByteSizeCachingDirectory(directory, refreshInterval);
198214
this.directory = new StoreDirectory(sizeCachingDir, Loggers.getLogger("index.store.deletes", shardId));
215+
if (secondaryDirectory != null) {
216+
ByteSizeCachingDirectory sizeCachingSecondaryDir = new ByteSizeCachingDirectory(secondaryDirectory, refreshInterval);
217+
this.secondaryDirectory = new StoreDirectory(sizeCachingSecondaryDir, Loggers.getLogger("index.store.deletes", shardId));
218+
} else {
219+
this.secondaryDirectory = null;
220+
}
199221
this.shardLock = shardLock;
200222
this.onClose = onClose;
201223

@@ -205,8 +227,20 @@ public Store(ShardId shardId, IndexSettings indexSettings, Directory directory,
205227
}
206228

207229
public Directory directory() {
230+
return directory(true);
231+
}
232+
233+
public Directory secondaryDirectory() {
234+
return directory(false);
235+
}
236+
237+
private Directory directory(boolean primary) {
208238
ensureOpen();
209-
return directory;
239+
if (primary) {
240+
return directory;
241+
} else {
242+
return secondaryDirectory;
243+
}
210244
}
211245

212246
/**

server/src/main/java/org/opensearch/indices/IndicesService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -852,7 +852,7 @@ public IndexShard createShard(
852852
IndexService indexService = indexService(shardRouting.index());
853853
assert indexService != null;
854854
RecoveryState recoveryState = indexService.createRecoveryState(shardRouting, targetNode, sourceNode);
855-
IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer);
855+
IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer, repositoriesService);
856856
indexShard.addShardFailureCallback(onShardFailure);
857857
indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, mapping -> {
858858
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS

server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.opensearch.index.IndexSettings;
4040
import org.opensearch.index.shard.ShardPath;
4141
import org.opensearch.indices.recovery.RecoveryState;
42+
import org.opensearch.repositories.Repository;
4243

4344
import java.io.IOException;
4445
import java.util.Collections;
@@ -66,6 +67,22 @@ interface DirectoryFactory {
6667
Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath) throws IOException;
6768
}
6869

70+
/**
71+
* An interface that describes how to create a new remote directory instance per shard.
72+
*/
73+
@FunctionalInterface
74+
interface RemoteDirectoryFactory {
75+
/**
76+
* Creates a new remote directory per shard. This method is called once per shard on shard creation.
77+
* @param indexSettings the shards index settings
78+
* @param shardPath the path the shard is using
79+
* @param repository to get the BlobContainer details
80+
* @return a new RemoteDirectory instance
81+
* @throws IOException if an IOException occurs while opening the directory
82+
*/
83+
Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath, Repository repository) throws IOException;
84+
}
85+
6986
/**
7087
* The {@link DirectoryFactory} mappings for this plugin. When an index is created the store type setting
7188
* {@link org.opensearch.index.IndexModule#INDEX_STORE_TYPE_SETTING} on the index will be examined and either use the default or a
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.store;
10+
11+
import org.apache.lucene.store.Directory;
12+
import org.junit.Before;
13+
import org.mockito.ArgumentCaptor;
14+
import org.opensearch.common.blobstore.BlobContainer;
15+
import org.opensearch.common.blobstore.BlobPath;
16+
import org.opensearch.common.blobstore.BlobStore;
17+
import org.opensearch.common.settings.Settings;
18+
import org.opensearch.index.IndexSettings;
19+
import org.opensearch.index.shard.ShardId;
20+
import org.opensearch.index.shard.ShardPath;
21+
import org.opensearch.repositories.blobstore.BlobStoreRepository;
22+
import org.opensearch.test.IndexSettingsModule;
23+
import org.opensearch.test.OpenSearchTestCase;
24+
25+
import java.io.IOException;
26+
import java.nio.file.Path;
27+
import java.util.Collections;
28+
29+
import static org.mockito.ArgumentMatchers.any;
30+
import static org.mockito.Mockito.*;
31+
32+
public class RemoteDirectoryFactoryTests extends OpenSearchTestCase {
33+
34+
private RemoteDirectoryFactory remoteDirectoryFactory;
35+
36+
@Before
37+
public void setup() {
38+
remoteDirectoryFactory = new RemoteDirectoryFactory();
39+
}
40+
41+
public void testNewDirectory() throws IOException {
42+
Settings settings = Settings.builder().build();
43+
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("foo", settings);
44+
Path tempDir = createTempDir().resolve(indexSettings.getUUID()).resolve("0");
45+
ShardPath shardPath = new ShardPath(false, tempDir, tempDir, new ShardId(indexSettings.getIndex(), 0));
46+
BlobStoreRepository repository = mock(BlobStoreRepository.class);
47+
BlobStore blobStore = mock(BlobStore.class);
48+
BlobContainer blobContainer = mock(BlobContainer.class);
49+
when(repository.blobStore()).thenReturn(blobStore);
50+
when(blobStore.blobContainer(any())).thenReturn(blobContainer);
51+
when(blobContainer.listBlobs()).thenReturn(Collections.emptyMap());
52+
53+
Directory directory = remoteDirectoryFactory.newDirectory(indexSettings, shardPath, repository);
54+
assertTrue(directory instanceof RemoteDirectory);
55+
ArgumentCaptor<BlobPath> blobPathCaptor = ArgumentCaptor.forClass(BlobPath.class);
56+
verify(blobStore).blobContainer(blobPathCaptor.capture());
57+
BlobPath blobPath = blobPathCaptor.getValue();
58+
assertEquals("foo/0/", blobPath.buildAsString());
59+
60+
directory.listAll();
61+
verify(blobContainer).listBlobs();
62+
}
63+
}

server/src/test/java/org/opensearch/index/store/StoreTests.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1138,4 +1138,23 @@ public void testGetPendingFiles() throws IOException {
11381138
}
11391139
}
11401140
}
1141+
1142+
public void testStoreWithoutSecondaryDirectory() throws IOException {
1143+
final ShardId shardId = new ShardId("index", "_na_", 1);
1144+
Directory primaryDirectory = new NIOFSDirectory(createTempDir("primary"));
1145+
try (Store store = new Store(shardId, INDEX_SETTINGS, primaryDirectory, new DummyShardLock(shardId))) {
1146+
assertEquals(primaryDirectory, FilterDirectory.unwrap(FilterDirectory.unwrap(store.directory())));
1147+
assertNull(store.secondaryDirectory());
1148+
}
1149+
}
1150+
1151+
public void testStoreWithSecondaryDirectory() throws IOException {
1152+
final ShardId shardId = new ShardId("index", "_na_", 1);
1153+
Directory primaryDirectory = new NIOFSDirectory(createTempDir("primary"));
1154+
Directory secondaryDirectory = new NIOFSDirectory(createTempDir("secondary"));
1155+
try (Store store = new Store(shardId, INDEX_SETTINGS, primaryDirectory, secondaryDirectory, new DummyShardLock(shardId))) {
1156+
assertEquals(primaryDirectory, FilterDirectory.unwrap(FilterDirectory.unwrap(store.directory())));
1157+
assertEquals(secondaryDirectory, FilterDirectory.unwrap(FilterDirectory.unwrap(store.secondaryDirectory())));
1158+
}
1159+
}
11411160
}

server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRem
148148
newRouting = newRouting.moveToUnassigned(unassignedInfo)
149149
.updateUnassigned(unassignedInfo, RecoverySource.EmptyStoreRecoverySource.INSTANCE);
150150
newRouting = ShardRoutingHelper.initialize(newRouting, nodeId);
151-
IndexShard shard = index.createShard(newRouting, s -> {}, RetentionLeaseSyncer.EMPTY);
151+
IndexShard shard = index.createShard(newRouting, s -> {}, RetentionLeaseSyncer.EMPTY, null);
152152
IndexShardTestCase.updateRoutingEntry(shard, newRouting);
153153
assertEquals(5, counter.get());
154154
final DiscoveryNode localNode = new DiscoveryNode(

0 commit comments

Comments
 (0)