diff --git a/server/src/main/java/org/opensearch/index/store/RemoteBufferedIndexOutput.java b/server/src/main/java/org/opensearch/index/store/RemoteBufferedIndexOutput.java new file mode 100644 index 0000000000000..ad44a3bf75678 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/RemoteBufferedIndexOutput.java @@ -0,0 +1,107 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.apache.lucene.store.DataInput; +import org.apache.lucene.store.OutputStreamIndexOutput; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.io.stream.BytesStreamOutput; + +import java.io.IOException; +import java.io.InputStream; + +/** + * Class for output to a file in a {@link RemoteBufferedOutputDirectory}. This is right now used only for writing locks + * in remote store. in the future, we can use it for other operations as well. + * The current limitation of this is we keep all the file content in memory till we call close(), + * So this class should be used to write small files (in MBs). + * TODO: extend the class to continously write to the store if content size in buffer gets higher than a specific size. + * @see RemoteBufferedOutputDirectory + * + * @opensearch.internal + */ +public class RemoteBufferedIndexOutput extends RemoteIndexOutput { + private final BytesStreamOutput out; + private final OutputStreamIndexOutput indexOutputBuffer; + // visible for testing + static final int BUFFER_SIZE = 4096; + + public RemoteBufferedIndexOutput(String name, BlobContainer blobContainer, int bufferSize) { + super(name, blobContainer); + out = new BytesStreamOutput(); + indexOutputBuffer = new OutputStreamIndexOutput(name, name, out, bufferSize); + } + + public RemoteBufferedIndexOutput(String name, BlobContainer blobContainer) { + this(name, blobContainer, BUFFER_SIZE); + } + + // Visible for testing + RemoteBufferedIndexOutput(String name, BlobContainer blobContainer, BytesStreamOutput out, OutputStreamIndexOutput indexOutputBuffer) { + super(name, blobContainer); + this.out = out; + this.indexOutputBuffer = indexOutputBuffer; + } + + @Override + public void copyBytes(DataInput input, long numBytes) throws IOException { + indexOutputBuffer.copyBytes(input, numBytes); + } + + /** + * when we trigger close() to close the stream, we will first flush the buffer to output stream and then write all + * data to blob container and close the output stream. + * + */ + @Override + public void close() throws IOException { + + try (final BytesStreamOutput outStream = out; InputStream stream = out.bytes().streamInput()) { + indexOutputBuffer.close(); + blobContainer.writeBlob(getName(), stream, out.bytes().length(), false); + } + + } + + /** + * This method will write Bytes to the stream we are maintaining. + * + */ + @Override + public void writeByte(byte b) throws IOException { + indexOutputBuffer.writeByte(b); + } + + /** + * This method will write a byte array to the stream we are maintaining. + * + */ + @Override + public void writeBytes(byte[] byteArray, int offset, int length) throws IOException { + indexOutputBuffer.writeBytes(byteArray, offset, length); + } + + /** + * This method will return the file pointer to the current position in the stream. + * + */ + @Override + public long getFilePointer() { + return indexOutputBuffer.getFilePointer(); + } + + /** + * This method will return checksum + * + */ + @Override + public long getChecksum() throws IOException { + return indexOutputBuffer.getChecksum(); + } +} diff --git a/server/src/main/java/org/opensearch/index/store/RemoteBufferedOutputDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteBufferedOutputDirectory.java new file mode 100644 index 0000000000000..b430ae2a6bc9c --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/RemoteBufferedOutputDirectory.java @@ -0,0 +1,32 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexOutput; +import org.opensearch.common.blobstore.BlobContainer; + +/** + * A {@code RemoteBufferedOutputDirectory} is an extension of RemoteDirectory which also provides an abstraction layer + * for storing a list of files to a remote store. + * Additionally, with this implementation, creation of new files is also allowed. + * A remoteDirectory contains only files (no sub-folder hierarchy). + * + * @opensearch.internal + */ +public class RemoteBufferedOutputDirectory extends RemoteDirectory { + public RemoteBufferedOutputDirectory(BlobContainer blobContainer) { + super(blobContainer); + } + + @Override + public IndexOutput createOutput(String name, IOContext context) { + return new RemoteBufferedIndexOutput(name, this.blobContainer); + } +} diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java index 62e2b12896411..bb2ff47cf0072 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java @@ -35,7 +35,7 @@ */ public class RemoteDirectory extends Directory { - private final BlobContainer blobContainer; + protected final BlobContainer blobContainer; public RemoteDirectory(BlobContainer blobContainer) { this.blobContainer = blobContainer; diff --git a/server/src/main/java/org/opensearch/index/store/RemoteIndexOutput.java b/server/src/main/java/org/opensearch/index/store/RemoteIndexOutput.java index 2af65452a6eac..11ea223f529f2 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteIndexOutput.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteIndexOutput.java @@ -28,7 +28,7 @@ */ public class RemoteIndexOutput extends IndexOutput { - private final BlobContainer blobContainer; + protected final BlobContainer blobContainer; public RemoteIndexOutput(String name, BlobContainer blobContainer) { super(name, name); diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index c385303813844..9f41ac6f7fd17 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -18,20 +18,23 @@ import org.apache.lucene.store.IndexOutput; import org.opensearch.common.UUIDs; import org.opensearch.common.lucene.store.ByteArrayIndexInput; +import org.opensearch.index.store.lockmanager.RemoteStoreCommitLevelLockManager; +import org.opensearch.index.store.lockmanager.RemoteStoreLockManager; +import org.opensearch.index.store.lockmanager.FileLockInfo; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.common.io.VersionedCodecStreamWrapper; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler; import java.io.IOException; import java.nio.file.NoSuchFileException; +import java.util.Map; +import java.util.HashSet; +import java.util.Optional; +import java.util.HashMap; import java.util.Collection; import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -47,7 +50,7 @@ * another instance of {@code RemoteDirectory}. * @opensearch.internal */ -public final class RemoteSegmentStoreDirectory extends FilterDirectory { +public final class RemoteSegmentStoreDirectory extends FilterDirectory implements RemoteStoreCommitLevelLockManager { /** * Each segment file is uploaded with unique suffix. * For example, _0.cfe in local filesystem will be uploaded to remote segment store as _0.cfe__gX7bNIIBrs0AUNsR2yEG @@ -66,6 +69,8 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory { */ private final RemoteDirectory remoteMetadataDirectory; + private final RemoteStoreLockManager mdLockManager; + /** * To prevent explosion of refresh metadata files, we replace refresh files for the given primary term and generation * This is achieved by uploading refresh metadata file with the same UUID suffix. @@ -87,10 +92,15 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory { private static final Logger logger = LogManager.getLogger(RemoteSegmentStoreDirectory.class); - public RemoteSegmentStoreDirectory(RemoteDirectory remoteDataDirectory, RemoteDirectory remoteMetadataDirectory) throws IOException { + public RemoteSegmentStoreDirectory( + RemoteDirectory remoteDataDirectory, + RemoteDirectory remoteMetadataDirectory, + RemoteStoreLockManager mdLockManager + ) throws IOException { super(remoteDataDirectory); this.remoteDataDirectory = remoteDataDirectory; this.remoteMetadataDirectory = remoteMetadataDirectory; + this.mdLockManager = mdLockManager; init(); } @@ -217,15 +227,13 @@ public int compare(String first, String second) { } } + static String getMetadataFilePrefixForCommit(long primaryTerm, long generation) { + return String.join(SEPARATOR, METADATA_PREFIX, Long.toString(primaryTerm), Long.toString(generation, Character.MAX_RADIX)); + } + // Visible for testing static String getMetadataFilename(long primaryTerm, long generation, String uuid) { - return String.join( - SEPARATOR, - METADATA_PREFIX, - Long.toString(primaryTerm), - Long.toString(generation, Character.MAX_RADIX), - uuid - ); + return String.join(SEPARATOR, getMetadataFilePrefixForCommit(primaryTerm, generation), uuid); } // Visible for testing @@ -317,7 +325,75 @@ public IndexInput openInput(String name, IOContext context) throws IOException { } } - public void copyFrom(Directory from, String src, String dest, IOContext context, boolean useCommonSuffix) throws IOException { + /** + * This acquires a lock on a given commit by creating a lock file in lock directory using {@code FileLockInfo} + * @param primaryTerm Primary Term of index at the time of commit. + * @param generation Commit Generation + * @param acquirerId Lock Acquirer ID which wants to acquire lock on the commit. + * @throws IOException will be thrown in case i) listing file failed or ii) Writing the lock file failed. + * @throws NoSuchFileException when metadata file is not present for given commit point. + */ + @Override + public void acquireLock(long primaryTerm, long generation, String acquirerId) throws IOException { + String metadataFile = getMetadataFileForCommit(primaryTerm, generation); + + mdLockManager.acquire(FileLockInfo.getLockInfoBuilder().withFileToLock(metadataFile).withAcquirerId(acquirerId).build()); + } + + /** + * Releases a lock which was acquired on given segment commit. + * @param primaryTerm Primary Term of index at the time of commit. + * @param generation Commit Generation + * @param acquirerId Acquirer ID for which lock needs to be released. + * @throws IOException will be thrown in case i) listing lock files failed or ii) deleting the lock file failed. + * @throws NoSuchFileException when metadata file is not present for given commit point. + */ + @Override + public void releaseLock(long primaryTerm, long generation, String acquirerId) throws IOException { + String metadataFile = getMetadataFileForCommit(primaryTerm, generation); + mdLockManager.release(FileLockInfo.getLockInfoBuilder().withFileToLock(metadataFile).withAcquirerId(acquirerId).build()); + } + + /** + * Checks if a specific commit have any corresponding lock file. + * @param primaryTerm Primary Term of index at the time of commit. + * @param generation Commit Generation + * @return True if there is at least one lock for given primary term and generation. + * @throws IOException will be thrown in case listing lock files failed. + * @throws NoSuchFileException when metadata file is not present for given commit point. + */ + @Override + public Boolean isLockAcquired(long primaryTerm, long generation) throws IOException { + String metadataFile = getMetadataFileForCommit(primaryTerm, generation); + return mdLockManager.isAcquired(FileLockInfo.getLockInfoBuilder().withFileToLock(metadataFile).build()); + } + + // Visible for testing + String getMetadataFileForCommit(long primaryTerm, long generation) throws IOException { + Collection metadataFiles = remoteMetadataDirectory.listFilesByPrefix( + MetadataFilenameUtils.getMetadataFilePrefixForCommit(primaryTerm, generation) + ); + + if (metadataFiles.isEmpty()) { + throw new NoSuchFileException( + "Metadata file is not present for given primary term " + primaryTerm + " and generation " + generation + ); + } + if (metadataFiles.size() != 1) { + throw new IllegalStateException( + "there should be only one metadata file for given primary term " + + primaryTerm + + "and generation " + + generation + + " but found " + + metadataFiles.size() + ); + } + return metadataFiles.iterator().next(); + } + + public void copyFrom(Directory from, String src, String dest, IOContext context, boolean useCommonSuffix, String checksum) + throws IOException { String remoteFilename; if (useCommonSuffix) { remoteFilename = dest + SEGMENT_NAME_UUID_SEPARATOR + this.commonFilenameSuffix; @@ -325,11 +401,14 @@ public void copyFrom(Directory from, String src, String dest, IOContext context, remoteFilename = getNewRemoteSegmentFilename(dest); } remoteDataDirectory.copyFrom(from, src, remoteFilename, context); - String checksum = getChecksumOfLocalFile(from, src); UploadedSegmentMetadata segmentMetadata = new UploadedSegmentMetadata(src, remoteFilename, checksum, from.fileLength(src)); segmentsUploadedToRemoteStore.put(src, segmentMetadata); } + public void copyFrom(Directory from, String src, String dest, IOContext context, boolean useCommonSuffix) throws IOException { + copyFrom(from, src, dest, context, useCommonSuffix, getChecksumOfLocalFile(from, src)); + } + /** * Copies an existing src file from directory from to a non-existent file dest in this directory. * Once the segment is uploaded to remote segment store, update the cache accordingly. @@ -408,6 +487,13 @@ public Map getSegmentsUploadedToRemoteStore() { return Collections.unmodifiableMap(this.segmentsUploadedToRemoteStore); } + public Map getSegmentsUploadedToRemoteStore(long primaryTerm, long generation) throws IOException { + String metadataFile = getMetadataFileForCommit(primaryTerm, generation); + + Map segmentsUploadedToRemoteStore = new ConcurrentHashMap<>(readMetadataFile(metadataFile)); + return Collections.unmodifiableMap(segmentsUploadedToRemoteStore); + } + /** * Delete stale segment and metadata files * One metadata file is kept per commit (refresh updates the same file). To read segments uploaded to remote store, @@ -426,20 +512,47 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException ); return; } - List latestNMetadataFiles = sortedMetadataFileList.subList( - sortedMetadataFileList.size() - lastNMetadataFilesToKeep, - sortedMetadataFileList.size() + + List metadataFilesEligibleToDelete = sortedMetadataFileList.subList( + 0, + sortedMetadataFileList.size() - lastNMetadataFilesToKeep ); + List metadataFilesToBeDeleted = metadataFilesEligibleToDelete.stream().filter(metadataFile -> { + try { + // TODO: add snapshot interop feature flag here as that will be the first feature to use lock + // manager. + boolean lockManagerEnabled = false; + if (!lockManagerEnabled) { + return true; + } + return !isLockAcquired( + MetadataFilenameUtils.getPrimaryTerm(metadataFile.split(MetadataFilenameUtils.SEPARATOR)), + MetadataFilenameUtils.getGeneration(metadataFile.split(MetadataFilenameUtils.SEPARATOR)) + ); + } catch (IOException e) { + logger.error( + "skipping metadata file (" + + metadataFile + + ") deletion for this run," + + " as checking lock for metadata is failing with error: " + + e + ); + return false; + } + }).collect(Collectors.toList()); + + sortedMetadataFileList.removeAll(metadataFilesToBeDeleted); + Map activeSegmentFilesMetadataMap = new HashMap<>(); Set activeSegmentRemoteFilenames = new HashSet<>(); - for (String metadataFile : latestNMetadataFiles) { + for (String metadataFile : sortedMetadataFileList) { Map segmentMetadataMap = readMetadataFile(metadataFile); activeSegmentFilesMetadataMap.putAll(segmentMetadataMap); activeSegmentRemoteFilenames.addAll( segmentMetadataMap.values().stream().map(metadata -> metadata.uploadedFilename).collect(Collectors.toSet()) ); } - for (String metadataFile : sortedMetadataFileList.subList(0, sortedMetadataFileList.size() - lastNMetadataFilesToKeep)) { + for (String metadataFile : metadataFilesToBeDeleted) { Map staleSegmentFilesMetadataMap = readMetadataFile(metadataFile); Set staleSegmentRemoteFilenames = staleSegmentFilesMetadataMap.values() .stream() diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java index cb5548167a577..388f80ea3e480 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java @@ -13,6 +13,8 @@ import org.opensearch.common.blobstore.BlobPath; import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.ShardPath; +import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory; +import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager; import org.opensearch.plugins.IndexStorePlugin; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; @@ -28,6 +30,7 @@ * @opensearch.internal */ public class RemoteSegmentStoreDirectoryFactory implements IndexStorePlugin.DirectoryFactory { + private static final String SEGMENTS = "segments"; private final Supplier repositoriesService; @@ -38,17 +41,23 @@ public RemoteSegmentStoreDirectoryFactory(Supplier reposito @Override public Directory newDirectory(IndexSettings indexSettings, ShardPath path) throws IOException { String repositoryName = indexSettings.getRemoteStoreRepository(); + String indexUUID = indexSettings.getIndex().getUUID(); + String shardId = String.valueOf(path.getShardId().getId()); try (Repository repository = repositoriesService.get().repository(repositoryName)) { assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository"; BlobPath commonBlobPath = ((BlobStoreRepository) repository).basePath(); - commonBlobPath = commonBlobPath.add(indexSettings.getIndex().getUUID()) - .add(String.valueOf(path.getShardId().getId())) - .add("segments"); + commonBlobPath = commonBlobPath.add(indexUUID).add(shardId).add(SEGMENTS); RemoteDirectory dataDirectory = createRemoteDirectory(repository, commonBlobPath, "data"); RemoteDirectory metadataDirectory = createRemoteDirectory(repository, commonBlobPath, "metadata"); + RemoteStoreMetadataLockManager mdLockManager = RemoteStoreLockManagerFactory.newLockManager( + repositoriesService.get(), + repositoryName, + indexUUID, + shardId + ); - return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory); + return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, mdLockManager); } catch (RepositoryMissingException e) { throw new IllegalArgumentException("Repository should be created before creating index with remote_store enabled setting", e); } diff --git a/server/src/main/java/org/opensearch/index/store/lockmanager/FileLockInfo.java b/server/src/main/java/org/opensearch/index/store/lockmanager/FileLockInfo.java new file mode 100644 index 0000000000000..a8fb7bf20c393 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/lockmanager/FileLockInfo.java @@ -0,0 +1,129 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.lockmanager; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +/** + * A Class that defines Info about Remote Store File Lock. + * This is used to provide Remote Store Lock Information and some utility methods for the Lock file. + * @opensearch.internal + */ +public class FileLockInfo implements LockInfo { + private String fileToLock; + private String acquirerId; + + public String getAcquirerId() { + return acquirerId; + } + + public String getFileToLock() { + return fileToLock; + } + + private void setFileToLock(String fileName) { + this.fileToLock = fileName; + } + + private void setAcquirerId(String acquirerId) { + this.acquirerId = acquirerId; + } + + @Override + public String generateLockName() { + validateRequiredParameters(this); + return LockFileUtils.generateLockName(fileToLock, acquirerId); + } + + String getLockPrefix() { + if (fileToLock == null || fileToLock.isBlank()) { + throw new IllegalArgumentException("File to Lock should be provided"); + } + return fileToLock + RemoteStoreLockManagerUtils.SEPARATOR; + } + + List getLocksForAcquirer(String[] lockFiles) { + if (acquirerId == null || acquirerId.isBlank()) { + throw new IllegalArgumentException("Acquirer ID should be provided"); + } + return Arrays.stream(lockFiles) + .filter(lockFile -> acquirerId.equals(LockFileUtils.getAcquirerIdFromLock(lockFile))) + .collect(Collectors.toList()); + } + + public static LockInfoBuilder getLockInfoBuilder() { + return new LockInfoBuilder(); + } + + private static void validateRequiredParameters(FileLockInfo fileLockInfo) { + if (fileLockInfo.getAcquirerId() == null || fileLockInfo.getAcquirerId().isBlank()) { + throw new IllegalArgumentException("Acquirer ID should be provided"); + } + if (fileLockInfo.getFileToLock() == null || fileLockInfo.getFileToLock().isBlank()) { + throw new IllegalArgumentException("File to Lock should be provided"); + } + } + + static class LockFileUtils { + static String generateLockName(String fileToLock, String acquirerId) { + return String.join(RemoteStoreLockManagerUtils.SEPARATOR, fileToLock, acquirerId) + + RemoteStoreLockManagerUtils.LOCK_FILE_EXTENSION; + } + + public static String getFileToLockNameFromLock(String lockName) { + String[] lockNameTokens = lockName.split(RemoteStoreLockManagerUtils.SEPARATOR); + + if (lockNameTokens.length != 2) { + throw new IllegalArgumentException("Provided Lock Name " + lockName + " is not Valid."); + } + return lockNameTokens[0]; + } + + public static String getAcquirerIdFromLock(String lockName) { + String[] lockNameTokens = lockName.split(RemoteStoreLockManagerUtils.SEPARATOR); + + if (lockNameTokens.length != 2) { + throw new IllegalArgumentException("Provided Lock Name " + lockName + " is not Valid."); + } + return lockNameTokens[1].replace(RemoteStoreLockManagerUtils.LOCK_FILE_EXTENSION, ""); + } + } + + /** + * A Builder Class to build an Instance of {@code FileLockInfo} + * @opensearch.internal + */ + public static class LockInfoBuilder implements LockInfo.LockInfoBuilder { + private final FileLockInfo lockFileInfo; + + LockInfoBuilder() { + this.lockFileInfo = new FileLockInfo(); + } + + public LockInfoBuilder withFileToLock(String fileToLock) { + lockFileInfo.setFileToLock(fileToLock); + return this; + } + + public LockInfoBuilder withAcquirerId(String acquirerId) { + lockFileInfo.setAcquirerId(acquirerId); + return this; + } + + @Override + public FileLockInfo build() { + if (lockFileInfo.fileToLock == null && lockFileInfo.acquirerId == null) { + throw new IllegalStateException("Either File to Lock or AcquirerId should be provided to instantiate FileLockInfo"); + } + return lockFileInfo; + } + } +} diff --git a/server/src/main/java/org/opensearch/index/store/lockmanager/LockInfo.java b/server/src/main/java/org/opensearch/index/store/lockmanager/LockInfo.java new file mode 100644 index 0000000000000..1470969540d36 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/lockmanager/LockInfo.java @@ -0,0 +1,32 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.lockmanager; + +/** + * An Interface that defines Remote Store Lock Information. + * Individual Implemented Classes of this interface can decide how the lock should look like and its contents. + * @opensearch.internal + */ +public interface LockInfo { + /** + * A function which generates the lock name on the basis of given information. + * @return the name of the lock. + */ + String generateLockName(); + + /** + * An Interface that defines a Lock Info Builder. + */ + public static interface LockInfoBuilder { + /** + * Method to build Lock Info Instance. + */ + public LockInfo build(); + } +} diff --git a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreCommitLevelLockManager.java b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreCommitLevelLockManager.java new file mode 100644 index 0000000000000..20b6ded9f2401 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreCommitLevelLockManager.java @@ -0,0 +1,48 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.lockmanager; + +import java.io.IOException; + +/** + * An Interface that defines Commit Level Lock in Remote Store. We can lock the segment files corresponding to a given + * primaryTerm and Commit Generation. + * + * @opensearch.internal + */ +public interface RemoteStoreCommitLevelLockManager { + /** + * + * This method will be used to acquire lock on segment files of a specific commit. + * @param primaryTerm Primary Term of index at the time of commit. + * @param generation Commit Generation + * @param acquirerId Resource ID which wants to acquire lock on the commit. + * @throws IOException in case there is a problem in acquiring lock on a commit. + */ + void acquireLock(long primaryTerm, long generation, String acquirerId) throws IOException; + + /** + * This method will be used to release lock on segment files of a specific commit, which got acquired by given + * resource. + * @param primaryTerm Primary Term of index at the time of commit. + * @param generation Commit Generation + * @param acquirerId Resource ID for which lock needs to be released. + * @throws IOException in case there is a problem in releasing lock on a commit. + */ + void releaseLock(long primaryTerm, long generation, String acquirerId) throws IOException; + + /** + * This method will be used to check if a specific commit have any lock acquired on it or not. + * @param primaryTerm Primary Term of index at the time of commit. + * @param generation Commit Generation + * @return true if given commit is locked, else false. + * @throws IOException in case there is a problem in checking if a commit is locked or not. + */ + Boolean isLockAcquired(long primaryTerm, long generation) throws IOException; +} diff --git a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManager.java b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManager.java new file mode 100644 index 0000000000000..ce657627fcfc6 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManager.java @@ -0,0 +1,41 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.lockmanager; + +import java.io.IOException; + +/** + * An Interface that defines Remote Store Lock Manager. + * This will provide the functionality to acquire lock, release lock or to check if a lock is acquired on a specific + * file in remote store. + * @opensearch.internal + */ +public interface RemoteStoreLockManager { + /** + * + * @param lockInfo lock info instance for which we need to acquire lock. + * @throws IOException throws exception in case there is a problem with acquiring lock. + */ + public void acquire(LockInfo lockInfo) throws IOException; + + /** + * + * @param lockInfo lock info instance for which lock need to be removed. + * @throws IOException throws exception in case there is a problem in releasing lock. + */ + void release(LockInfo lockInfo) throws IOException; + + /** + * + * @param lockInfo lock info instance for which we need to check if lock is acquired. + * @return whether a lock is acquired on the given lock info. + * @throws IOException throws exception in case there is a problem in checking if a given file is locked or not. + */ + Boolean isAcquired(LockInfo lockInfo) throws IOException; +} diff --git a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManagerFactory.java b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManagerFactory.java new file mode 100644 index 0000000000000..e866551eae143 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManagerFactory.java @@ -0,0 +1,70 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.lockmanager; + +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.index.store.RemoteBufferedOutputDirectory; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.Repository; +import org.opensearch.repositories.RepositoryMissingException; +import org.opensearch.repositories.blobstore.BlobStoreRepository; + +import java.io.IOException; +import java.util.function.Supplier; + +/** + * Factory for remote store lock manager + * + * @opensearch.internal + */ +public class RemoteStoreLockManagerFactory { + private static final String SEGMENTS = "segments"; + private static final String LOCK_FILES = "lock_files"; + private final Supplier repositoriesService; + + public RemoteStoreLockManagerFactory(Supplier repositoriesService) { + this.repositoriesService = repositoriesService; + } + + public RemoteStoreMetadataLockManager newLockManager(String repositoryName, String indexUUID, String shardId) throws IOException { + return newLockManager(repositoriesService.get(), repositoryName, indexUUID, shardId); + } + + public static RemoteStoreMetadataLockManager newLockManager( + RepositoriesService repositoriesService, + String repositoryName, + String indexUUID, + String shardId + ) throws IOException { + try (Repository repository = repositoriesService.repository(repositoryName)) { + assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository"; + BlobPath shardLevelBlobPath = ((BlobStoreRepository) repository).basePath().add(indexUUID).add(shardId).add(SEGMENTS); + RemoteBufferedOutputDirectory shardMDLockDirectory = createRemoteBufferedOutputDirectory( + repository, + shardLevelBlobPath, + LOCK_FILES + ); + + return new RemoteStoreMetadataLockManager(shardMDLockDirectory); + } catch (RepositoryMissingException e) { + throw new IllegalArgumentException("Repository should be present to acquire/release lock", e); + } + } + + private static RemoteBufferedOutputDirectory createRemoteBufferedOutputDirectory( + Repository repository, + BlobPath commonBlobPath, + String extention + ) { + BlobPath extendedPath = commonBlobPath.add(extention); + BlobContainer dataBlobContainer = ((BlobStoreRepository) repository).blobStore().blobContainer(extendedPath); + return new RemoteBufferedOutputDirectory(dataBlobContainer); + } +} diff --git a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManagerUtils.java b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManagerUtils.java new file mode 100644 index 0000000000000..452dfc329d88b --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManagerUtils.java @@ -0,0 +1,23 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.lockmanager; + +/** + * Utility class for remote store lock manager, + * right now only have constants defined, we can add methods as well here in the future. + * @opensearch.internal + */ +public class RemoteStoreLockManagerUtils { + static final String FILE_TO_LOCK_NAME = "file_to_lock"; + static final String SEPARATOR = "___"; + static final String LOCK_FILE_EXTENSION = ".lock"; + static final String ACQUIRER_ID = "acquirer_id"; + public static final String NO_TTL = "-1"; + static final String LOCK_EXPIRY_TIME = "lock_expiry_time"; +} diff --git a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java new file mode 100644 index 0000000000000..41665ebe47600 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java @@ -0,0 +1,86 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.lockmanager; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexOutput; +import org.opensearch.index.store.RemoteBufferedOutputDirectory; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; + +/** + * A Class that implements Remote Store Lock Manager by creating lock files for the remote store files that needs to + * be locked. + * It uses {@code LockFileInfo} instance to get the information about the lock file on which operations need to + * be executed. + * + * @opensearch.internal + */ +public class RemoteStoreMetadataLockManager implements RemoteStoreLockManager { + private static final Logger logger = LogManager.getLogger(RemoteStoreMetadataLockManager.class); + private final RemoteBufferedOutputDirectory lockDirectory; + + public RemoteStoreMetadataLockManager(RemoteBufferedOutputDirectory lockDirectory) { + this.lockDirectory = lockDirectory; + } + + /** + * Acquires lock on the file mentioned in LockInfo Instance. + * @param lockInfo File Lock Info instance for which we need to acquire lock. + * @throws IOException in case there is some failure while acquiring lock. + */ + @Override + public void acquire(LockInfo lockInfo) throws IOException { + assert lockInfo instanceof FileLockInfo : "lockInfo should be instance of FileLockInfo"; + IndexOutput indexOutput = lockDirectory.createOutput(lockInfo.generateLockName(), IOContext.DEFAULT); + indexOutput.close(); + } + + /** + * Releases Locks acquired by a given acquirer which is passed in LockInfo Instance. + * Right now this method is only used to release locks for a given acquirer, + * This can be extended in future to handle other cases as well, like: + * - release lock for given fileToLock and AcquirerId + * - release all locks for given fileToLock + * @param lockInfo File Lock Info instance for which lock need to be removed. + * @throws IOException in case there is some failure in releasing locks. + */ + @Override + public void release(LockInfo lockInfo) throws IOException { + assert lockInfo instanceof FileLockInfo : "lockInfo should be instance of FileLockInfo"; + String[] lockFiles = lockDirectory.listAll(); + + // ideally there should be only one lock per acquirer, but just to handle any stale locks, + // we try to release all the locks for the acquirer. + List locksToRelease = ((FileLockInfo) lockInfo).getLocksForAcquirer(lockFiles); + if (locksToRelease.size() > 1) { + logger.warn(locksToRelease.size() + " locks found for acquirer " + ((FileLockInfo) lockInfo).getAcquirerId()); + } + for (String lock : locksToRelease) { + lockDirectory.deleteFile(lock); + } + } + + /** + * Checks whether a given file have any lock on it or not. + * @param lockInfo File Lock Info instance for which we need to check if lock is acquired. + * @return true if lock is acquired on a file, else false. + * @throws IOException in case there is some failure in checking locks for a file. + */ + @Override + public Boolean isAcquired(LockInfo lockInfo) throws IOException { + assert lockInfo instanceof FileLockInfo : "lockInfo should be instance of FileLockInfo"; + Collection lockFiles = lockDirectory.listFilesByPrefix(((FileLockInfo) lockInfo).getLockPrefix()); + return !lockFiles.isEmpty(); + } +} diff --git a/server/src/main/java/org/opensearch/index/store/lockmanager/package-info.java b/server/src/main/java/org/opensearch/index/store/lockmanager/package-info.java new file mode 100644 index 0000000000000..d68edd02696be --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/lockmanager/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Package containing classes for remote segment store md lock manager + */ +package org.opensearch.index.store.lockmanager; diff --git a/server/src/test/java/org/opensearch/index/store/RemoteBufferedIndexOutputTests.java b/server/src/test/java/org/opensearch/index/store/RemoteBufferedIndexOutputTests.java new file mode 100644 index 0000000000000..b9200d71ac226 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/store/RemoteBufferedIndexOutputTests.java @@ -0,0 +1,163 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.OutputStreamIndexOutput; +import org.junit.After; +import org.junit.Before; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.lucene.store.ByteArrayIndexInput; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyLong; + +public class RemoteBufferedIndexOutputTests extends OpenSearchTestCase { + + private static final String FILENAME = "segment_1"; + + private BlobContainer blobContainer; + + private BytesStreamOutput out; + + private OutputStreamIndexOutput indexOutputBuffer; + + private RemoteBufferedIndexOutput remoteBufferedIndexOutput; + + @Before + public void setup() { + blobContainer = mock(BlobContainer.class); + out = new BytesStreamOutput(); + indexOutputBuffer = new OutputStreamIndexOutput(FILENAME, FILENAME, out, RemoteBufferedIndexOutput.BUFFER_SIZE); + remoteBufferedIndexOutput = new RemoteBufferedIndexOutput(FILENAME, blobContainer, out, indexOutputBuffer); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + try (final BytesStreamOutput outStream = out) { + indexOutputBuffer.close(); + } catch (IOException e) { + // do nothing + } + } + + public void testCopyBytes() throws IOException { + String testData = "testData"; + IndexInput indexInput = new ByteArrayIndexInput("blobName", testData.getBytes(StandardCharsets.UTF_8)); + remoteBufferedIndexOutput.copyBytes(indexInput, indexInput.length()); + indexOutputBuffer.getChecksum(); // calling getChecksum() to flush the buffer. + assertEquals(out.bytes().utf8ToString(), testData); + out.reset(); + } + + public void testCopyBytesException() throws IOException { + OutputStreamIndexOutput indexOutputBufferMock = mock(OutputStreamIndexOutput.class); + IndexInput indexInput = mock(IndexInput.class); + RemoteBufferedIndexOutput bufferedIndexOutputUsingMock = new RemoteBufferedIndexOutput( + FILENAME, + blobContainer, + out, + indexOutputBufferMock + ); + doThrow(new IOException("Test Induced Failure")).when(indexOutputBufferMock).copyBytes(eq(indexInput), eq(100L)); + + assertThrows(IOException.class, () -> bufferedIndexOutputUsingMock.copyBytes(indexInput, 100)); + } + + public void testWriteBytes() throws IOException { + byte[] b = new byte[] { Byte.MAX_VALUE }; + remoteBufferedIndexOutput.writeBytes(b, 0, b.length); + indexOutputBuffer.getChecksum(); // calling getChecksum() to flush the buffer. + assertArrayEquals(b, BytesReference.toBytes(out.bytes())); + out.reset(); + } + + public void testClose() throws IOException { + BytesReference mockBytesReference = mock(BytesReference.class); + BytesStreamOutput outStream = mock(BytesStreamOutput.class); + when(outStream.bytes()).thenReturn(mockBytesReference); + when(mockBytesReference.streamInput()).thenReturn(mock(StreamInput.class)); + + OutputStreamIndexOutput indexOutputBufferMock = mock(OutputStreamIndexOutput.class); + RemoteBufferedIndexOutput bufferedIndexOutputUsingMock = new RemoteBufferedIndexOutput( + FILENAME, + blobContainer, + outStream, + indexOutputBufferMock + ); + + bufferedIndexOutputUsingMock.close(); + verify(blobContainer).writeBlob(eq(FILENAME), any(StreamInput.class), anyLong(), eq(false)); + verify(indexOutputBufferMock).close(); + verify(outStream).close(); + } + + public void testCloseException() throws IOException { + BytesReference mockBytesReference = mock(BytesReference.class); + BytesStreamOutput outStream = mock(BytesStreamOutput.class); + when(outStream.bytes()).thenReturn(mockBytesReference); + StreamInput streamInputMock = mock(StreamInput.class); + when(mockBytesReference.streamInput()).thenReturn(streamInputMock); + doThrow(new IOException("Test Induced Failure")).when(streamInputMock).close(); + + OutputStreamIndexOutput indexOutputBufferMock = mock(OutputStreamIndexOutput.class); + RemoteBufferedIndexOutput bufferedIndexOutputUsingMock = new RemoteBufferedIndexOutput( + FILENAME, + blobContainer, + outStream, + indexOutputBufferMock + ); + try { + bufferedIndexOutputUsingMock.close(); + } catch (Exception e) { + // do nothing + } + verify(blobContainer).writeBlob(eq(FILENAME), any(StreamInput.class), anyLong(), eq(false)); + verify(indexOutputBufferMock).close(); + verify(outStream).close(); + + } + + public void testCloseException2() throws IOException { + BytesReference mockBytesReference = mock(BytesReference.class); + BytesStreamOutput outStream = mock(BytesStreamOutput.class); + when(outStream.bytes()).thenReturn(mockBytesReference); + StreamInput streamInputMock = mock(StreamInput.class); + when(mockBytesReference.streamInput()).thenReturn(streamInputMock); + + OutputStreamIndexOutput indexOutputBufferMock = mock(OutputStreamIndexOutput.class); + doThrow(new IOException("Test Induced Failure")).when(indexOutputBufferMock).close(); + RemoteBufferedIndexOutput bufferedIndexOutputUsingMock = new RemoteBufferedIndexOutput( + FILENAME, + blobContainer, + outStream, + indexOutputBufferMock + ); + try { + bufferedIndexOutputUsingMock.close(); + } catch (Exception e) { + // do nothing + } + verify(indexOutputBufferMock).close(); + verify(outStream).close(); + } +} diff --git a/server/src/test/java/org/opensearch/index/store/RemoteBufferedOutputDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteBufferedOutputDirectoryTests.java new file mode 100644 index 0000000000000..4fec8c9bc37af --- /dev/null +++ b/server/src/test/java/org/opensearch/index/store/RemoteBufferedOutputDirectoryTests.java @@ -0,0 +1,34 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.apache.lucene.store.IOContext; +import org.junit.Before; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.test.OpenSearchTestCase; + +import static org.mockito.Mockito.mock; + +public class RemoteBufferedOutputDirectoryTests extends OpenSearchTestCase { + + private BlobContainer blobContainer; + private RemoteBufferedOutputDirectory remoteBufferedOutputDirectory; + + @Before + public void setup() { + blobContainer = mock(BlobContainer.class); + remoteBufferedOutputDirectory = new RemoteBufferedOutputDirectory(blobContainer); + } + + public void testCreateOutput() { + String testBlobName = "testBlob"; + assert (remoteBufferedOutputDirectory.createOutput(testBlobName, IOContext.DEFAULT) instanceof RemoteBufferedIndexOutput); + + } +} diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java index 7be86aa0d96a4..7a9cbc12d823b 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java @@ -72,13 +72,14 @@ public void testNewDirectory() throws IOException { try (Directory directory = remoteSegmentStoreDirectoryFactory.newDirectory(indexSettings, shardPath)) { assertTrue(directory instanceof RemoteSegmentStoreDirectory); ArgumentCaptor blobPathCaptor = ArgumentCaptor.forClass(BlobPath.class); - verify(blobStore, times(2)).blobContainer(blobPathCaptor.capture()); + verify(blobStore, times(3)).blobContainer(blobPathCaptor.capture()); List blobPaths = blobPathCaptor.getAllValues(); assertEquals("base_path/uuid_1/0/segments/data/", blobPaths.get(0).buildAsString()); assertEquals("base_path/uuid_1/0/segments/metadata/", blobPaths.get(1).buildAsString()); + assertEquals("base_path/uuid_1/0/segments/lock_files/", blobPaths.get(2).buildAsString()); verify(blobContainer).listBlobsByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX); - verify(repositoriesService).repository("remote_store_repository"); + verify(repositoriesService, times(2)).repository("remote_store_repository"); } } diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index 49a2d50dfae06..10295ffc56812 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -23,32 +23,34 @@ import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lucene.store.ByteArrayIndexInput; +import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.test.OpenSearchTestCase; import java.io.IOException; import java.nio.file.NoSuchFileException; import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.HashMap; +import java.util.Collection; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.startsWith; +import static org.mockito.Mockito.eq; import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.doReturn; public class RemoteSegmentStoreDirectoryTests extends OpenSearchTestCase { private RemoteDirectory remoteDataDirectory; private RemoteDirectory remoteMetadataDirectory; + private RemoteStoreMetadataLockManager mdLockManager; private RemoteSegmentStoreDirectory remoteSegmentStoreDirectory; @@ -56,8 +58,9 @@ public class RemoteSegmentStoreDirectoryTests extends OpenSearchTestCase { public void setup() throws IOException { remoteDataDirectory = mock(RemoteDirectory.class); remoteMetadataDirectory = mock(RemoteDirectory.class); + mdLockManager = mock(RemoteStoreMetadataLockManager.class); - remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory(remoteDataDirectory, remoteMetadataDirectory); + remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory(remoteDataDirectory, remoteMetadataDirectory, mdLockManager); } public void testUploadedSegmentMetadataToString() { @@ -345,6 +348,135 @@ public void testOpenInputException() throws IOException { assertThrows(IOException.class, () -> remoteSegmentStoreDirectory.openInput("_0.si", IOContext.DEFAULT)); } + public void testAcquireLock() throws IOException { + populateMetadata(); + remoteSegmentStoreDirectory.init(); + String mdFile = "xyz"; + String acquirerId = "test-acquirer"; + long testPrimaryTerm = 1; + long testGeneration = 5; + + List metadataFiles = List.of("metadata__1__5__abc"); + when( + remoteMetadataDirectory.listFilesByPrefix( + RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilePrefixForCommit(testPrimaryTerm, testGeneration) + ) + ).thenReturn(metadataFiles); + + remoteSegmentStoreDirectory.acquireLock(testPrimaryTerm, testGeneration, acquirerId); + verify(mdLockManager).acquire(any()); + } + + public void testAcquireLockNoSuchFile() throws IOException { + populateMetadata(); + remoteSegmentStoreDirectory.init(); + String testAcquirerId = "test-acquirer"; + long testPrimaryTerm = 2; + long testGeneration = 3; + + assertThrows( + NoSuchFileException.class, + () -> remoteSegmentStoreDirectory.acquireLock(testPrimaryTerm, testGeneration, testAcquirerId) + ); + } + + public void testReleaseLock() throws IOException { + populateMetadata(); + remoteSegmentStoreDirectory.init(); + String testAcquirerId = "test-acquirer"; + long testPrimaryTerm = 1; + long testGeneration = 5; + + List metadataFiles = List.of("metadata__1__5__abc"); + when( + remoteMetadataDirectory.listFilesByPrefix( + RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilePrefixForCommit(testPrimaryTerm, testGeneration) + ) + ).thenReturn(metadataFiles); + + remoteSegmentStoreDirectory.releaseLock(testPrimaryTerm, testGeneration, testAcquirerId); + verify(mdLockManager).release(any()); + } + + public void testIsAcquired() throws IOException { + populateMetadata(); + remoteSegmentStoreDirectory.init(); + long testPrimaryTerm = 1; + long testGeneration = 5; + + List metadataFiles = List.of("metadata__1__5__abc"); + when( + remoteMetadataDirectory.listFilesByPrefix( + RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilePrefixForCommit(testPrimaryTerm, testGeneration) + ) + ).thenReturn(metadataFiles); + + remoteSegmentStoreDirectory.isLockAcquired(testPrimaryTerm, testGeneration); + verify(mdLockManager).isAcquired(any()); + } + + public void testIsAcquiredException() throws IOException { + populateMetadata(); + remoteSegmentStoreDirectory.init(); + long testPrimaryTerm = 1; + long testGeneration = 5; + + List metadataFiles = new ArrayList<>(); + when( + remoteMetadataDirectory.listFilesByPrefix( + RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilePrefixForCommit(testPrimaryTerm, testGeneration) + ) + ).thenReturn(metadataFiles); + + assertThrows(NoSuchFileException.class, () -> remoteSegmentStoreDirectory.isLockAcquired(testPrimaryTerm, testGeneration)); + } + + public void testGetMetadataFileForCommit() throws IOException { + long testPrimaryTerm = 2; + long testGeneration = 3; + List metadataFiles = List.of( + "metadata__1__5__abc", + "metadata__" + testPrimaryTerm + "__" + testGeneration + "__pqr", + "metadata__2__1__zxv" + ); + when( + remoteMetadataDirectory.listFilesByPrefix( + RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilePrefixForCommit(testPrimaryTerm, testGeneration) + ) + ).thenReturn(List.of("metadata__" + testPrimaryTerm + "__" + testGeneration + "__pqr")); + + String output = remoteSegmentStoreDirectory.getMetadataFileForCommit(testPrimaryTerm, testGeneration); + assertEquals("metadata__" + testPrimaryTerm + "__" + testGeneration + "__pqr", output); + + } + + public void testGetSegmentsUploadedToRemoteStore() throws IOException { + long testPrimaryTerm = 1; + long testGeneration = 5; + + List metadataFiles = List.of("metadata__1__5__abc"); + when( + remoteMetadataDirectory.listFilesByPrefix( + RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilePrefixForCommit(testPrimaryTerm, testGeneration) + ) + ).thenReturn(metadataFiles); + + Map> metadataFilenameContentMapping = Map.of( + "metadata__1__5__abc", + getDummyMetadata("_0", 5), + "metadata__1__6__pqr", + getDummyMetadata("_0", 6), + "metadata__2__1__zxv", + getDummyMetadata("_0", 1) + ); + + when(remoteMetadataDirectory.openInput("metadata__1__5__abc", IOContext.DEFAULT)).thenReturn( + createMetadataFileBytes(metadataFilenameContentMapping.get("metadata__1__5__abc")) + ); + + assert (remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore(testPrimaryTerm, testGeneration).containsKey("segments_5")); + } + public void testCopyFrom() throws IOException { String filename = "_100.si"; populateMetadata(); diff --git a/server/src/test/java/org/opensearch/index/store/lockmanager/FileLockInfoTests.java b/server/src/test/java/org/opensearch/index/store/lockmanager/FileLockInfoTests.java new file mode 100644 index 0000000000000..95af53cb6e5ec --- /dev/null +++ b/server/src/test/java/org/opensearch/index/store/lockmanager/FileLockInfoTests.java @@ -0,0 +1,56 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.lockmanager; + +import org.opensearch.test.OpenSearchTestCase; + +import java.util.List; + +public class FileLockInfoTests extends OpenSearchTestCase { + String testMetadata = "testMetadata"; + String testAcquirerId = "testAcquirerId"; + + public void testGenerateLockName() { + FileLockInfo fileLockInfo = FileLockInfo.getLockInfoBuilder().withFileToLock(testMetadata).withAcquirerId(testAcquirerId).build(); + assertEquals(fileLockInfo.generateLockName(), FileLockInfo.LockFileUtils.generateLockName(testMetadata, testAcquirerId)); + } + + public void testGenerateLockNameFailureCase1() { + FileLockInfo fileLockInfo = FileLockInfo.getLockInfoBuilder().withFileToLock(testMetadata).build(); + assertThrows(IllegalArgumentException.class, fileLockInfo::generateLockName); + } + + public void testGenerateLockNameFailureCase2() { + FileLockInfo fileLockInfo = FileLockInfo.getLockInfoBuilder().withAcquirerId(testAcquirerId).build(); + assertThrows(IllegalArgumentException.class, fileLockInfo::generateLockName); + } + + public void testGetLockPrefix() { + FileLockInfo fileLockInfo = FileLockInfo.getLockInfoBuilder().withFileToLock(testMetadata).build(); + assertEquals(fileLockInfo.getLockPrefix(), testMetadata + RemoteStoreLockManagerUtils.SEPARATOR); + } + + public void testGetLockPrefixFailureCase() { + FileLockInfo fileLockInfo = FileLockInfo.getLockInfoBuilder().withAcquirerId(testAcquirerId).build(); + assertThrows(IllegalArgumentException.class, fileLockInfo::getLockPrefix); + } + + public void testGetLocksForAcquirer() { + String[] locks = new String[] { + FileLockInfo.LockFileUtils.generateLockName(testMetadata, testAcquirerId), + FileLockInfo.LockFileUtils.generateLockName(testMetadata, "acquirerId2") }; + FileLockInfo fileLockInfo = FileLockInfo.getLockInfoBuilder().withAcquirerId(testAcquirerId).build(); + + assertEquals( + fileLockInfo.getLocksForAcquirer(locks), + List.of(FileLockInfo.LockFileUtils.generateLockName(testMetadata, testAcquirerId)) + ); + } + +} diff --git a/server/src/test/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManagerFactoryTests.java b/server/src/test/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManagerFactoryTests.java new file mode 100644 index 0000000000000..61b4cc2176134 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManagerFactoryTests.java @@ -0,0 +1,72 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.lockmanager; + +import org.junit.Before; +import org.mockito.ArgumentCaptor; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.function.Supplier; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class RemoteStoreLockManagerFactoryTests extends OpenSearchTestCase { + + private Supplier repositoriesServiceSupplier; + private RepositoriesService repositoriesService; + private RemoteStoreLockManagerFactory remoteStoreLockManagerFactory; + + @Before + public void setup() throws IOException { + repositoriesServiceSupplier = mock(Supplier.class); + repositoriesService = mock(RepositoriesService.class); + when(repositoriesServiceSupplier.get()).thenReturn(repositoriesService); + remoteStoreLockManagerFactory = new RemoteStoreLockManagerFactory(repositoriesServiceSupplier); + } + + public void testNewLockManager() throws IOException { + + String testRepository = "testRepository"; + String testIndexUUID = "testIndexUUID"; + String testShardId = "testShardId"; + + BlobStoreRepository repository = mock(BlobStoreRepository.class); + BlobStore blobStore = mock(BlobStore.class); + BlobContainer blobContainer = mock(BlobContainer.class); + when(repository.blobStore()).thenReturn(blobStore); + when(repository.basePath()).thenReturn(new BlobPath().add("base_path")); + when(blobStore.blobContainer(any())).thenReturn(blobContainer); + when(blobContainer.listBlobs()).thenReturn(Collections.emptyMap()); + + when(repositoriesService.repository(testRepository)).thenReturn(repository); + + RemoteStoreLockManager lockManager = remoteStoreLockManagerFactory.newLockManager(testRepository, testIndexUUID, testShardId); + + assertTrue(lockManager != null); + ArgumentCaptor blobPathCaptor = ArgumentCaptor.forClass(BlobPath.class); + verify(blobStore, times(1)).blobContainer(blobPathCaptor.capture()); + List blobPaths = blobPathCaptor.getAllValues(); + assertEquals("base_path/" + testIndexUUID + "/" + testShardId + "/segments/lock_files/", blobPaths.get(0).buildAsString()); + + verify(repositoriesService).repository(testRepository); + } + +} diff --git a/server/src/test/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManagerTests.java b/server/src/test/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManagerTests.java new file mode 100644 index 0000000000000..2a3851514db3c --- /dev/null +++ b/server/src/test/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManagerTests.java @@ -0,0 +1,98 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.lockmanager; + +import junit.framework.TestCase; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexOutput; +import org.junit.Before; +import org.opensearch.index.store.RemoteBufferedOutputDirectory; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; + +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.verify; + +public class RemoteStoreMetadataLockManagerTests extends OpenSearchTestCase { + private RemoteBufferedOutputDirectory lockDirectory; + private RemoteStoreMetadataLockManager remoteStoreMetadataLockManager; + String testLockName = "testLock"; + String testMetadata = "testMetadata"; + String testAcquirerId = "testAcquirerId"; + + @Before + public void setup() throws IOException { + lockDirectory = mock(RemoteBufferedOutputDirectory.class); + + remoteStoreMetadataLockManager = new RemoteStoreMetadataLockManager(lockDirectory); + } + + private Collection getListOfLocksMock() { + return Arrays.asList( + String.join(RemoteStoreLockManagerUtils.SEPARATOR, testMetadata, testAcquirerId) + + RemoteStoreLockManagerUtils.LOCK_FILE_EXTENSION, + String.join(RemoteStoreLockManagerUtils.SEPARATOR, testMetadata, "acquirerId2") + + RemoteStoreLockManagerUtils.LOCK_FILE_EXTENSION + ); + } + + public void testAcquire() throws IOException { + IndexOutput indexOutput = mock(IndexOutput.class); + FileLockInfo testLockInfo = FileLockInfo.getLockInfoBuilder().withFileToLock(testMetadata).withAcquirerId(testAcquirerId).build(); + when(lockDirectory.createOutput(eq(testLockInfo.generateLockName()), eq(IOContext.DEFAULT))).thenReturn(indexOutput); + remoteStoreMetadataLockManager.acquire(testLockInfo); + verify(indexOutput).close(); + } + + public void testAcquireOnlyFileToLockPassed() { // only fileToLock was passed to acquire call. + IndexOutput indexOutput = mock(IndexOutput.class); + when(lockDirectory.createOutput(eq(testLockName), eq(IOContext.DEFAULT))).thenReturn(indexOutput); + FileLockInfo testLockInfo = FileLockInfo.getLockInfoBuilder().withFileToLock(testMetadata).build(); + assertThrows(IllegalArgumentException.class, () -> remoteStoreMetadataLockManager.acquire(testLockInfo)); + } + + public void testAcquireOnlyAcquirerIdPassed() { // only AcquirerId was passed to acquire call. + IndexOutput indexOutput = mock(IndexOutput.class); + when(lockDirectory.createOutput(eq(testLockName), eq(IOContext.DEFAULT))).thenReturn(indexOutput); + LockInfo testLockInfo = FileLockInfo.getLockInfoBuilder().withAcquirerId(testAcquirerId).build(); + assertThrows(IllegalArgumentException.class, () -> remoteStoreMetadataLockManager.acquire(testLockInfo)); + } + + public void testRelease() throws IOException { + when(lockDirectory.listAll()).thenReturn(getListOfLocksMock().toArray(new String[0])); + FileLockInfo testLockInfo = FileLockInfo.getLockInfoBuilder().withAcquirerId(testAcquirerId).build(); + + remoteStoreMetadataLockManager.release(testLockInfo); + verify(lockDirectory).deleteFile( + String.join(RemoteStoreLockManagerUtils.SEPARATOR, testMetadata, testAcquirerId) + + RemoteStoreLockManagerUtils.LOCK_FILE_EXTENSION + ); + } + + public void testReleaseExceptionCase() { // acquirerId is Not passed during release lock call. + FileLockInfo testLockInfo = FileLockInfo.getLockInfoBuilder().withFileToLock(testMetadata).build(); + assertThrows(IllegalArgumentException.class, () -> remoteStoreMetadataLockManager.release(testLockInfo)); + } + + public void testIsAcquired() throws IOException { + FileLockInfo testLockInfo = FileLockInfo.getLockInfoBuilder().withFileToLock(testMetadata).build(); + when(lockDirectory.listFilesByPrefix(testLockInfo.getLockPrefix())).thenReturn(getListOfLocksMock()); + TestCase.assertTrue(remoteStoreMetadataLockManager.isAcquired(testLockInfo)); + } + + public void testIsAcquiredExceptionCase() { // metadata file is not passed during isAcquired call. + FileLockInfo testLockInfo = FileLockInfo.getLockInfoBuilder().withAcquirerId(testAcquirerId).build(); + assertThrows(IllegalArgumentException.class, () -> remoteStoreMetadataLockManager.isAcquired(testLockInfo)); + } +} diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index bb3b016560fa7..b3833655ab1ea 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -633,7 +633,7 @@ protected Store createRemoteStore(Path path, ShardRouting shardRouting, IndexMet ShardPath remoteShardPath = new ShardPath(false, remoteNodePath.resolve(shardId), remoteNodePath.resolve(shardId), shardId); RemoteDirectory dataDirectory = newRemoteDirectory(remoteShardPath.resolveIndex()); RemoteDirectory metadataDirectory = newRemoteDirectory(remoteShardPath.resolveIndex()); - RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory); + RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, null); return createStore(shardId, new IndexSettings(metadata, nodeSettings), remoteSegmentStoreDirectory); }