From ee8717dacfe7de3bb6a09d150be90595a0bbe294 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Thu, 14 Aug 2025 15:12:11 +0530 Subject: [PATCH 1/8] Clone index input in multipart upload flow Signed-off-by: Sachin Kale --- .../index/store/RemoteDirectory.java | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java index da759f21addff..7c03ba66caf6f 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java @@ -397,22 +397,22 @@ private void uploadBlob( assert ioContext != IOContext.READONCE : "Remote upload will fail with IoContext.READONCE"; long expectedChecksum = calculateChecksumOfChecksum(from, src); long contentLength; - try (IndexInput indexInput = from.openInput(src, ioContext)) { - contentLength = indexInput.length(); - } + IndexInput indexInput = from.openInput(src, ioContext); + contentLength = indexInput.length(); boolean remoteIntegrityEnabled = false; if (getBlobContainer() instanceof AsyncMultiStreamBlobContainer) { remoteIntegrityEnabled = ((AsyncMultiStreamBlobContainer) getBlobContainer()).remoteIntegrityCheckSupported(); } lowPriorityUpload = lowPriorityUpload || contentLength > ByteSizeUnit.GB.toBytes(15); RemoteTransferContainer.OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier; + if (lowPriorityUpload) { offsetRangeInputStreamSupplier = (size, position) -> lowPriorityUploadRateLimiter.apply( - new OffsetRangeIndexInputStream(from.openInput(src, ioContext), size, position) + new OffsetRangeIndexInputStream(indexInput.clone(), size, position) ); } else { offsetRangeInputStreamSupplier = (size, position) -> uploadRateLimiter.apply( - new OffsetRangeIndexInputStream(from.openInput(src, ioContext), size, position) + new OffsetRangeIndexInputStream(indexInput.clone(), size, position) ); } RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( @@ -457,6 +457,14 @@ private void uploadBlob( } }); + completionListener = ActionListener.runAfter(completionListener, () -> { + try { + indexInput.close(); + } catch(IOException e) { + logger.warn("Error occurred while closing index input", e); + } + }); + WriteContext writeContext = remoteTransferContainer.createWriteContext(); ((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(writeContext, completionListener); } From cf27be86953e893a7e2d7b5da0b954e77ff5a537 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Thu, 14 Aug 2025 15:42:29 +0530 Subject: [PATCH 2/8] Fix spotless error Signed-off-by: Sachin Kale --- .../org/opensearch/index/store/RemoteDirectory.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java index 7c03ba66caf6f..c4ad060c10711 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java @@ -458,11 +458,11 @@ private void uploadBlob( }); completionListener = ActionListener.runAfter(completionListener, () -> { - try { - indexInput.close(); - } catch(IOException e) { - logger.warn("Error occurred while closing index input", e); - } + try { + indexInput.close(); + } catch (IOException e) { + logger.warn("Error occurred while closing index input", e); + } }); WriteContext writeContext = remoteTransferContainer.createWriteContext(); From 18138d733343c1d0384dedac9815fa9f6e955bd8 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Thu, 14 Aug 2025 17:21:13 +0530 Subject: [PATCH 3/8] Fix for segments_N file Signed-off-by: Sachin Kale --- .../store/RemoteSegmentStoreDirectory.java | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index 47b92a7bca454..5f76c22041832 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -599,13 +599,16 @@ public IndexInput openBlockInput(String name, long position, long length, IOCont public void copyFrom(Directory from, String src, IOContext context, ActionListener listener, boolean lowPriorityUpload) { try { final String remoteFileName = getNewRemoteSegmentFilename(src); - boolean uploaded = remoteDataDirectory.copyFrom(from, src, remoteFileName, context, () -> { - try { - postUpload(from, src, remoteFileName, getChecksumOfLocalFile(from, src)); - } catch (IOException e) { - throw new RuntimeException("Exception in segment postUpload for file " + src, e); - } - }, listener, lowPriorityUpload); + boolean uploaded = false; + if(src.startsWith("segments_") == false) { + uploaded = remoteDataDirectory.copyFrom(from, src, remoteFileName, context, () -> { + try { + postUpload(from, src, remoteFileName, getChecksumOfLocalFile(from, src)); + } catch (IOException e) { + throw new RuntimeException("Exception in segment postUpload for file " + src, e); + } + }, listener, lowPriorityUpload); + } if (uploaded == false) { copyFrom(from, src, src, context); listener.onResponse(null); From 046476187b22432f62150321ee63c2dc99289a72 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Thu, 14 Aug 2025 22:35:32 +0530 Subject: [PATCH 4/8] Fix spotless error Signed-off-by: Sachin Kale --- .../org/opensearch/index/store/RemoteSegmentStoreDirectory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index 5f76c22041832..aa47125b5ff19 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -600,7 +600,7 @@ public void copyFrom(Directory from, String src, IOContext context, ActionListen try { final String remoteFileName = getNewRemoteSegmentFilename(src); boolean uploaded = false; - if(src.startsWith("segments_") == false) { + if (src.startsWith("segments_") == false) { uploaded = remoteDataDirectory.copyFrom(from, src, remoteFileName, context, () -> { try { postUpload(from, src, remoteFileName, getChecksumOfLocalFile(from, src)); From b04e72a9a4651353a45be7d82bd67e3f284e5361 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Mon, 18 Aug 2025 16:52:31 +0530 Subject: [PATCH 5/8] Fix test Signed-off-by: Sachin Kale --- .../index/shard/RemoteStoreRefreshListenerTests.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 35372c0f6c0e4..8094f1c91ee13 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -54,6 +54,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import org.mockito.stubbing.Answer; + import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE; import static org.opensearch.index.store.RemoteSegmentStoreDirectory.METADATA_FILES_TO_FETCH; import static org.opensearch.test.RemoteStoreTestUtils.createMetadataFileBytes; @@ -822,7 +824,7 @@ private Tuple mockIn when(shard.getRemoteStoreSettings()).thenReturn(remoteStoreSettings); if (testUploadTimeout) { when(remoteStoreSettings.getClusterRemoteSegmentTransferTimeout()).thenReturn(TimeValue.timeValueMillis(10)); - doAnswer(invocation -> { + Answer answer = invocation -> { ActionListener actionListener = invocation.getArgument(5); indexShard.getThreadPool().executor(ThreadPool.Names.GENERIC).execute(() -> { try { @@ -833,7 +835,11 @@ private Tuple mockIn actionListener.onResponse(null); }); return true; - }).when(remoteDirectory).copyFrom(any(), any(), any(), any(), any(), any(ActionListener.class), any(Boolean.class)); + }; + doAnswer(answer).when(remoteDirectory) + .copyFrom(any(), any(), any(), any(), any(), any(ActionListener.class), any(Boolean.class)); + + doAnswer(answer).when(remoteDirectory).copyFrom(any(), any(), any(), any()); } RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener( From b1069f8c581c2ac38fb30f8049aeac97ed076bca Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Tue, 19 Aug 2025 13:57:57 +0530 Subject: [PATCH 6/8] Make sure to close IndexInput on exception Signed-off-by: Sachin Kale --- .../index/store/RemoteDirectory.java | 134 +++++++++--------- 1 file changed, 70 insertions(+), 64 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java index c4ad060c10711..d4ad8a91afb47 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java @@ -398,75 +398,81 @@ private void uploadBlob( long expectedChecksum = calculateChecksumOfChecksum(from, src); long contentLength; IndexInput indexInput = from.openInput(src, ioContext); - contentLength = indexInput.length(); - boolean remoteIntegrityEnabled = false; - if (getBlobContainer() instanceof AsyncMultiStreamBlobContainer) { - remoteIntegrityEnabled = ((AsyncMultiStreamBlobContainer) getBlobContainer()).remoteIntegrityCheckSupported(); - } - lowPriorityUpload = lowPriorityUpload || contentLength > ByteSizeUnit.GB.toBytes(15); - RemoteTransferContainer.OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier; - - if (lowPriorityUpload) { - offsetRangeInputStreamSupplier = (size, position) -> lowPriorityUploadRateLimiter.apply( - new OffsetRangeIndexInputStream(indexInput.clone(), size, position) - ); - } else { - offsetRangeInputStreamSupplier = (size, position) -> uploadRateLimiter.apply( - new OffsetRangeIndexInputStream(indexInput.clone(), size, position) - ); - } - RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( - src, - remoteFileName, - contentLength, - true, - lowPriorityUpload ? WritePriority.LOW : WritePriority.NORMAL, - offsetRangeInputStreamSupplier, - expectedChecksum, - remoteIntegrityEnabled - ); - ActionListener completionListener = ActionListener.wrap(resp -> { - try { - postUploadRunner.run(); - listener.onResponse(null); - } catch (Exception e) { - logger.error(() -> new ParameterizedMessage("Exception in segment postUpload for file [{}]", src), e); - listener.onFailure(e); - } - }, ex -> { - logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", src), ex); - IOException corruptIndexException = ExceptionsHelper.unwrapCorruption(ex); - if (corruptIndexException != null) { - listener.onFailure(corruptIndexException); - return; - } - Throwable throwable = ExceptionsHelper.unwrap(ex, CorruptFileException.class); - if (throwable != null) { - CorruptFileException corruptFileException = (CorruptFileException) throwable; - listener.onFailure(new CorruptIndexException(corruptFileException.getMessage(), corruptFileException.getFileName())); - return; + try { + contentLength = indexInput.length(); + boolean remoteIntegrityEnabled = false; + if (getBlobContainer() instanceof AsyncMultiStreamBlobContainer) { + remoteIntegrityEnabled = ((AsyncMultiStreamBlobContainer) getBlobContainer()).remoteIntegrityCheckSupported(); } - listener.onFailure(ex); - }); + lowPriorityUpload = lowPriorityUpload || contentLength > ByteSizeUnit.GB.toBytes(15); + RemoteTransferContainer.OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier; - completionListener = ActionListener.runBefore(completionListener, () -> { - try { - remoteTransferContainer.close(); - } catch (Exception e) { - logger.warn("Error occurred while closing streams", e); + if (lowPriorityUpload) { + offsetRangeInputStreamSupplier = (size, position) -> lowPriorityUploadRateLimiter.apply( + new OffsetRangeIndexInputStream(indexInput.clone(), size, position) + ); + } else { + offsetRangeInputStreamSupplier = (size, position) -> uploadRateLimiter.apply( + new OffsetRangeIndexInputStream(indexInput.clone(), size, position) + ); } - }); + RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( + src, + remoteFileName, + contentLength, + true, + lowPriorityUpload ? WritePriority.LOW : WritePriority.NORMAL, + offsetRangeInputStreamSupplier, + expectedChecksum, + remoteIntegrityEnabled + ); + ActionListener completionListener = ActionListener.wrap(resp -> { + try { + postUploadRunner.run(); + listener.onResponse(null); + } catch (Exception e) { + logger.error(() -> new ParameterizedMessage("Exception in segment postUpload for file [{}]", src), e); + listener.onFailure(e); + } + }, ex -> { + logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", src), ex); + IOException corruptIndexException = ExceptionsHelper.unwrapCorruption(ex); + if (corruptIndexException != null) { + listener.onFailure(corruptIndexException); + return; + } + Throwable throwable = ExceptionsHelper.unwrap(ex, CorruptFileException.class); + if (throwable != null) { + CorruptFileException corruptFileException = (CorruptFileException) throwable; + listener.onFailure(new CorruptIndexException(corruptFileException.getMessage(), corruptFileException.getFileName())); + return; + } + listener.onFailure(ex); + }); - completionListener = ActionListener.runAfter(completionListener, () -> { - try { - indexInput.close(); - } catch (IOException e) { - logger.warn("Error occurred while closing index input", e); - } - }); + completionListener = ActionListener.runBefore(completionListener, () -> { + try { + remoteTransferContainer.close(); + } catch (Exception e) { + logger.warn("Error occurred while closing streams", e); + } + }); - WriteContext writeContext = remoteTransferContainer.createWriteContext(); - ((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(writeContext, completionListener); + completionListener = ActionListener.runAfter(completionListener, () -> { + try { + indexInput.close(); + } catch (IOException e) { + logger.warn("Error occurred while closing index input", e); + } + }); + + WriteContext writeContext = remoteTransferContainer.createWriteContext(); + ((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(writeContext, completionListener); + } catch(Exception e) { + logger.warn("Exception while calling asyncBlobUpload, closing IndexInput to avoid leak"); + indexInput.close(); + throw e; + } } private long calculateChecksumOfChecksum(Directory directory, String file) throws IOException { From 93b9237533d1db47f7170a64f823e2aef5ecc8ca Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Tue, 19 Aug 2025 14:16:18 +0530 Subject: [PATCH 7/8] Fix spotless errors Signed-off-by: Sachin Kale --- .../main/java/org/opensearch/index/store/RemoteDirectory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java index d4ad8a91afb47..25ee020de8562 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java @@ -468,7 +468,7 @@ private void uploadBlob( WriteContext writeContext = remoteTransferContainer.createWriteContext(); ((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(writeContext, completionListener); - } catch(Exception e) { + } catch (Exception e) { logger.warn("Exception while calling asyncBlobUpload, closing IndexInput to avoid leak"); indexInput.close(); throw e; From c4cf7f45fabf81b631d003f2d67acc46eb66bab4 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Tue, 19 Aug 2025 14:17:47 +0530 Subject: [PATCH 8/8] Address PR comments Signed-off-by: Sachin Kale --- .../opensearch/index/store/RemoteSegmentStoreDirectory.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index aa47125b5ff19..af8382e2a3154 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -12,6 +12,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentInfo; import org.apache.lucene.index.SegmentInfos; @@ -600,7 +601,7 @@ public void copyFrom(Directory from, String src, IOContext context, ActionListen try { final String remoteFileName = getNewRemoteSegmentFilename(src); boolean uploaded = false; - if (src.startsWith("segments_") == false) { + if (src.startsWith(IndexFileNames.SEGMENTS) == false) { uploaded = remoteDataDirectory.copyFrom(from, src, remoteFileName, context, () -> { try { postUpload(from, src, remoteFileName, getChecksumOfLocalFile(from, src));