From 672404300cc2d8af738f46f2a812356ae8a04ddc Mon Sep 17 00:00:00 2001 From: Sandeep Kumawat <2025sandeepkumawat@gmail.com> Date: Mon, 13 May 2024 16:21:28 +0530 Subject: [PATCH] initial commits Signed-off-by: Sandeep Kumawat <2025sandeepkumawat@gmail.com> --- .../index/translog/RemoteFsTranslog.java | 19 ++- .../transfer/BlobStoreTransferService.java | 34 ++++- .../index/translog/transfer/FileSnapshot.java | 10 ++ .../translog/transfer/TransferService.java | 14 ++ .../translog/transfer/TransferSnapshot.java | 4 + .../TranslogCheckpointTransferSnapshot.java | 16 ++- .../transfer/TranslogTransferManager.java | 126 ++++++++++++++++-- 7 files changed, 200 insertions(+), 23 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index da905b9605dfd..69600c7f4066c 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -91,6 +91,7 @@ public class RemoteFsTranslog extends Translog { private static final int SYNC_PERMIT = 1; private final Semaphore syncPermit = new Semaphore(SYNC_PERMIT); private final AtomicBoolean pauseSync = new AtomicBoolean(false); + boolean ckpAsMetadata; public RemoteFsTranslog( TranslogConfig config, @@ -110,6 +111,7 @@ public RemoteFsTranslog( this.startedPrimarySupplier = startedPrimarySupplier; this.remoteTranslogTransferTracker = remoteTranslogTransferTracker; fileTransferTracker = new FileTransferTracker(shardId, remoteTranslogTransferTracker); + ckpAsMetadata = true; this.translogTransferManager = buildTranslogTransferManager( blobStoreRepository, threadPool, @@ -117,7 +119,8 @@ public RemoteFsTranslog( fileTransferTracker, remoteTranslogTransferTracker, indexSettings().getRemoteStorePathStrategy(), - remoteStoreSettings + remoteStoreSettings, + ckpAsMetadata ); try { download(translogTransferManager, location, logger); @@ -288,7 +291,8 @@ public static TranslogTransferManager buildTranslogTransferManager( FileTransferTracker fileTransferTracker, RemoteTranslogTransferTracker tracker, RemoteStorePathStrategy pathStrategy, - RemoteStoreSettings remoteStoreSettings + RemoteStoreSettings remoteStoreSettings, + boolean ckpAsMetadata ) { assert Objects.nonNull(pathStrategy); String indexUUID = shardId.getIndex().getUUID(); @@ -310,7 +314,16 @@ public static TranslogTransferManager buildTranslogTransferManager( .build(); BlobPath mdPath = pathStrategy.generatePath(mdPathInput); BlobStoreTransferService transferService = new BlobStoreTransferService(blobStoreRepository.blobStore(), threadPool); - return new TranslogTransferManager(shardId, transferService, dataPath, mdPath, fileTransferTracker, tracker, remoteStoreSettings); + return new TranslogTransferManager( + shardId, + transferService, + dataPath, + mdPath, + fileTransferTracker, + tracker, + remoteStoreSettings, + ckpAsMetadata + ); } @Override diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java index bec2d78d9af62..0ea32a2ce5821 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java @@ -28,14 +28,12 @@ import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; import org.opensearch.threadpool.ThreadPool; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.nio.channels.FileChannel; import java.nio.file.StandardOpenOption; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; +import java.util.*; import static org.opensearch.common.blobstore.BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC; @@ -90,22 +88,41 @@ public void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable public void uploadBlobs( Set fileSnapshots, final Map blobPaths, + final Map fileMetadataMap, ActionListener listener, WritePriority writePriority ) { fileSnapshots.forEach(fileSnapshot -> { BlobPath blobPath = blobPaths.get(fileSnapshot.getPrimaryTerm()); + InputStream fileMetadata = fileMetadataMap.get(fileSnapshot); if (!(blobStore.blobContainer(blobPath) instanceof AsyncMultiStreamBlobContainer)) { uploadBlob(ThreadPool.Names.TRANSLOG_TRANSFER, fileSnapshot, blobPath, listener, writePriority); } else { - uploadBlob(fileSnapshot, listener, blobPath, writePriority); + uploadBlob(fileSnapshot, fileMetadata, listener, blobPath, writePriority); } }); } + public Map buildFileMetadata(InputStream fileMetadata) throws IOException { + Map metadata = new HashMap<>(); + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + byte[] buffer = new byte[4096]; + int bytesRead; + + while ((bytesRead = fileMetadata.read(buffer)) != -1) { + byteArrayOutputStream.write(buffer, 0, bytesRead); + } + + byte[] bytes = byteArrayOutputStream.toByteArray(); + String metadataString = Base64.getEncoder().encodeToString(bytes); + metadata.put("ckp-data", metadataString); + return metadata; + } + private void uploadBlob( TransferFileSnapshot fileSnapshot, + InputStream fileMetadata, ActionListener listener, BlobPath blobPath, WritePriority writePriority @@ -113,6 +130,10 @@ private void uploadBlob( try { ChannelFactory channelFactory = FileChannel::open; + Map metadata = null; + if(fileMetadata != null){ + metadata = buildFileMetadata(fileMetadata); + } long contentLength; try (FileChannel channel = channelFactory.open(fileSnapshot.getPath(), StandardOpenOption.READ)) { contentLength = channel.size(); @@ -130,7 +151,8 @@ private void uploadBlob( writePriority, (size, position) -> new OffsetRangeFileInputStream(fileSnapshot.getPath(), size, position), Objects.requireNonNull(fileSnapshot.getChecksum()), - remoteIntegrityEnabled + remoteIntegrityEnabled, + metadata ); ActionListener completionListener = ActionListener.wrap(resp -> listener.onResponse(fileSnapshot), ex -> { logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), ex); diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java index dcec94edd694f..83368e98aa13e 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java @@ -22,6 +22,7 @@ import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.Arrays; +import java.util.Map; import java.util.Objects; /** @@ -108,6 +109,7 @@ public static class TransferFileSnapshot extends FileSnapshot { private final long primaryTerm; private Long checksum; + private Map metadata; public TransferFileSnapshot(Path path, long primaryTerm, Long checksum) throws IOException { super(path); @@ -128,6 +130,14 @@ public long getPrimaryTerm() { return primaryTerm; } + public void setMetadata(Map metadata) { + this.metadata = metadata; + } + + public Map getMetadata() { + return metadata; + } + @Override public int hashCode() { return Objects.hash(primaryTerm, super.hashCode()); diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java index 0894ebf500ebd..6613abb119a14 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java @@ -57,6 +57,20 @@ void uploadBlobs( WritePriority writePriority ) throws Exception; + /** + * Uploads multiple {@link TransferFileSnapshot}, once the upload is complete the callback is invoked + * @param fileSnapshots the file snapshots to upload + * @param blobPaths Primary term to {@link BlobPath} map + * @param listener the callback to be invoked once uploads complete successfully/fail + */ + void uploadBlobs( + Set fileSnapshots, + final Map blobPaths, + final Map fileMetadataMap, + ActionListener listener, + WritePriority writePriority + ) throws Exception; + /** * Uploads the {@link TransferFileSnapshot} blob * @param fileSnapshot the file snapshot to upload diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshot.java index ef34fd31a296b..0e54ee65e38b0 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshot.java @@ -12,6 +12,8 @@ import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; import org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot; +import java.io.IOException; +import java.util.Map; import java.util.Set; /** @@ -39,4 +41,6 @@ public interface TransferSnapshot { * @return the translog transfer metadata */ TranslogTransferMetadata getTranslogTransferMetadata(); + + Map getTranslogCheckpointSnapshotMap(); } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java index fb78731246a07..df18764e2951c 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java @@ -13,12 +13,10 @@ import java.io.Closeable; import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; import java.nio.file.Path; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Set; +import java.util.*; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.LongStream; @@ -64,6 +62,14 @@ public Set getTranslogFileSnapshots() { return translogCheckpointFileInfoTupleSet.stream().map(Tuple::v1).collect(Collectors.toSet()); } + public Map getTranslogCheckpointSnapshotMap() { + Map tlogCkpSnapshots = new HashMap<>(); + translogCheckpointFileInfoTupleSet.forEach(tuple -> { + tlogCkpSnapshots.put(tuple.v1(), tuple.v2()); + }); + return tlogCkpSnapshots; + } + @Override public TranslogTransferMetadata getTranslogTransferMetadata() { return new TranslogTransferMetadata( diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 1087244623b87..164263b62add2 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -16,6 +16,7 @@ import org.opensearch.common.SetOnce; import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.FetchBlobResult; import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.common.io.VersionedCodecStreamWrapper; import org.opensearch.common.io.stream.BytesStreamOutput; @@ -36,6 +37,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; +import java.util.Base64; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -63,6 +65,7 @@ public class TranslogTransferManager { private final RemoteTranslogTransferTracker remoteTranslogTransferTracker; private final RemoteStoreSettings remoteStoreSettings; private static final int METADATA_FILES_TO_FETCH = 10; + boolean ckpAsMetadata; private final Logger logger; @@ -79,7 +82,8 @@ public TranslogTransferManager( BlobPath remoteMetadataTransferPath, FileTransferTracker fileTransferTracker, RemoteTranslogTransferTracker remoteTranslogTransferTracker, - RemoteStoreSettings remoteStoreSettings + RemoteStoreSettings remoteStoreSettings, + boolean ckpAsMetadata ) { this.shardId = shardId; this.transferService = transferService; @@ -89,6 +93,7 @@ public TranslogTransferManager( this.logger = Loggers.getLogger(getClass(), shardId); this.remoteTranslogTransferTracker = remoteTranslogTransferTracker; this.remoteStoreSettings = remoteStoreSettings; + this.ckpAsMetadata = ckpAsMetadata; } public RemoteTranslogTransferTracker getRemoteTranslogTransferTracker() { @@ -110,8 +115,21 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans long prevUploadTimeInMillis = remoteTranslogTransferTracker.getTotalUploadTimeInMillis(); try { - toUpload.addAll(fileTransferTracker.exclusionFilter(transferSnapshot.getTranslogFileSnapshots())); - toUpload.addAll(fileTransferTracker.exclusionFilter((transferSnapshot.getCheckpointFileSnapshots()))); + Map tlogCkpSnapshotMap = transferSnapshot.getTranslogCheckpointSnapshotMap(); + Map fileMetadataMap = new HashMap<>(); + if (ckpAsMetadata) { + toUpload.addAll(fileTransferTracker.exclusionFilter(transferSnapshot.getTranslogFileSnapshots())); + toUpload.forEach(fileSnapshot -> { + try { + fileMetadataMap.put(fileSnapshot, tlogCkpSnapshotMap.get(fileSnapshot).inputStream()); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } else { + toUpload.addAll(fileTransferTracker.exclusionFilter(transferSnapshot.getTranslogFileSnapshots())); + toUpload.addAll(fileTransferTracker.exclusionFilter((transferSnapshot.getCheckpointFileSnapshots()))); + } if (toUpload.isEmpty()) { logger.trace("Nothing to upload for transfer"); return true; @@ -149,7 +167,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans // TODO: Ideally each file's upload start time should be when it is actually picked for upload // https://github.com/opensearch-project/OpenSearch/issues/9729 fileTransferTracker.recordFileTransferStartTime(uploadStartTime); - transferService.uploadBlobs(toUpload, blobPathMap, latchedActionListener, WritePriority.HIGH); + transferService.uploadBlobs(toUpload, blobPathMap, fileMetadataMap, latchedActionListener, WritePriority.HIGH); try { if (latch.await(remoteStoreSettings.getClusterRemoteTranslogTransferTimeout().millis(), TimeUnit.MILLISECONDS) == false) { @@ -236,15 +254,101 @@ public boolean downloadTranslog(String primaryTerm, String generation, Path loca generation, location ); - // Download Checkpoint file from remote to local FS String ckpFileName = Translog.getCommitCheckpointFileName(Long.parseLong(generation)); - downloadToFS(ckpFileName, location, primaryTerm); - // Download translog file from remote to local FS String translogFilename = Translog.getFilename(Long.parseLong(generation)); - downloadToFS(translogFilename, location, primaryTerm); + if (ckpAsMetadata == false) { + // Download Checkpoint file from remote to local FS + downloadToFS(ckpFileName, location, primaryTerm); + // Download translog file from remote to local FS + downloadToFS(translogFilename, location, primaryTerm); + } else { + // Download translog.tlog file with object metadata from remote to local FS + Map metadata = downloadTranslogToFSAndGetMetadata(translogFilename, location, primaryTerm, generation); + try { + assert metadata != null && !metadata.isEmpty() && metadata.containsKey("ckp-data"); + recoverCkpFileFromMetadata(metadata, location, generation, translogFilename); + } catch (Exception e) { + throw new IOException("Failed to recover checkpoint file from remote", e); + } + } return true; } + private Map downloadTranslogToFSAndGetMetadata(String fileName, Path location, String primaryTerm, String generation) + throws IOException { + Path filePath = location.resolve(fileName); + // Here, we always override the existing file if present. + // We need to change this logic when we introduce incremental download + deleteFileIfExists(filePath); + + boolean downloadStatus = false; + long bytesToRead = 0, downloadStartTime = System.nanoTime(); + Map metadata; + + FetchBlobResult inputStreamWithMetadata = transferService.downloadBlobWithMetadata( + remoteDataTransferPath.add(primaryTerm), + fileName + ); + try { + InputStream inputStream = inputStreamWithMetadata.getInputStream(); + metadata = inputStreamWithMetadata.getMetadata(); + + bytesToRead = inputStream.available(); + Files.copy(inputStream, filePath); + downloadStatus = true; + + } finally { + remoteTranslogTransferTracker.addDownloadTimeInMillis((System.nanoTime() - downloadStartTime) / 1_000_000L); + if (downloadStatus) { + remoteTranslogTransferTracker.addDownloadBytesSucceeded(bytesToRead); + } + } + + // Mark in FileTransferTracker so that the same files are not uploaded at the time of translog sync + fileTransferTracker.add(fileName, true); + + return metadata; + } + + /** + * Process the provided metadata and tries to write the content of the checkpoint (ckp) file to the FS. + */ + private void recoverCkpFileFromMetadata(Map metadata, Path location, String generation, String fileName) + throws IOException { + + boolean downloadStatus = false; + long bytesToRead = 0; + try { + String ckpFileName = Translog.getCommitCheckpointFileName(Long.parseLong(generation)); + Path filePath = location.resolve(ckpFileName); + // Here, we always override the existing file if present. + deleteFileIfExists(filePath); + + String ckpDataBase64 = metadata.get("ckp-data"); + if (ckpDataBase64 == null) { + logger.error("Error processing metadata for translog file: {}", fileName); + throw new IllegalStateException( + "Checkpoint file data (key - ckp-data) is expected but not found in metadata for file: " + fileName + ); + } + byte[] ckpFileBytes = Base64.getDecoder().decode(ckpDataBase64); + bytesToRead = ckpFileBytes.length; + + Files.write(filePath, ckpFileBytes); + downloadStatus = true; + } finally { + if (downloadStatus) { + remoteTranslogTransferTracker.addDownloadBytesSucceeded(bytesToRead); + } + } + } + + public void deleteFileIfExists(Path filePath) throws IOException { + if (Files.exists(filePath)) { + Files.delete(filePath); + } + } + private void downloadToFS(String fileName, Path location, String primaryTerm) throws IOException { Path filePath = location.resolve(fileName); // Here, we always override the existing file if present. @@ -391,7 +495,11 @@ public void deleteGenerationAsync(long primaryTerm, Set generations, Runna // Add .ckp and .tlog file to translog file list which is located in basePath/ String ckpFileName = Translog.getCommitCheckpointFileName(generation); String translogFileName = Translog.getFilename(generation); - translogFiles.addAll(List.of(ckpFileName, translogFileName)); + if (ckpAsMetadata == false) { + translogFiles.addAll(List.of(ckpFileName, translogFileName)); + } else { + translogFiles.add(translogFileName); + } }); // Delete the translog and checkpoint files asynchronously deleteTranslogFilesAsync(primaryTerm, translogFiles, onCompletion);