diff --git a/CHANGELOG.md b/CHANGELOG.md index 8132c1281e412..979314505aa6e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Github workflow for changelog verification ([#4085](https://github.com/opensearch-project/OpenSearch/pull/4085)) ### Changed - - Dependency updates (httpcore, mockito, slf4j, httpasyncclient, commons-codec) ([#4308](https://github.com/opensearch-project/OpenSearch/pull/4308)) +- Dependency updates (httpcore, mockito, slf4j, httpasyncclient, commons-codec) ([#4308](https://github.com/opensearch-project/OpenSearch/pull/4308)) +- Use RemoteSegmentStoreDirectory instead of RemoteDirectory ([#4240](https://github.com/opensearch-project/OpenSearch/pull/4240)) ### Deprecated diff --git a/server/src/main/java/org/opensearch/index/IndexModule.java b/server/src/main/java/org/opensearch/index/IndexModule.java index f8604caeab414..e52a2ba39ed52 100644 --- a/server/src/main/java/org/opensearch/index/IndexModule.java +++ b/server/src/main/java/org/opensearch/index/IndexModule.java @@ -70,7 +70,6 @@ import org.opensearch.index.shard.SearchOperationListener; import org.opensearch.index.similarity.SimilarityService; import org.opensearch.index.store.FsDirectoryFactory; -import org.opensearch.index.store.RemoteDirectoryFactory; import org.opensearch.indices.IndicesQueryCache; import org.opensearch.indices.breaker.CircuitBreakerService; import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; @@ -487,7 +486,7 @@ public IndexService newIndexService( NamedWriteableRegistry namedWriteableRegistry, BooleanSupplier idFieldDataEnabled, ValuesSourceRegistry valuesSourceRegistry, - RemoteDirectoryFactory remoteDirectoryFactory + IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory ) throws IOException { final IndexEventListener eventListener = freeze(); Function> readerWrapperFactory = indexReaderWrapper diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 67a8e691fda0d..670af1f1c6fd9 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -48,8 +48,6 @@ import org.apache.lucene.search.Sort; import org.apache.lucene.search.UsageTrackingQueryCachingPolicy; import org.apache.lucene.store.AlreadyClosedException; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.util.SetOnce; import org.apache.lucene.util.ThreadInterruptedException; import org.opensearch.Assertions; @@ -3228,8 +3226,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro final List internalRefreshListener = new ArrayList<>(); internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric)); if (isRemoteStoreEnabled()) { - Directory remoteDirectory = ((FilterDirectory) ((FilterDirectory) remoteStore.directory()).getDelegate()).getDelegate(); - internalRefreshListener.add(new RemoteStoreRefreshListener(store.directory(), remoteDirectory)); + internalRefreshListener.add(new RemoteStoreRefreshListener(this)); } if (this.checkpointPublisher != null && indexSettings.isSegRepEnabled() && shardRouting.primary()) { internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher)); diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 4b549ec485c0e..0d32e8d56e4d2 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -11,32 +11,54 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.opensearch.common.concurrent.GatedCloseable; +import org.opensearch.index.engine.EngineException; +import org.opensearch.index.store.RemoteSegmentStoreDirectory; import java.io.IOException; -import java.nio.file.NoSuchFileException; -import java.util.Arrays; -import java.util.HashSet; +import java.util.Collection; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; /** * RefreshListener implementation to upload newly created segment files to the remote store + * + * @opensearch.internal */ -public class RemoteStoreRefreshListener implements ReferenceManager.RefreshListener { +public final class RemoteStoreRefreshListener implements ReferenceManager.RefreshListener { + // Visible for testing + static final Set EXCLUDE_FILES = Set.of("write.lock"); + // Visible for testing + static final int LAST_N_METADATA_FILES_TO_KEEP = 10; + private final IndexShard indexShard; private final Directory storeDirectory; - private final Directory remoteDirectory; - // ToDo: This can be a map with metadata of the uploaded file as value of the map (GitHub #3398) - private final Set filesUploadedToRemoteStore; + private final RemoteSegmentStoreDirectory remoteDirectory; + private final Map localSegmentChecksumMap; + private long primaryTerm; private static final Logger logger = LogManager.getLogger(RemoteStoreRefreshListener.class); - public RemoteStoreRefreshListener(Directory storeDirectory, Directory remoteDirectory) throws IOException { - this.storeDirectory = storeDirectory; - this.remoteDirectory = remoteDirectory; - // ToDo: Handle failures in reading list of files (GitHub #3397) - this.filesUploadedToRemoteStore = new HashSet<>(Arrays.asList(remoteDirectory.listAll())); + public RemoteStoreRefreshListener(IndexShard indexShard) { + this.indexShard = indexShard; + this.storeDirectory = indexShard.store().directory(); + this.remoteDirectory = (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory()) + .getDelegate()).getDelegate(); + this.primaryTerm = indexShard.getOperationPrimaryTerm(); + localSegmentChecksumMap = new HashMap<>(); } @Override @@ -46,42 +68,112 @@ public void beforeRefresh() throws IOException { /** * Upload new segment files created as part of the last refresh to the remote segment store. - * The method also deletes segment files from remote store which are not part of local filesystem. + * This method also uploads remote_segments_metadata file which contains metadata of each segment file uploaded. * @param didRefresh true if the refresh opened a new reference - * @throws IOException in case of I/O error in reading list of local files */ @Override - public void afterRefresh(boolean didRefresh) throws IOException { - if (didRefresh) { - Set localFiles = Set.of(storeDirectory.listAll()); - localFiles.stream().filter(file -> !filesUploadedToRemoteStore.contains(file)).forEach(file -> { - try { - remoteDirectory.copyFrom(storeDirectory, file, file, IOContext.DEFAULT); - filesUploadedToRemoteStore.add(file); - } catch (NoSuchFileException e) { - logger.info( - () -> new ParameterizedMessage("The file {} does not exist anymore. It can happen in case of temp files", file), - e - ); - } catch (IOException e) { - // ToDO: Handle transient and permanent un-availability of the remote store (GitHub #3397) - logger.warn(() -> new ParameterizedMessage("Exception while uploading file {} to the remote segment store", file), e); - } - }); + public void afterRefresh(boolean didRefresh) { + synchronized (this) { + try { + if (indexShard.shardRouting.primary()) { + if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) { + this.primaryTerm = indexShard.getOperationPrimaryTerm(); + this.remoteDirectory.init(); + } + try { + String lastCommittedLocalSegmentFileName = SegmentInfos.getLastCommitSegmentsFileName(storeDirectory); + if (!remoteDirectory.containsFile( + lastCommittedLocalSegmentFileName, + getChecksumOfLocalFile(lastCommittedLocalSegmentFileName) + )) { + deleteStaleCommits(); + } + try (GatedCloseable segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) { + SegmentInfos segmentInfos = segmentInfosGatedCloseable.get(); + Collection refreshedLocalFiles = segmentInfos.files(true); + + List segmentInfosFiles = refreshedLocalFiles.stream() + .filter(file -> file.startsWith(IndexFileNames.SEGMENTS)) + .collect(Collectors.toList()); + Optional latestSegmentInfos = segmentInfosFiles.stream() + .max(Comparator.comparingLong(IndexFileNames::parseGeneration)); - Set remoteFilesToBeDeleted = new HashSet<>(); - // ToDo: Instead of deleting files in sync, mark them and delete in async/periodic flow (GitHub #3142) - filesUploadedToRemoteStore.stream().filter(file -> !localFiles.contains(file)).forEach(file -> { - try { - remoteDirectory.deleteFile(file); - remoteFilesToBeDeleted.add(file); - } catch (IOException e) { - // ToDO: Handle transient and permanent un-availability of the remote store (GitHub #3397) - logger.warn(() -> new ParameterizedMessage("Exception while deleting file {} from the remote segment store", file), e); + if (latestSegmentInfos.isPresent()) { + refreshedLocalFiles.addAll(SegmentInfos.readCommit(storeDirectory, latestSegmentInfos.get()).files(true)); + segmentInfosFiles.stream() + .filter(file -> !file.equals(latestSegmentInfos.get())) + .forEach(refreshedLocalFiles::remove); + + boolean uploadStatus = uploadNewSegments(refreshedLocalFiles); + if (uploadStatus) { + remoteDirectory.uploadMetadata( + refreshedLocalFiles, + storeDirectory, + indexShard.getOperationPrimaryTerm(), + segmentInfos.getGeneration() + ); + localSegmentChecksumMap.keySet() + .stream() + .filter(file -> !refreshedLocalFiles.contains(file)) + .collect(Collectors.toSet()) + .forEach(localSegmentChecksumMap::remove); + } + } + } catch (EngineException e) { + logger.warn("Exception while reading SegmentInfosSnapshot", e); + } + } catch (IOException e) { + // We don't want to fail refresh if upload of new segments fails. The missed segments will be re-tried + // in the next refresh. This should not affect durability of the indexed data after remote trans-log integration. + logger.warn("Exception while uploading new segments to the remote segment store", e); + } } - }); + } catch (Throwable t) { + logger.error("Exception in RemoteStoreRefreshListener.afterRefresh()", t); + } + } + } + + // Visible for testing + boolean uploadNewSegments(Collection localFiles) throws IOException { + AtomicBoolean uploadSuccess = new AtomicBoolean(true); + localFiles.stream().filter(file -> !EXCLUDE_FILES.contains(file)).filter(file -> { + try { + return !remoteDirectory.containsFile(file, getChecksumOfLocalFile(file)); + } catch (IOException e) { + logger.info( + "Exception while reading checksum of local segment file: {}, ignoring the exception and re-uploading the file", + file + ); + return true; + } + }).forEach(file -> { + try { + remoteDirectory.copyFrom(storeDirectory, file, file, IOContext.DEFAULT); + } catch (IOException e) { + uploadSuccess.set(false); + // ToDO: Handle transient and permanent un-availability of the remote store (GitHub #3397) + logger.warn(() -> new ParameterizedMessage("Exception while uploading file {} to the remote segment store", file), e); + } + }); + return uploadSuccess.get(); + } + + private String getChecksumOfLocalFile(String file) throws IOException { + if (!localSegmentChecksumMap.containsKey(file)) { + try (IndexInput indexInput = storeDirectory.openInput(file, IOContext.DEFAULT)) { + String checksum = Long.toString(CodecUtil.retrieveChecksum(indexInput)); + localSegmentChecksumMap.put(file, checksum); + } + } + return localSegmentChecksumMap.get(file); + } - remoteFilesToBeDeleted.forEach(filesUploadedToRemoteStore::remove); + private void deleteStaleCommits() { + try { + remoteDirectory.deleteStaleSegments(LAST_N_METADATA_FILES_TO_KEEP); + } catch (IOException e) { + logger.info("Exception while deleting stale commits from remote segment store, will retry delete post next commit", e); } } } diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java index 1190e8e6ab3d2..06916c4cc87fe 100644 --- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java @@ -449,7 +449,12 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardReco } indexShard.preRecovery(); indexShard.prepareForIndexRecovery(); - final Directory remoteDirectory = remoteStore.directory(); + assert remoteStore.directory() instanceof FilterDirectory : "Store.directory is not an instance of FilterDirectory"; + FilterDirectory remoteStoreDirectory = (FilterDirectory) remoteStore.directory(); + assert remoteStoreDirectory.getDelegate() instanceof FilterDirectory + : "Store.directory is not enclosing an instance of FilterDirectory"; + FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate(); + final Directory remoteDirectory = byteSizeCachingStoreDirectory.getDelegate(); final Store store = indexShard.store(); final Directory storeDirectory = store.directory(); store.incRef(); diff --git a/server/src/main/java/org/opensearch/index/store/RemoteIndexInput.java b/server/src/main/java/org/opensearch/index/store/RemoteIndexInput.java index 8f8d5dd5418ae..2c809563ca961 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteIndexInput.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteIndexInput.java @@ -27,27 +27,37 @@ public class RemoteIndexInput extends IndexInput { private final InputStream inputStream; private final long size; + private long filePointer; public RemoteIndexInput(String name, InputStream inputStream, long size) { super(name); this.inputStream = inputStream; this.size = size; + this.filePointer = 0; } @Override public byte readByte() throws IOException { byte[] buffer = new byte[1]; - inputStream.read(buffer); + int numberOfBytesRead = inputStream.read(buffer); + if (numberOfBytesRead != -1) { + filePointer += numberOfBytesRead; + } return buffer[0]; } @Override public void readBytes(byte[] b, int offset, int len) throws IOException { int bytesRead = inputStream.read(b, offset, len); - while (bytesRead > 0 && bytesRead < len) { - len -= bytesRead; - offset += bytesRead; - bytesRead = inputStream.read(b, offset, len); + if (bytesRead == len) { + filePointer += bytesRead; + } else { + while (bytesRead > 0 && bytesRead < len) { + filePointer += bytesRead; + len -= bytesRead; + offset += bytesRead; + bytesRead = inputStream.read(b, offset, len); + } } } @@ -61,11 +71,6 @@ public long length() { return size; } - @Override - public void seek(long pos) throws IOException { - inputStream.skip(pos); - } - /** * Guaranteed to throw an exception and leave the RemoteIndexInput unmodified. * This method is not implemented as it is not used for the file transfer to/from the remote store. @@ -73,10 +78,18 @@ public void seek(long pos) throws IOException { * @throws UnsupportedOperationException always */ @Override - public long getFilePointer() { + public void seek(long pos) throws IOException { throw new UnsupportedOperationException(); } + /** + * Returns the current position in this file in terms of number of bytes read so far. + */ + @Override + public long getFilePointer() { + return filePointer; + } + /** * Guaranteed to throw an exception and leave the RemoteIndexInput unmodified. * This method is not implemented as it is not used for the file transfer to/from the remote store. diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index d7d6b29d08bfc..505ad6fafd550 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -24,9 +24,13 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; /** @@ -132,8 +136,9 @@ private Map readMetadataFile(String metadataFil /** * Metadata of a segment that is uploaded to remote segment store. */ - static class UploadedSegmentMetadata { - private static final String SEPARATOR = "::"; + public static class UploadedSegmentMetadata { + // Visible for testing + static final String SEPARATOR = "::"; private final String originalFilename; private final String uploadedFilename; private final String checksum; @@ -366,7 +371,69 @@ private String getLocalSegmentFilename(String remoteFilename) { } // Visible for testing - Map getSegmentsUploadedToRemoteStore() { - return this.segmentsUploadedToRemoteStore; + public Map getSegmentsUploadedToRemoteStore() { + return Collections.unmodifiableMap(this.segmentsUploadedToRemoteStore); + } + + /** + * Delete stale segment and metadata files + * One metadata file is kept per commit (refresh updates the same file). To read segments uploaded to remote store, + * we just need to read the latest metadata file. All the stale metadata files can be safely deleted. + * @param lastNMetadataFilesToKeep number of metadata files to keep + * @throws IOException in case of I/O error while reading from / writing to remote segment store + */ + public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException { + Collection metadataFiles = remoteMetadataDirectory.listFilesByPrefix(MetadataFilenameUtils.METADATA_PREFIX); + List sortedMetadataFileList = metadataFiles.stream().sorted(METADATA_FILENAME_COMPARATOR).collect(Collectors.toList()); + if (sortedMetadataFileList.size() <= lastNMetadataFilesToKeep) { + logger.info( + "Number of commits in remote segment store={}, lastNMetadataFilesToKeep={}", + sortedMetadataFileList.size(), + lastNMetadataFilesToKeep + ); + return; + } + List latestNMetadataFiles = sortedMetadataFileList.subList( + sortedMetadataFileList.size() - lastNMetadataFilesToKeep, + sortedMetadataFileList.size() + ); + Map activeSegmentFilesMetadataMap = new HashMap<>(); + Set activeSegmentRemoteFilenames = new HashSet<>(); + for (String metadataFile : latestNMetadataFiles) { + Map segmentMetadataMap = readMetadataFile(metadataFile); + activeSegmentFilesMetadataMap.putAll(segmentMetadataMap); + activeSegmentRemoteFilenames.addAll( + segmentMetadataMap.values().stream().map(metadata -> metadata.uploadedFilename).collect(Collectors.toSet()) + ); + } + for (String metadataFile : sortedMetadataFileList.subList(0, sortedMetadataFileList.size() - lastNMetadataFilesToKeep)) { + Map staleSegmentFilesMetadataMap = readMetadataFile(metadataFile); + Set staleSegmentRemoteFilenames = staleSegmentFilesMetadataMap.values() + .stream() + .map(metadata -> metadata.uploadedFilename) + .collect(Collectors.toSet()); + AtomicBoolean deletionSuccessful = new AtomicBoolean(true); + staleSegmentRemoteFilenames.stream().filter(file -> !activeSegmentRemoteFilenames.contains(file)).forEach(file -> { + try { + remoteDataDirectory.deleteFile(file); + if (!activeSegmentFilesMetadataMap.containsKey(getLocalSegmentFilename(file))) { + segmentsUploadedToRemoteStore.remove(getLocalSegmentFilename(file)); + } + } catch (NoSuchFileException e) { + logger.info("Segment file {} corresponding to metadata file {} does not exist in remote", file, metadataFile); + } catch (IOException e) { + deletionSuccessful.set(false); + logger.info( + "Exception while deleting segment file {} corresponding to metadata file {}. Deletion will be re-tried", + file, + metadataFile + ); + } + }); + if (deletionSuccessful.get()) { + logger.info("Deleting stale metadata file {} from remote segment store", metadataFile); + remoteMetadataDirectory.deleteFile(metadataFile); + } + } } } diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java similarity index 58% rename from server/src/main/java/org/opensearch/index/store/RemoteDirectoryFactory.java rename to server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java index 62f398cdad207..e77eb52bd3891 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteDirectoryFactory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java @@ -27,11 +27,11 @@ * * @opensearch.internal */ -public class RemoteDirectoryFactory implements IndexStorePlugin.RemoteDirectoryFactory { +public class RemoteSegmentStoreDirectoryFactory implements IndexStorePlugin.RemoteDirectoryFactory { private final Supplier repositoriesService; - public RemoteDirectoryFactory(Supplier repositoriesService) { + public RemoteSegmentStoreDirectoryFactory(Supplier repositoriesService) { this.repositoriesService = repositoriesService; } @@ -39,13 +39,23 @@ public RemoteDirectoryFactory(Supplier repositoriesService) public Directory newDirectory(String repositoryName, IndexSettings indexSettings, ShardPath path) throws IOException { try (Repository repository = repositoriesService.get().repository(repositoryName)) { assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository"; - BlobPath blobPath = new BlobPath(); - blobPath = blobPath.add(indexSettings.getIndex().getName()).add(String.valueOf(path.getShardId().getId())); - BlobContainer blobContainer = ((BlobStoreRepository) repository).blobStore().blobContainer(blobPath); - return new RemoteDirectory(blobContainer); + BlobPath commonBlobPath = ((BlobStoreRepository) repository).basePath(); + commonBlobPath = commonBlobPath.add(indexSettings.getIndex().getUUID()) + .add(String.valueOf(path.getShardId().getId())) + .add("segments"); + + RemoteDirectory dataDirectory = createRemoteDirectory(repository, commonBlobPath, "data"); + RemoteDirectory metadataDirectory = createRemoteDirectory(repository, commonBlobPath, "metadata"); + + return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory); } catch (RepositoryMissingException e) { throw new IllegalArgumentException("Repository should be created before creating index with remote_store enabled setting", e); } } + private RemoteDirectory createRemoteDirectory(Repository repository, BlobPath commonBlobPath, String extention) { + BlobPath extendedPath = commonBlobPath.add(extention); + BlobContainer dataBlobContainer = ((BlobStoreRepository) repository).blobStore().blobContainer(extendedPath); + return new RemoteDirectory(dataBlobContainer); + } } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index fdb609ba7bbff..6808803ee0988 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -132,7 +132,6 @@ import org.opensearch.index.shard.IndexingOperationListener; import org.opensearch.index.shard.IndexingStats; import org.opensearch.index.shard.ShardId; -import org.opensearch.index.store.RemoteDirectoryFactory; import org.opensearch.indices.breaker.CircuitBreakerService; import org.opensearch.indices.cluster.IndicesClusterStateService; import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; @@ -266,7 +265,7 @@ public class IndicesService extends AbstractLifecycleComponent private final Set danglingIndicesToWrite = Sets.newConcurrentHashSet(); private final boolean nodeWriteDanglingIndicesInfo; private final ValuesSourceRegistry valuesSourceRegistry; - private final RemoteDirectoryFactory remoteDirectoryFactory; + private final IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory; @Override protected void doStart() { @@ -295,7 +294,7 @@ public IndicesService( Map directoryFactories, ValuesSourceRegistry valuesSourceRegistry, Map recoveryStateFactories, - RemoteDirectoryFactory remoteDirectoryFactory + IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory ) { this.settings = settings; this.threadPool = threadPool; diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index d3f0912cab638..3f4eadc52fd2a 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -39,12 +39,12 @@ import org.opensearch.common.util.FeatureFlags; import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance; import org.opensearch.index.IndexingPressureService; -import org.opensearch.index.store.RemoteDirectoryFactory; import org.opensearch.indices.replication.SegmentReplicationSourceFactory; import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.indices.replication.SegmentReplicationSourceService; import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.threadpool.RunnableTaskExecutionListener; +import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory; import org.opensearch.watcher.ResourceWatcherService; import org.opensearch.Assertions; import org.opensearch.Build; @@ -629,7 +629,9 @@ protected Node( rerouteServiceReference.set(rerouteService); clusterService.setRerouteService(rerouteService); - final RemoteDirectoryFactory remoteDirectoryFactory = new RemoteDirectoryFactory(repositoriesServiceReference::get); + final IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory = new RemoteSegmentStoreDirectoryFactory( + repositoriesServiceReference::get + ); final IndicesService indicesService = new IndicesService( settings, diff --git a/server/src/test/java/org/opensearch/index/IndexModuleTests.java b/server/src/test/java/org/opensearch/index/IndexModuleTests.java index 45d93a5a12847..6bfdd9ae16773 100644 --- a/server/src/test/java/org/opensearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/opensearch/index/IndexModuleTests.java @@ -89,7 +89,7 @@ import org.opensearch.index.similarity.NonNegativeScoresSimilarity; import org.opensearch.index.similarity.SimilarityService; import org.opensearch.index.store.FsDirectoryFactory; -import org.opensearch.index.store.RemoteDirectoryFactory; +import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory; import org.opensearch.indices.IndicesModule; import org.opensearch.indices.IndicesQueryCache; import org.opensearch.indices.analysis.AnalysisModule; @@ -234,7 +234,7 @@ private IndexService newIndexService(IndexModule module) throws IOException { writableRegistry(), () -> false, null, - new RemoteDirectoryFactory(() -> repositoriesService) + new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService) ); } diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 8c00ab97a46ea..662afa80f65fc 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -2689,8 +2689,9 @@ public void testRestoreShardFromRemoteStore() throws IOException { storeDirectory.deleteFile(file); } + assertEquals(0, storeDirectory.listAll().length); + Directory remoteDirectory = ((FilterDirectory) ((FilterDirectory) target.remoteStore().directory()).getDelegate()).getDelegate(); - ((BaseDirectoryWrapper) remoteDirectory).setCheckIndexOnClose(false); // extra0 file is added as a part of https://lucene.apache.org/core/7_2_1/test-framework/org/apache/lucene/mockfile/ExtrasFS.html // Safe to remove without impacting the test diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index af92d821a9043..6b05d67836272 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -8,132 +8,209 @@ package org.opensearch.index.shard; +import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.store.Directory; -import org.apache.lucene.store.IOContext; -import org.opensearch.test.OpenSearchTestCase; +import org.apache.lucene.store.FilterDirectory; +import org.apache.lucene.tests.store.BaseDirectoryWrapper; +import org.junit.After; +import org.opensearch.action.ActionListener; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.common.concurrent.GatedCloseable; +import org.opensearch.common.lease.Releasable; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.engine.InternalEngineFactory; +import org.opensearch.index.store.RemoteSegmentStoreDirectory; +import org.opensearch.index.store.Store; +import org.opensearch.threadpool.ThreadPool; import java.io.IOException; -import java.nio.file.NoSuchFileException; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.CountDownLatch; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.doThrow; +public class RemoteStoreRefreshListenerTests extends IndexShardTestCase { + private IndexShard indexShard; + private RemoteStoreRefreshListener remoteStoreRefreshListener; -public class RemoteStoreRefreshListenerTests extends OpenSearchTestCase { - private Directory storeDirectory; - private Directory remoteDirectory; + public void setup(boolean primary, int numberOfDocs) throws IOException { + indexShard = newStartedShard( + primary, + Settings.builder().put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true).build(), + new InternalEngineFactory() + ); - private RemoteStoreRefreshListener remoteStoreRefreshListener; + indexDocs(1, numberOfDocs); + indexShard.refresh("test"); - public void setup(String[] remoteFiles) throws IOException { - storeDirectory = mock(Directory.class); - remoteDirectory = mock(Directory.class); - when(remoteDirectory.listAll()).thenReturn(remoteFiles); - remoteStoreRefreshListener = new RemoteStoreRefreshListener(storeDirectory, remoteDirectory); + remoteStoreRefreshListener = new RemoteStoreRefreshListener(indexShard); } - public void testAfterRefreshFalse() throws IOException { - setup(new String[0]); - remoteStoreRefreshListener.afterRefresh(false); - verify(storeDirectory, times(0)).listAll(); + private void indexDocs(int startDocId, int numberOfDocs) throws IOException { + for (int i = startDocId; i < startDocId + numberOfDocs; i++) { + indexDoc(indexShard, "_doc", Integer.toString(i)); + } } - public void testAfterRefreshTrueNoLocalFiles() throws IOException { - setup(new String[0]); + @After + public void tearDown() throws Exception { + Directory storeDirectory = ((FilterDirectory) ((FilterDirectory) indexShard.store().directory()).getDelegate()).getDelegate(); + ((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false); + closeShards(indexShard); + super.tearDown(); + } - when(storeDirectory.listAll()).thenReturn(new String[0]); + public void testAfterRefresh() throws IOException { + setup(true, 3); + assertDocs(indexShard, "1", "2", "3"); - remoteStoreRefreshListener.afterRefresh(true); - verify(storeDirectory).listAll(); - verify(remoteDirectory, times(0)).copyFrom(any(), any(), any(), any()); - verify(remoteDirectory, times(0)).deleteFile(any()); - } + try (Store remoteStore = indexShard.remoteStore()) { + RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = + (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) remoteStore.directory()).getDelegate()).getDelegate(); - public void testAfterRefreshOnlyUploadFiles() throws IOException { - setup(new String[0]); + verifyUploadedSegments(remoteSegmentStoreDirectory); - String[] localFiles = new String[] { "segments_1", "0.si", "0.cfs", "0.cfe" }; - when(storeDirectory.listAll()).thenReturn(localFiles); + // This is to check if reading data from remote segment store works as well. + remoteSegmentStoreDirectory.init(); - remoteStoreRefreshListener.afterRefresh(true); - verify(storeDirectory).listAll(); - verify(remoteDirectory).copyFrom(storeDirectory, "segments_1", "segments_1", IOContext.DEFAULT); - verify(remoteDirectory).copyFrom(storeDirectory, "0.si", "0.si", IOContext.DEFAULT); - verify(remoteDirectory).copyFrom(storeDirectory, "0.cfs", "0.cfs", IOContext.DEFAULT); - verify(remoteDirectory).copyFrom(storeDirectory, "0.cfe", "0.cfe", IOContext.DEFAULT); - verify(remoteDirectory, times(0)).deleteFile(any()); + verifyUploadedSegments(remoteSegmentStoreDirectory); + } } - public void testAfterRefreshOnlyUploadAndDelete() throws IOException { - setup(new String[] { "0.si", "0.cfs" }); + public void testAfterCommit() throws IOException { + setup(true, 3); + assertDocs(indexShard, "1", "2", "3"); + flushShard(indexShard); - String[] localFiles = new String[] { "segments_1", "1.si", "1.cfs", "1.cfe" }; - when(storeDirectory.listAll()).thenReturn(localFiles); + try (Store remoteStore = indexShard.remoteStore()) { + RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = + (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) remoteStore.directory()).getDelegate()).getDelegate(); - remoteStoreRefreshListener.afterRefresh(true); - verify(storeDirectory).listAll(); - verify(remoteDirectory).copyFrom(storeDirectory, "segments_1", "segments_1", IOContext.DEFAULT); - verify(remoteDirectory).copyFrom(storeDirectory, "1.si", "1.si", IOContext.DEFAULT); - verify(remoteDirectory).copyFrom(storeDirectory, "1.cfs", "1.cfs", IOContext.DEFAULT); - verify(remoteDirectory).copyFrom(storeDirectory, "1.cfe", "1.cfe", IOContext.DEFAULT); - verify(remoteDirectory).deleteFile("0.si"); - verify(remoteDirectory).deleteFile("0.cfs"); + verifyUploadedSegments(remoteSegmentStoreDirectory); + + // This is to check if reading data from remote segment store works as well. + remoteSegmentStoreDirectory.init(); + + verifyUploadedSegments(remoteSegmentStoreDirectory); + } } - public void testAfterRefreshOnlyDelete() throws IOException { - setup(new String[] { "0.si", "0.cfs" }); + public void testRefreshAfterCommit() throws IOException { + setup(true, 3); + assertDocs(indexShard, "1", "2", "3"); + flushShard(indexShard); - String[] localFiles = new String[] { "0.si" }; - when(storeDirectory.listAll()).thenReturn(localFiles); + indexDocs(4, 4); + indexShard.refresh("test"); - remoteStoreRefreshListener.afterRefresh(true); - verify(storeDirectory).listAll(); - verify(remoteDirectory, times(0)).copyFrom(any(), any(), any(), any()); - verify(remoteDirectory).deleteFile("0.cfs"); - } + indexDocs(8, 4); + indexShard.refresh("test"); - public void testAfterRefreshTempLocalFile() throws IOException { - setup(new String[0]); + try (Store remoteStore = indexShard.remoteStore()) { + RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = + (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) remoteStore.directory()).getDelegate()).getDelegate(); - String[] localFiles = new String[] { "segments_1", "0.si", "0.cfs.tmp" }; - when(storeDirectory.listAll()).thenReturn(localFiles); - doThrow(new NoSuchFileException("0.cfs.tmp")).when(remoteDirectory) - .copyFrom(storeDirectory, "0.cfs.tmp", "0.cfs.tmp", IOContext.DEFAULT); + verifyUploadedSegments(remoteSegmentStoreDirectory); - remoteStoreRefreshListener.afterRefresh(true); - verify(storeDirectory).listAll(); - verify(remoteDirectory).copyFrom(storeDirectory, "segments_1", "segments_1", IOContext.DEFAULT); - verify(remoteDirectory).copyFrom(storeDirectory, "0.si", "0.si", IOContext.DEFAULT); - verify(remoteDirectory, times(0)).deleteFile(any()); + // This is to check if reading data from remote segment store works as well. + remoteSegmentStoreDirectory.init(); + + verifyUploadedSegments(remoteSegmentStoreDirectory); + } } - public void testAfterRefreshConsecutive() throws IOException { - setup(new String[0]); + public void testAfterMultipleCommits() throws IOException { + setup(true, 3); + assertDocs(indexShard, "1", "2", "3"); - String[] localFiles = new String[] { "segments_1", "0.si", "0.cfs", "0.cfe" }; - when(storeDirectory.listAll()).thenReturn(localFiles); - doThrow(new IOException("0.cfs")).when(remoteDirectory).copyFrom(storeDirectory, "0.cfs", "0.cfe", IOContext.DEFAULT); - doThrow(new IOException("0.cfe")).when(remoteDirectory).copyFrom(storeDirectory, "0.cfe", "0.cfe", IOContext.DEFAULT); + for (int i = 0; i < RemoteStoreRefreshListener.LAST_N_METADATA_FILES_TO_KEEP + 3; i++) { + indexDocs(4 * (i + 1), 4); + flushShard(indexShard); + } + try (Store remoteStore = indexShard.remoteStore()) { + RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = + (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) remoteStore.directory()).getDelegate()).getDelegate(); + + verifyUploadedSegments(remoteSegmentStoreDirectory); + + // This is to check if reading data from remote segment store works as well. + remoteSegmentStoreDirectory.init(); + + verifyUploadedSegments(remoteSegmentStoreDirectory); + } + } + + public void testReplica() throws IOException { + setup(false, 3); remoteStoreRefreshListener.afterRefresh(true); - verify(storeDirectory).listAll(); - verify(remoteDirectory).copyFrom(storeDirectory, "segments_1", "segments_1", IOContext.DEFAULT); - verify(remoteDirectory).copyFrom(storeDirectory, "0.si", "0.si", IOContext.DEFAULT); - verify(remoteDirectory).copyFrom(storeDirectory, "0.cfs", "0.cfs", IOContext.DEFAULT); - verify(remoteDirectory).copyFrom(storeDirectory, "0.cfe", "0.cfe", IOContext.DEFAULT); - verify(remoteDirectory, times(0)).deleteFile(any()); - String[] localFilesSecondRefresh = new String[] { "segments_1", "0.cfs", "1.cfs", "1.cfe" }; - when(storeDirectory.listAll()).thenReturn(localFilesSecondRefresh); + try (Store remoteStore = indexShard.remoteStore()) { + RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = + (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) remoteStore.directory()).getDelegate()).getDelegate(); + + assertEquals(0, remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().size()); + } + } + public void testReplicaPromotion() throws IOException, InterruptedException { + setup(false, 3); remoteStoreRefreshListener.afterRefresh(true); - verify(remoteDirectory).copyFrom(storeDirectory, "0.cfs", "0.cfs", IOContext.DEFAULT); - verify(remoteDirectory).copyFrom(storeDirectory, "1.cfs", "1.cfs", IOContext.DEFAULT); - verify(remoteDirectory).copyFrom(storeDirectory, "1.cfe", "1.cfe", IOContext.DEFAULT); - verify(remoteDirectory).deleteFile("0.si"); + RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = + (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory()).getDelegate()) + .getDelegate(); + + assertEquals(0, remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().size()); + + final ShardRouting replicaRouting = indexShard.routingEntry(); + promoteReplica( + indexShard, + Collections.singleton(replicaRouting.allocationId().getId()), + new IndexShardRoutingTable.Builder(replicaRouting.shardId()).addShard(replicaRouting).build() + ); + + // The following logic is referenced from IndexShardTests.testPrimaryFillsSeqNoGapsOnPromotion + // ToDo: Add wait logic as part of promoteReplica() + final CountDownLatch latch = new CountDownLatch(1); + indexShard.acquirePrimaryOperationPermit(new ActionListener<>() { + @Override + public void onResponse(Releasable releasable) { + releasable.close(); + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError(e); + } + }, ThreadPool.Names.GENERIC, ""); + + latch.await(); + + indexDocs(4, 4); + indexShard.refresh("test"); + remoteStoreRefreshListener.afterRefresh(true); + + verifyUploadedSegments(remoteSegmentStoreDirectory); + + // This is to check if reading data from remote segment store works as well. + remoteSegmentStoreDirectory.init(); + + verifyUploadedSegments(remoteSegmentStoreDirectory); + } + + private void verifyUploadedSegments(RemoteSegmentStoreDirectory remoteSegmentStoreDirectory) throws IOException { + Map uploadedSegments = remoteSegmentStoreDirectory + .getSegmentsUploadedToRemoteStore(); + try (GatedCloseable segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) { + SegmentInfos segmentInfos = segmentInfosGatedCloseable.get(); + for (String file : segmentInfos.files(true)) { + if (!RemoteStoreRefreshListener.EXCLUDE_FILES.contains(file)) { + assertTrue(uploadedSegments.containsKey(file)); + } + } + } } } diff --git a/server/src/test/java/org/opensearch/index/store/RemoteIndexInputTests.java b/server/src/test/java/org/opensearch/index/store/RemoteIndexInputTests.java index 273d3c7e37c56..cd35349e33b59 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteIndexInputTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteIndexInputTests.java @@ -44,6 +44,7 @@ public void testReadByte() throws IOException { when(inputStream.read()).thenReturn(10); assertEquals(10, remoteIndexInput.readByte()); + assertEquals(1, remoteIndexInput.getFilePointer()); verify(inputStream).read(any()); } @@ -52,13 +53,19 @@ public void testReadByteIOException() throws IOException { when(inputStream.read(any())).thenThrow(new IOException("Error reading")); assertThrows(IOException.class, () -> remoteIndexInput.readByte()); + assertEquals(0, remoteIndexInput.getFilePointer()); } public void testReadBytes() throws IOException { - byte[] buffer = new byte[10]; - remoteIndexInput.readBytes(buffer, 10, 20); + byte[] buffer = new byte[20]; + when(inputStream.read(eq(buffer), anyInt(), anyInt())).thenReturn(10).thenReturn(3).thenReturn(6).thenReturn(-1); + remoteIndexInput.readBytes(buffer, 0, 20); - verify(inputStream).read(buffer, 10, 20); + verify(inputStream).read(buffer, 0, 20); + verify(inputStream).read(buffer, 10, 10); + verify(inputStream).read(buffer, 13, 7); + verify(inputStream).read(buffer, 19, 1); + assertEquals(19, remoteIndexInput.getFilePointer()); } public void testReadBytesMultipleIterations() throws IOException { @@ -95,20 +102,14 @@ public void testLength() { assertEquals(FILESIZE, remoteIndexInput.length()); } - public void testSeek() throws IOException { - remoteIndexInput.seek(10); - - verify(inputStream).skip(10); - } - - public void testSeekIOException() throws IOException { - when(inputStream.skip(10)).thenThrow(new IOException("Error reading")); - - assertThrows(IOException.class, () -> remoteIndexInput.seek(10)); + public void testSeek() { + assertThrows(UnsupportedOperationException.class, () -> remoteIndexInput.seek(100L)); } - public void testGetFilePointer() { - assertThrows(UnsupportedOperationException.class, () -> remoteIndexInput.getFilePointer()); + public void testGetFilePointer() throws IOException { + when(inputStream.read(any(), eq(0), eq(8))).thenReturn(8); + remoteIndexInput.readBytes(new byte[8], 0, 8); + assertEquals(8, remoteIndexInput.getFilePointer()); } public void testSlice() { diff --git a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryFactoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java similarity index 70% rename from server/src/test/java/org/opensearch/index/store/RemoteDirectoryFactoryTests.java rename to server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java index e8357d2c184bf..0105d0dc309c2 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryFactoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java @@ -11,6 +11,7 @@ import org.apache.lucene.store.Directory; import org.junit.Before; import org.mockito.ArgumentCaptor; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; @@ -27,29 +28,31 @@ import java.io.IOException; import java.nio.file.Path; import java.util.Collections; +import java.util.List; import java.util.function.Supplier; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.times; -public class RemoteDirectoryFactoryTests extends OpenSearchTestCase { +public class RemoteSegmentStoreDirectoryFactoryTests extends OpenSearchTestCase { private Supplier repositoriesServiceSupplier; private RepositoriesService repositoriesService; - private RemoteDirectoryFactory remoteDirectoryFactory; + private RemoteSegmentStoreDirectoryFactory remoteSegmentStoreDirectoryFactory; @Before public void setup() { repositoriesServiceSupplier = mock(Supplier.class); repositoriesService = mock(RepositoriesService.class); when(repositoriesServiceSupplier.get()).thenReturn(repositoriesService); - remoteDirectoryFactory = new RemoteDirectoryFactory(repositoriesServiceSupplier); + remoteSegmentStoreDirectoryFactory = new RemoteSegmentStoreDirectoryFactory(repositoriesServiceSupplier); } public void testNewDirectory() throws IOException { - Settings settings = Settings.builder().build(); + Settings settings = Settings.builder().put(IndexMetadata.SETTING_INDEX_UUID, "uuid_1").build(); IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("foo", settings); Path tempDir = createTempDir().resolve(indexSettings.getUUID()).resolve("0"); ShardPath shardPath = new ShardPath(false, tempDir, tempDir, new ShardId(indexSettings.getIndex(), 0)); @@ -57,20 +60,21 @@ public void testNewDirectory() throws IOException { BlobStore blobStore = mock(BlobStore.class); BlobContainer blobContainer = mock(BlobContainer.class); when(repository.blobStore()).thenReturn(blobStore); + when(repository.basePath()).thenReturn(new BlobPath().add("base_path")); when(blobStore.blobContainer(any())).thenReturn(blobContainer); when(blobContainer.listBlobs()).thenReturn(Collections.emptyMap()); when(repositoriesService.repository("remote_store_repository")).thenReturn(repository); - try (Directory directory = remoteDirectoryFactory.newDirectory("remote_store_repository", indexSettings, shardPath)) { - assertTrue(directory instanceof RemoteDirectory); + try (Directory directory = remoteSegmentStoreDirectoryFactory.newDirectory("remote_store_repository", indexSettings, shardPath)) { + assertTrue(directory instanceof RemoteSegmentStoreDirectory); ArgumentCaptor blobPathCaptor = ArgumentCaptor.forClass(BlobPath.class); - verify(blobStore).blobContainer(blobPathCaptor.capture()); - BlobPath blobPath = blobPathCaptor.getValue(); - assertEquals("foo/0/", blobPath.buildAsString()); + verify(blobStore, times(2)).blobContainer(blobPathCaptor.capture()); + List blobPaths = blobPathCaptor.getAllValues(); + assertEquals("base_path/uuid_1/0/segments/data/", blobPaths.get(0).buildAsString()); + assertEquals("base_path/uuid_1/0/segments/metadata/", blobPaths.get(1).buildAsString()); - directory.listAll(); - verify(blobContainer).listBlobs(); + verify(blobContainer).listBlobsByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX); verify(repositoriesService).repository("remote_store_repository"); } } @@ -85,7 +89,7 @@ public void testNewDirectoryRepositoryDoesNotExist() { assertThrows( IllegalArgumentException.class, - () -> remoteDirectoryFactory.newDirectory("remote_store_repository", indexSettings, shardPath) + () -> remoteSegmentStoreDirectoryFactory.newDirectory("remote_store_repository", indexSettings, shardPath) ); } diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index 4eabfa74625f2..96f14616fb54b 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -15,6 +15,7 @@ import org.apache.lucene.store.IndexOutput; import org.apache.lucene.tests.util.LuceneTestCase; import org.junit.Before; +import org.opensearch.common.UUIDs; import org.opensearch.common.collect.Set; import org.opensearch.test.OpenSearchTestCase; @@ -129,26 +130,52 @@ public void testInitNoMetadataFile() throws IOException { private Map getDummyMetadata(String prefix, int commitGeneration) { Map metadata = new HashMap<>(); - metadata.put(prefix + ".cfe", prefix + ".cfe::" + prefix + ".cfe__qrt::" + randomIntBetween(1000, 5000)); - metadata.put(prefix + ".cfs", prefix + ".cfs::" + prefix + ".cfs__zxd::" + randomIntBetween(1000, 5000)); - metadata.put(prefix + ".si", prefix + ".si::" + prefix + ".si__yui::" + randomIntBetween(1000, 5000)); + + metadata.put(prefix + ".cfe", prefix + ".cfe::" + prefix + ".cfe__" + UUIDs.base64UUID() + "::" + randomIntBetween(1000, 5000)); + metadata.put(prefix + ".cfs", prefix + ".cfs::" + prefix + ".cfs__" + UUIDs.base64UUID() + "::" + randomIntBetween(1000, 5000)); + metadata.put(prefix + ".si", prefix + ".si::" + prefix + ".si__" + UUIDs.base64UUID() + "::" + randomIntBetween(1000, 5000)); metadata.put( "segments_" + commitGeneration, - "segments_" + commitGeneration + "::segments_" + commitGeneration + "__exv::" + randomIntBetween(1000, 5000) + "segments_" + + commitGeneration + + "::segments_" + + commitGeneration + + "__" + + UUIDs.base64UUID() + + "::" + + randomIntBetween(1000, 5000) ); return metadata; } - private void populateMetadata() throws IOException { + private Map> populateMetadata() throws IOException { List metadataFiles = List.of("metadata__1__5__abc", "metadata__1__6__pqr", "metadata__2__1__zxv"); when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX)).thenReturn( metadataFiles ); - IndexInput indexInput = mock(IndexInput.class); - Map dummyMetadata = getDummyMetadata("_0", 1); - when(indexInput.readMapOfStrings()).thenReturn(dummyMetadata); - when(remoteMetadataDirectory.openInput("metadata__2__1__zxv", IOContext.DEFAULT)).thenReturn(indexInput); + Map> metadataFilenameContentMapping = Map.of( + "metadata__1__5__abc", + getDummyMetadata("_0", 1), + "metadata__1__6__pqr", + getDummyMetadata("_0", 1), + "metadata__2__1__zxv", + getDummyMetadata("_0", 1) + ); + + IndexInput indexInput1 = mock(IndexInput.class); + when(indexInput1.readMapOfStrings()).thenReturn(metadataFilenameContentMapping.get("metadata__1__5__abc")); + when(remoteMetadataDirectory.openInput("metadata__1__5__abc", IOContext.DEFAULT)).thenReturn(indexInput1); + + IndexInput indexInput2 = mock(IndexInput.class); + when(indexInput2.readMapOfStrings()).thenReturn(metadataFilenameContentMapping.get("metadata__1__6__pqr")); + when(remoteMetadataDirectory.openInput("metadata__1__6__pqr", IOContext.DEFAULT)).thenReturn(indexInput2); + + IndexInput indexInput3 = mock(IndexInput.class); + when(indexInput3.readMapOfStrings()).thenReturn(metadataFilenameContentMapping.get("metadata__2__1__zxv")); + when(remoteMetadataDirectory.openInput("metadata__2__1__zxv", IOContext.DEFAULT)).thenReturn(indexInput3); + + return metadataFilenameContentMapping; } public void testInit() throws IOException { @@ -291,20 +318,39 @@ public void testCopyFromException() throws IOException { } public void testContainsFile() throws IOException { - populateMetadata(); + List metadataFiles = List.of("metadata__1__5__abc"); + when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX)).thenReturn( + metadataFiles + ); + + Map metadata = new HashMap<>(); + metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234"); + metadata.put("_0.cfs", "_0.cfs::_0.cfs__" + UUIDs.base64UUID() + "::2345"); + + Map> metadataFilenameContentMapping = Map.of("metadata__1__5__abc", metadata); + + IndexInput indexInput1 = mock(IndexInput.class); + when(indexInput1.readMapOfStrings()).thenReturn(metadataFilenameContentMapping.get("metadata__1__5__abc")); + when(remoteMetadataDirectory.openInput("metadata__1__5__abc", IOContext.DEFAULT)).thenReturn(indexInput1); + remoteSegmentStoreDirectory.init(); - // This is not the correct way to add files but the other way is to open up access to fields in UploadedSegmentMetadata Map uploadedSegmentMetadataMap = remoteSegmentStoreDirectory .getSegmentsUploadedToRemoteStore(); - uploadedSegmentMetadataMap.put( - "_100.si", - new RemoteSegmentStoreDirectory.UploadedSegmentMetadata("_100.si", "_100.si__uuid1", "1234") + + assertThrows( + UnsupportedOperationException.class, + () -> uploadedSegmentMetadataMap.put( + "_100.si", + new RemoteSegmentStoreDirectory.UploadedSegmentMetadata("_100.si", "_100.si__uuid1", "1234") + ) ); - assertTrue(remoteSegmentStoreDirectory.containsFile("_100.si", "1234")); - assertFalse(remoteSegmentStoreDirectory.containsFile("_100.si", "2345")); - assertFalse(remoteSegmentStoreDirectory.containsFile("_200.si", "1234")); + assertTrue(remoteSegmentStoreDirectory.containsFile("_0.cfe", "1234")); + assertTrue(remoteSegmentStoreDirectory.containsFile("_0.cfs", "2345")); + assertFalse(remoteSegmentStoreDirectory.containsFile("_0.cfe", "1234000")); + assertFalse(remoteSegmentStoreDirectory.containsFile("_0.cfs", "2345000")); + assertFalse(remoteSegmentStoreDirectory.containsFile("_0.si", "23")); } public void testUploadMetadataEmpty() throws IOException { @@ -336,4 +382,84 @@ public void testUploadMetadataNonEmpty() throws IOException { String metadataString = remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().get("_0.si").toString(); verify(indexOutput).writeMapOfStrings(Map.of("_0.si", metadataString)); } + + public void testDeleteStaleCommitsException() throws IOException { + when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX)).thenThrow( + new IOException("Error reading") + ); + + assertThrows(IOException.class, () -> remoteSegmentStoreDirectory.deleteStaleSegments(5)); + } + + public void testDeleteStaleCommitsWithinThreshold() throws IOException { + populateMetadata(); + + // popluateMetadata() adds stub to return 3 metadata files + // We are passing lastNMetadataFilesToKeep=5 here so that none of the metadata files will be deleted + remoteSegmentStoreDirectory.deleteStaleSegments(5); + + verify(remoteMetadataDirectory, times(0)).openInput(any(String.class), eq(IOContext.DEFAULT)); + } + + public void testDeleteStaleCommitsActualDelete() throws IOException { + Map> metadataFilenameContentMapping = populateMetadata(); + remoteSegmentStoreDirectory.init(); + + // popluateMetadata() adds stub to return 3 metadata files + // We are passing lastNMetadataFilesToKeep=2 here so that oldest 1 metadata file will be deleted + remoteSegmentStoreDirectory.deleteStaleSegments(2); + + for (String metadata : metadataFilenameContentMapping.get("metadata__1__5__abc").values()) { + String uploadedFilename = metadata.split(RemoteSegmentStoreDirectory.UploadedSegmentMetadata.SEPARATOR)[1]; + verify(remoteDataDirectory).deleteFile(uploadedFilename); + } + ; + verify(remoteMetadataDirectory).deleteFile("metadata__1__5__abc"); + } + + public void testDeleteStaleCommitsActualDeleteIOException() throws IOException { + Map> metadataFilenameContentMapping = populateMetadata(); + remoteSegmentStoreDirectory.init(); + + String segmentFileWithException = metadataFilenameContentMapping.get("metadata__1__5__abc") + .values() + .stream() + .findAny() + .get() + .split(RemoteSegmentStoreDirectory.UploadedSegmentMetadata.SEPARATOR)[1]; + doThrow(new IOException("Error")).when(remoteDataDirectory).deleteFile(segmentFileWithException); + // popluateMetadata() adds stub to return 3 metadata files + // We are passing lastNMetadataFilesToKeep=2 here so that oldest 1 metadata file will be deleted + remoteSegmentStoreDirectory.deleteStaleSegments(2); + + for (String metadata : metadataFilenameContentMapping.get("metadata__1__5__abc").values()) { + String uploadedFilename = metadata.split(RemoteSegmentStoreDirectory.UploadedSegmentMetadata.SEPARATOR)[1]; + verify(remoteDataDirectory).deleteFile(uploadedFilename); + } + ; + verify(remoteMetadataDirectory, times(0)).deleteFile("metadata__1__5__abc"); + } + + public void testDeleteStaleCommitsActualDeleteNoSuchFileException() throws IOException { + Map> metadataFilenameContentMapping = populateMetadata(); + remoteSegmentStoreDirectory.init(); + + String segmentFileWithException = metadataFilenameContentMapping.get("metadata__1__5__abc") + .values() + .stream() + .findAny() + .get() + .split(RemoteSegmentStoreDirectory.UploadedSegmentMetadata.SEPARATOR)[1]; + doThrow(new NoSuchFileException(segmentFileWithException)).when(remoteDataDirectory).deleteFile(segmentFileWithException); + // popluateMetadata() adds stub to return 3 metadata files + // We are passing lastNMetadataFilesToKeep=2 here so that oldest 1 metadata file will be deleted + remoteSegmentStoreDirectory.deleteStaleSegments(2); + + for (String metadata : metadataFilenameContentMapping.get("metadata__1__5__abc").values()) { + String uploadedFilename = metadata.split(RemoteSegmentStoreDirectory.UploadedSegmentMetadata.SEPARATOR)[1]; + verify(remoteDataDirectory).deleteFile(uploadedFilename); + } + ; + verify(remoteMetadataDirectory).deleteFile("metadata__1__5__abc"); + } } diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 4b8eec70f2c1a..4d3b841e203de 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -172,7 +172,7 @@ import org.opensearch.index.seqno.GlobalCheckpointSyncAction; import org.opensearch.index.seqno.RetentionLeaseSyncer; import org.opensearch.index.shard.PrimaryReplicaSyncer; -import org.opensearch.index.store.RemoteDirectoryFactory; +import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory; import org.opensearch.indices.IndicesModule; import org.opensearch.indices.IndicesService; import org.opensearch.indices.ShardLimitValidator; @@ -1826,7 +1826,7 @@ public void onFailure(final Exception e) { emptyMap(), null, emptyMap(), - new RemoteDirectoryFactory(() -> repositoriesService) + new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService) ); final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings); snapshotShardsService = new SnapshotShardsService( diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index f446538acccbb..08004b7e42fea 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -59,6 +59,10 @@ import org.opensearch.common.CheckedFunction; import org.opensearch.common.Nullable; import org.opensearch.common.UUIDs; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.fs.FsBlobContainer; +import org.opensearch.common.blobstore.fs.FsBlobStore; import org.opensearch.common.bytes.BytesArray; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lucene.uid.Versions; @@ -88,6 +92,8 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.similarity.SimilarityService; import org.opensearch.index.snapshots.IndexShardSnapshotStatus; +import org.opensearch.index.store.RemoteDirectory; +import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.translog.InternalTranslogFactory; @@ -123,6 +129,7 @@ import org.opensearch.threadpool.ThreadPool; import java.io.IOException; +import java.nio.file.Path; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -532,7 +539,10 @@ protected IndexShard newShard( ShardId shardId = shardPath.getShardId(); NodeEnvironment.NodePath remoteNodePath = new NodeEnvironment.NodePath(createTempDir()); ShardPath remoteShardPath = new ShardPath(false, remoteNodePath.resolve(shardId), remoteNodePath.resolve(shardId), shardId); - storeProvider = is -> createStore(is, remoteShardPath); + RemoteDirectory dataDirectory = newRemoteDirectory(remoteShardPath.resolveIndex()); + RemoteDirectory metadataDirectory = newRemoteDirectory(remoteShardPath.resolveIndex()); + RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory); + storeProvider = is -> createStore(shardId, is, remoteSegmentStoreDirectory); remoteStore = storeProvider.apply(indexSettings); } indexShard = new IndexShard( @@ -570,6 +580,13 @@ protected IndexShard newShard( return indexShard; } + private RemoteDirectory newRemoteDirectory(Path f) throws IOException { + FsBlobStore fsBlobStore = new FsBlobStore(1024, f, false); + BlobPath blobPath = new BlobPath(); + BlobContainer fsBlobContainer = new FsBlobContainer(fsBlobStore, blobPath, f); + return new RemoteDirectory(fsBlobContainer); + } + /** * Takes an existing shard, closes it and starts a new initialing shard at the same location *