diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java index bfd4e30b3b12f..1f429d3247a29 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java @@ -1642,8 +1642,8 @@ static void validateTranslogDurabilitySettings(Settings requestSettings, Cluster } public static void validateIndexStoreLocality(Settings indexSettings) { - if (indexSettings.get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.LocalityType.FULL.toString()) - .equalsIgnoreCase(IndexModule.LocalityType.PARTIAL.toString()) + if (indexSettings.get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.FULL.toString()) + .equalsIgnoreCase(IndexModule.DataLocalityType.PARTIAL.toString()) && !FeatureFlags.isEnabled(FeatureFlags.WRITEABLE_REMOTE_INDEX_SETTING)) { throw new IllegalArgumentException( "index.store.locality can be set to PARTIAL only if Feature Flag for Writable Remote Index is true" diff --git a/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java b/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java index e2c48496cc8df..e16e75de7f27d 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java @@ -32,6 +32,7 @@ package org.opensearch.common.blobstore; +import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.core.action.ActionListener; import java.io.IOException; diff --git a/server/src/main/java/org/opensearch/index/IndexModule.java b/server/src/main/java/org/opensearch/index/IndexModule.java index 489855d10801c..b4fd7666ba45a 100644 --- a/server/src/main/java/org/opensearch/index/IndexModule.java +++ b/server/src/main/java/org/opensearch/index/IndexModule.java @@ -146,10 +146,10 @@ public final class IndexModule { /** * Index setting which used to determine how the data is cached locally fully or partially */ - public static final Setting INDEX_STORE_LOCALITY_SETTING = new Setting<>( - "index.store.locality", - LocalityType.FULL.name(), - LocalityType::getValueOf, + public static final Setting INDEX_STORE_LOCALITY_SETTING = new Setting<>( + "index.store.data_locality", + DataLocalityType.FULL.name(), + DataLocalityType::getValueOf, Property.IndexScope, Property.NodeScope ); @@ -593,24 +593,33 @@ public boolean match(Settings settings) { } } - public enum LocalityType { + /** + * Indicates the locality of the data - whether it will be cached fully or partially + */ + public enum DataLocalityType { + /** + * Indicates that all the data will be cached locally + */ FULL, + /** + * Indicates that only a subset of the data will be cached locally + */ PARTIAL; - private static final Map LOCALITY_TYPES; + private static final Map LOCALITY_TYPES; static { - final Map localityTypes = new HashMap<>(values().length); - for (final LocalityType localityType : values()) { - localityTypes.put(localityType.name(), localityType); + final Map localityTypes = new HashMap<>(values().length); + for (final DataLocalityType dataLocalityType : values()) { + localityTypes.put(dataLocalityType.name(), dataLocalityType); } LOCALITY_TYPES = Collections.unmodifiableMap(localityTypes); } - public static LocalityType getValueOf(final String localityType) { + public static DataLocalityType getValueOf(final String localityType) { Objects.requireNonNull(localityType, "No locality type given."); final String localityTypeName = toRootUpperCase(localityType.trim()); - final LocalityType type = LOCALITY_TYPES.get(localityTypeName); + final DataLocalityType type = LOCALITY_TYPES.get(localityTypeName); if (type != null) { return type; } diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index c6b5a8fd11075..edc6d4a9840e2 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -93,9 +93,8 @@ import org.opensearch.index.shard.ShardNotInPrimaryModeException; import org.opensearch.index.shard.ShardPath; import org.opensearch.index.similarity.SimilarityService; -import org.opensearch.index.store.BaseRemoteSegmentStoreDirectory; import org.opensearch.index.store.CompositeDirectory; -import org.opensearch.index.store.FsDirectoryFactory; +import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory; import org.opensearch.index.store.Store; import org.opensearch.index.store.remote.filecache.FileCache; @@ -536,14 +535,9 @@ public synchronized IndexShard createShard( * Composite Directory currently uses FSDirectory's getDirectory() method to fetch and use the Path for operating on FileCache * TODO : Refactor FileCache to have key in form of String instead of Path. Once that is done we can remove this assertion */ - assert directoryFactory instanceof FsDirectoryFactory - : "For Composite Directory, local directory must be of type FSDirectory"; Directory localDirectory = directoryFactory.newDirectory(this.indexSettings, path); - directory = new CompositeDirectory( - (FSDirectory) localDirectory, - (BaseRemoteSegmentStoreDirectory) remoteDirectory, - fileCache - ); + assert localDirectory instanceof FSDirectory : "For Composite Directory, local directory must be of type FSDirectory"; + directory = new CompositeDirectory((FSDirectory) localDirectory, (RemoteSegmentStoreDirectory) remoteDirectory, fileCache); } else { directory = directoryFactory.newDirectory(this.indexSettings, path); } diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index ce26800ff5f05..9053024f7f1e0 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -918,8 +918,10 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti numberOfShards = settings.getAsInt(IndexMetadata.SETTING_NUMBER_OF_SHARDS, null); replicationType = IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.get(settings); isRemoteStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false); - isStoreLocalityPartial = settings.get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.LocalityType.FULL.toString()) - .equalsIgnoreCase(IndexModule.LocalityType.PARTIAL.toString()); + isStoreLocalityPartial = settings.get( + IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), + IndexModule.DataLocalityType.FULL.toString() + ).equalsIgnoreCase(IndexModule.DataLocalityType.PARTIAL.toString()); remoteStoreTranslogRepository = settings.get(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY); remoteTranslogUploadBufferInterval = INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(settings); remoteStoreRepository = settings.get(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY); diff --git a/server/src/main/java/org/opensearch/index/store/BaseRemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/BaseRemoteSegmentStoreDirectory.java deleted file mode 100644 index 83fb2d2800eae..0000000000000 --- a/server/src/main/java/org/opensearch/index/store/BaseRemoteSegmentStoreDirectory.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.index.store; - -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.FilterDirectory; - -import java.io.IOException; -import java.nio.file.Path; - -/** - * BaseRemoteSegmentsStoreDirectory - abstract class created to extend on the functionality of RemoteSegmentStoreDirectory - * Can add other public methods of RemoteSegmentStoreDirectory here as well such as init() and readLatestMetadata() - */ - -public abstract class BaseRemoteSegmentStoreDirectory extends FilterDirectory { - - protected BaseRemoteSegmentStoreDirectory(Directory in) { - super(in); - } - - /** - * Function to download a specific part of file (from pos to pos + length) from the remote store to a desired location - */ - - public abstract void downloadFile(String fileName, long pos, long length, Path location) throws IOException; -} diff --git a/server/src/main/java/org/opensearch/index/store/CompositeDirectory.java b/server/src/main/java/org/opensearch/index/store/CompositeDirectory.java index 14f9a23c84f7b..e668ac31e57ee 100644 --- a/server/src/main/java/org/opensearch/index/store/CompositeDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/CompositeDirectory.java @@ -16,10 +16,12 @@ import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.Lock; +import org.apache.lucene.store.LockObtainFailedException; import org.opensearch.index.store.remote.file.OnDemandCompositeBlockIndexInput; import org.opensearch.index.store.remote.filecache.FileCache; import org.opensearch.index.store.remote.utils.FileType; +import java.io.FileNotFoundException; import java.io.IOException; import java.nio.file.NoSuchFileException; import java.util.ArrayList; @@ -30,22 +32,40 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; +/** + * Composite Directory will contain both local and remote directory + * Consumers of Composite directory need not worry whether file is in local or remote + * All such abstractions will be handled by the Composite directory itself + * Implements all required methods by Directory abstraction + */ public class CompositeDirectory extends FilterDirectory { private static final Logger logger = LogManager.getLogger(CompositeDirectory.class); private final FSDirectory localDirectory; - private final BaseRemoteSegmentStoreDirectory remoteDirectory; + private final RemoteSegmentStoreDirectory remoteDirectory; private final FileCache fileCache; private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); private final ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock(); private final ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock(); - public CompositeDirectory(FSDirectory localDirectory, BaseRemoteSegmentStoreDirectory remoteDirectory, FileCache fileCache) { + /** + * Constructor to initialise the composite directory + * @param localDirectory corresponding to the local FSDirectory + * @param remoteDirectory corresponding to the remote directory + * @param fileCache used to cache the remote files locally + */ + public CompositeDirectory(FSDirectory localDirectory, RemoteSegmentStoreDirectory remoteDirectory, FileCache fileCache) { super(localDirectory); this.localDirectory = localDirectory; this.remoteDirectory = remoteDirectory; this.fileCache = fileCache; } + /** + * Returns names of all files stored in this directory in sorted order + * Does not include locally stored block files (having _block_ in their names) + * + * @throws IOException in case of I/O error + */ @Override public String[] listAll() throws IOException { logger.trace("listAll() called ..."); @@ -72,6 +92,12 @@ public String[] listAll() throws IOException { } } + /** + * Removes an existing file in the directory. + * Throws {@link NoSuchFileException} or {@link FileNotFoundException} in case file is not present locally and in remote as well + * @param name the name of an existing file. + * @throws IOException in case of I/O error + */ @Override public void deleteFile(String name) throws IOException { logger.trace("deleteFile() called {}", name); @@ -79,18 +105,24 @@ public void deleteFile(String name) throws IOException { try { localDirectory.deleteFile(name); fileCache.remove(localDirectory.getDirectory().resolve(name)); - } catch (NoSuchFileException e) { - /** - * We might encounter NoSuchFileException in case file is deleted from local - * But if it is present in remote we should just skip deleting this file - * TODO : Handle cases where file is not present in remote as well - */ - logger.trace("NoSuchFileException encountered while deleting {} from local", name); + } catch (NoSuchFileException | FileNotFoundException e) { + logger.trace("File {} not found in local, trying to delete from Remote", name); + try { + remoteDirectory.deleteFile(name); + } finally { + writeLock.unlock(); + } } finally { writeLock.unlock(); } } + /** + * Returns the byte length of a file in the directory. + * Throws {@link NoSuchFileException} or {@link FileNotFoundException} in case file is not present locally and in remote as well + * @param name the name of an existing file. + * @throws IOException in case of I/O error + */ @Override public long fileLength(String name) throws IOException { logger.trace("fileLength() called {}", name); @@ -110,6 +142,12 @@ public long fileLength(String name) throws IOException { } } + /** + * Creates a new, empty file in the directory and returns an {@link IndexOutput} instance for + * appending data to this file. + * @param name the name of the file to create. + * @throws IOException in case of I/O error + */ @Override public IndexOutput createOutput(String name, IOContext context) throws IOException { logger.trace("createOutput() called {}", name); @@ -121,6 +159,13 @@ public IndexOutput createOutput(String name, IOContext context) throws IOExcepti } } + /** + * Creates a new, empty, temporary file in the directory and returns an {@link IndexOutput} + * instance for appending data to this file. + * + *

The temporary file name (accessible via {@link IndexOutput#getName()}) will start with + * {@code prefix}, end with {@code suffix} and have a reserved file extension {@code .tmp}. + */ @Override public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException { logger.trace("createTempOutput() called {} , {}", prefix, suffix); @@ -132,6 +177,10 @@ public IndexOutput createTempOutput(String prefix, String suffix, IOContext cont } } + /** + * Ensures that any writes to these files are moved to stable storage (made durable). + * @throws IOException in case of I/O error + */ @Override public void sync(Collection names) throws IOException { logger.trace("sync() called {}", names); @@ -149,6 +198,10 @@ public void sync(Collection names) throws IOException { } } + /** + * Ensures that directory metadata, such as recent file renames, are moved to stable storage. + * @throws IOException in case of I/O error + */ @Override public void syncMetaData() throws IOException { logger.trace("syncMetaData() called "); @@ -160,6 +213,11 @@ public void syncMetaData() throws IOException { } } + /** + * Renames {@code source} file to {@code dest} file where {@code dest} must not already exist in + * the directory. + * @throws IOException in case of I/O error + */ @Override public void rename(String source, String dest) throws IOException { logger.trace("rename() called {}, {}", source, dest); @@ -171,6 +229,12 @@ public void rename(String source, String dest) throws IOException { } } + /** + * Opens a stream for reading an existing file. + * Check whether the file is present locally or in remote and return the IndexInput accordingly + * @param name the name of an existing file. + * @throws IOException in case of I/O error + */ @Override public IndexInput openInput(String name, IOContext context) throws IOException { logger.trace("openInput() called {}", name); @@ -191,6 +255,13 @@ public IndexInput openInput(String name, IOContext context) throws IOException { } } + /** + * Acquires and returns a {@link Lock} for a file with the given name. + * @param name the name of the lock file + * @throws LockObtainFailedException (optional specific exception) if the lock could not be + * obtained because it is currently held elsewhere. + * @throws IOException in case of I/O error + */ @Override public Lock obtainLock(String name) throws IOException { logger.trace("obtainLock() called {}", name); @@ -202,6 +273,10 @@ public Lock obtainLock(String name) throws IOException { } } + /** + * Closes the directory + * @throws IOException in case of I/O error + */ @Override public void close() throws IOException { writeLock.lock(); @@ -212,6 +287,10 @@ public void close() throws IOException { } } + /** + * Returns a set of files currently pending deletion in this directory. + * @throws IOException in case of I/O error + */ @Override public Set getPendingDeletions() throws IOException { readLock.lock(); @@ -224,9 +303,9 @@ public Set getPendingDeletions() throws IOException { /** * Function to perform operations once files have been uploaded to Remote Store - * Once uploaded local files are safe to delete - * @param files : files which have been successfully uploaded to Remote Store - * @throws IOException + * Currently deleting the local files here, as once uploaded to Remote, local files are safe to delete + * @param files : recent files which have been successfully uploaded to Remote Store + * @throws IOException in case of I/O error */ public void afterSyncToRemote(Collection files) throws IOException { logger.trace("afterSyncToRemote called for {}", files); @@ -244,6 +323,9 @@ public void afterSyncToRemote(Collection files) throws IOException { } } + /** + * Return the list of files present in Remote + */ private String[] getRemoteFiles() { String[] remoteFiles; try { diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java index 345583bbbd1be..e6d5af04e0cb7 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java @@ -28,8 +28,10 @@ import org.opensearch.common.blobstore.transfer.RemoteTransferContainer; import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream; import org.opensearch.common.blobstore.transfer.stream.OffsetRangeInputStream; +import org.opensearch.common.lucene.store.ByteArrayIndexInput; import org.opensearch.core.action.ActionListener; import org.opensearch.index.store.exception.ChecksumCombinationException; +import org.opensearch.index.store.remote.utils.BlockIOContext; import java.io.FileNotFoundException; import java.io.IOException; @@ -198,10 +200,18 @@ public IndexInput openInput(String name, IOContext context) throws IOException { public IndexInput openInput(String name, long fileLength, IOContext context) throws IOException { InputStream inputStream = null; try { - inputStream = blobContainer.readBlob(name); - return new RemoteIndexInput(name, downloadRateLimiter.apply(inputStream), fileLength); + if (context instanceof BlockIOContext) { + long position = ((BlockIOContext) context).getBlockStart(); + long length = ((BlockIOContext) context).getBlockSize(); + inputStream = blobContainer.readBlob(name, position, length); + byte[] bytes = downloadRateLimiter.apply(inputStream).readAllBytes(); + return new ByteArrayIndexInput(name, bytes); + } else { + inputStream = blobContainer.readBlob(name); + return new RemoteIndexInput(name, downloadRateLimiter.apply(inputStream), fileLength); + } } catch (Exception e) { - // Incase the RemoteIndexInput creation fails, close the input stream to avoid file handler leak. + // In case the RemoteIndexInput creation fails, close the input stream to avoid file handler leak. if (inputStream != null) { try { inputStream.close(); diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index d34732a3cd936..ec1163fe91b6c 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -18,6 +18,7 @@ import org.apache.lucene.store.ByteBuffersDataOutput; import org.apache.lucene.store.ByteBuffersIndexOutput; import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; @@ -41,14 +42,10 @@ import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.threadpool.ThreadPool; -import java.io.BufferedOutputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; -import java.nio.file.Files; import java.nio.file.NoSuchFileException; -import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -75,7 +72,7 @@ * @opensearch.api */ @PublicApi(since = "2.3.0") -public final class RemoteSegmentStoreDirectory extends BaseRemoteSegmentStoreDirectory implements RemoteStoreCommitLevelLockManager { +public final class RemoteSegmentStoreDirectory extends FilterDirectory implements RemoteStoreCommitLevelLockManager { /** * Each segment file is uploaded with unique suffix. @@ -220,22 +217,6 @@ private RemoteSegmentMetadata readMetadataFile(String metadataFilename) throws I } } - @Override - public void downloadFile(String fileName, long pos, long length, Path filePath) throws IOException { - OutputStream fileOutputStream = null, localFileOutputStream = null; - InputStream inputStream = null; - try { - fileOutputStream = Files.newOutputStream(filePath); - localFileOutputStream = new BufferedOutputStream(fileOutputStream); - inputStream = remoteDataDirectory.blobContainer.readBlob(getExistingRemoteFilename(fileName), pos, length); - inputStream.transferTo(localFileOutputStream); - } finally { - localFileOutputStream.close(); - fileOutputStream.close(); - inputStream.close(); - } - } - /** * Metadata of a segment that is uploaded to remote segment store. * diff --git a/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandCompositeBlockIndexInput.java b/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandCompositeBlockIndexInput.java index be8378c28e61e..b2af3e7305c5d 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandCompositeBlockIndexInput.java +++ b/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandCompositeBlockIndexInput.java @@ -13,28 +13,37 @@ import org.apache.lucene.store.FSDirectory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; -import org.opensearch.index.store.BaseRemoteSegmentStoreDirectory; +import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.remote.filecache.CachedIndexInput; import org.opensearch.index.store.remote.filecache.FileCache; +import org.opensearch.index.store.remote.utils.BlockIOContext; +import java.io.BufferedOutputStream; +import java.io.FileNotFoundException; import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.util.concurrent.atomic.AtomicBoolean; +/** + * OnDemandCompositeBlockIndexInput is used by the Composite Directory to read data in blocks from Remote and cache those blocks in FileCache + */ public class OnDemandCompositeBlockIndexInput extends OnDemandBlockIndexInput { private static final Logger logger = LogManager.getLogger(OnDemandCompositeBlockIndexInput.class); - private final BaseRemoteSegmentStoreDirectory remoteDirectory; + private final RemoteSegmentStoreDirectory remoteDirectory; private final String fileName; private final Long originalFileSize; - private final FSDirectory directory; + private final FSDirectory localDirectory; private final IOContext context; private final FileCache fileCache; public OnDemandCompositeBlockIndexInput( - BaseRemoteSegmentStoreDirectory remoteDirectory, + RemoteSegmentStoreDirectory remoteDirectory, String fileName, - FSDirectory directory, + FSDirectory localDirectory, FileCache fileCache, IOContext context ) throws IOException { @@ -46,7 +55,7 @@ public OnDemandCompositeBlockIndexInput( .length(remoteDirectory.fileLength(fileName)), remoteDirectory, fileName, - directory, + localDirectory, fileCache, context ); @@ -54,15 +63,15 @@ public OnDemandCompositeBlockIndexInput( public OnDemandCompositeBlockIndexInput( Builder builder, - BaseRemoteSegmentStoreDirectory remoteDirectory, + RemoteSegmentStoreDirectory remoteDirectory, String fileName, - FSDirectory directory, + FSDirectory localDirectory, FileCache fileCache, IOContext context ) throws IOException { super(builder); this.remoteDirectory = remoteDirectory; - this.directory = directory; + this.localDirectory = localDirectory; this.fileName = fileName; this.fileCache = fileCache; this.context = context; @@ -81,7 +90,7 @@ protected OnDemandCompositeBlockIndexInput buildSlice(String sliceDescription, l .resourceDescription(sliceDescription), remoteDirectory, fileName, - directory, + localDirectory, fileCache, context ); @@ -104,17 +113,12 @@ protected IndexInput fetchBlock(int blockId) throws IOException { length, originalFileSize ); - Path filePath = directory.getDirectory().resolve(blockFileName); - final CachedIndexInput cacheEntry = fileCache.compute(filePath, (path, cachedIndexInput) -> { + Path blockFilePath = getLocalFilePath(blockFileName); + final CachedIndexInput cacheEntry = fileCache.compute(blockFilePath, (path, cachedIndexInput) -> { if (cachedIndexInput == null || cachedIndexInput.isClosed()) { // Doesn't exist or is closed, either way create a new one - try { - logger.trace("Downloading block from Remote"); - remoteDirectory.downloadFile(fileName, blockStart, length, filePath); - } catch (IOException e) { - throw new RuntimeException(e); - } - return new CachedIndexInputImpl(blockFileName); + IndexInput indexInput = fetchIndexInput(blockFileName, blockStart, length); + return new CachedIndexInputImpl(indexInput); } else { logger.trace("Block already present in cache"); // already in the cache and ready to be used (open) @@ -137,36 +141,88 @@ private long getActualBlockSize(int blockId) { return (blockId != getBlock(originalFileSize - 1)) ? blockSize : getBlockOffset(originalFileSize - 1) + 1; } + private Path getLocalFilePath(String file) { + return localDirectory.getDirectory().resolve(file); + } + + private IndexInput fetchIndexInput(String blockFileName, long start, long length) { + IndexInput indexInput; + Path filePath = getLocalFilePath(blockFileName); + try { + // Fetch from local if block file is present locally in disk + indexInput = localDirectory.openInput(blockFileName, IOContext.READ); + logger.trace("Block file present locally, just putting it in cache"); + } catch (FileNotFoundException | NoSuchFileException e) { + logger.trace("Block file not present locally, fetching from Remote"); + // If block file is not present locally in disk, fetch from remote and persist the block file in disk + try ( + OutputStream fileOutputStream = Files.newOutputStream(filePath); + OutputStream localFileOutputStream = new BufferedOutputStream(fileOutputStream) + ) { + logger.trace("Fetching block file from Remote"); + indexInput = remoteDirectory.openInput(fileName, new BlockIOContext(IOContext.READ, start, length)); + logger.trace("Persisting the fetched blocked file from Remote"); + int indexInputLength = (int) indexInput.length(); + byte[] bytes = new byte[indexInputLength]; + indexInput.readBytes(bytes, 0, indexInputLength); + localFileOutputStream.write(bytes); + } catch (Exception err) { + logger.trace("Exception while fetching block from remote and persisting it on disk"); + throw new RuntimeException(err); + } + } catch (Exception e) { + logger.trace("Exception while fetching block file locally"); + throw new RuntimeException(e); + } + return indexInput; + } + + /** + * Implementation of the CachedIndexInput interface + */ private class CachedIndexInputImpl implements CachedIndexInput { + + IndexInput indexInput; AtomicBoolean isClosed; - String blockFileName; - CachedIndexInputImpl(String blockFileName) { - this.blockFileName = blockFileName; + /** + * Constructor - takes IndexInput as parameter + */ + CachedIndexInputImpl(IndexInput indexInput) { + this.indexInput = indexInput; isClosed = new AtomicBoolean(false); } + /** + * Returns the wrapped indexInput + */ @Override public IndexInput getIndexInput() throws IOException { - return directory.openInput(blockFileName, context); + return indexInput; } + /** + * Returns the length of the wrapped indexInput + */ @Override public long length() { - try { - return directory.fileLength(blockFileName); - } catch (IOException e) { - throw new RuntimeException(e); - } + return indexInput.length(); } + /** + * Checks if the wrapped indexInput is closed + */ @Override public boolean isClosed() { return isClosed.get(); } + /** + * Closes the wrapped indexInput + */ @Override public void close() throws Exception { + indexInput.close(); isClosed.set(true); } } diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/BlockIOContext.java b/server/src/main/java/org/opensearch/index/store/remote/utils/BlockIOContext.java new file mode 100644 index 0000000000000..da94e4a46d307 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/BlockIOContext.java @@ -0,0 +1,62 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.remote.utils; + +import org.apache.lucene.store.IOContext; + +/** + * BlockIOContext is an extension of IOContext which can be used to pass block related information to the openInput() method of any directory + */ +public class BlockIOContext extends IOContext { + + private final boolean isBlockRequest; + private long blockStart; + private long blockSize; + + /** + * Default constructor + */ + BlockIOContext(IOContext ctx) { + super(ctx.context); + this.isBlockRequest = false; + this.blockStart = -1; + this.blockSize = -1; + } + + /** + * Constructor to initialise BlockIOContext with block related information + */ + public BlockIOContext(IOContext ctx, long blockStart, long blockSize) { + super(ctx.context); + this.isBlockRequest = true; + this.blockStart = blockStart; + this.blockSize = blockSize; + } + + /** + * Function to check if the Context contains a block request or not + */ + public boolean isBlockRequest() { + return isBlockRequest; + } + + /** + * Getter for blockStart + */ + public long getBlockStart() { + return blockStart; + } + + /** + * Getter for blockSize + */ + public long getBlockSize() { + return blockSize; + } +} diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/FileType.java b/server/src/main/java/org/opensearch/index/store/remote/utils/FileType.java index faf3e0519699e..418f8a24a5f24 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/FileType.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/FileType.java @@ -8,14 +8,29 @@ package org.opensearch.index.store.remote.utils; +/** + * Enum to represent whether a file is block or not + */ public enum FileType { + /** + * Block file + */ BLOCK, + /** + * Non block file + */ NON_BLOCK; + /** + * Returns if the fileType is a block file or not + */ public static boolean isBlockFile(FileType fileType) { return fileType.equals(FileType.BLOCK); } + /** + * Returns if the fileName is block file or not + */ public static boolean isBlockFile(String fileName) { if (fileName.contains("_block_")) return true; return false;