diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 5db4ee1949aca..ae2787b930681 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -65,10 +65,9 @@ import org.elasticsearch.common.compress.DeflateCompressor; import org.elasticsearch.common.compress.NotXContentException; import org.elasticsearch.common.io.Streams; -import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.BufferedStreamOutput; import org.elasticsearch.common.io.stream.InputStreamStreamInput; -import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; -import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; +import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.TruncatedOutputStream; @@ -148,7 +147,6 @@ import org.elasticsearch.xcontent.XContentType; import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; import java.io.Closeable; import java.io.FilterInputStream; import java.io.IOException; @@ -156,6 +154,7 @@ import java.io.OutputStream; import java.io.UncheckedIOException; import java.nio.file.NoSuchFileException; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -1718,28 +1717,35 @@ void writeTo(StreamOutput out) throws IOException { * need no further synchronization. *

*/ - private final BytesStreamOutput shardDeleteResults; + private final RecyclerBytesStreamOutput shardDeleteResults; private final TruncatedOutputStream truncatedShardDeleteResultsOutputStream; private final StreamOutput compressed; private int resultsCount = 0; private int leakedBlobsCount = 0; - private final ArrayList resources = new ArrayList<>(); + private final ArrayDeque resources = new ArrayDeque<>(3); private final ShardGenerations.Builder shardGenerationsBuilder = ShardGenerations.builder(); ShardBlobsToDelete() { - this.shardDeleteResults = new ReleasableBytesStreamOutput(bigArrays); - this.truncatedShardDeleteResultsOutputStream = new TruncatedOutputStream( - new BufferedOutputStream( + boolean success = false; + try { + this.shardDeleteResults = new RecyclerBytesStreamOutput(bigArrays.bytesRefRecycler()); + resources.addFirst(LeakTracker.wrap(shardDeleteResults)); + this.truncatedShardDeleteResultsOutputStream = new TruncatedOutputStream( new DeflaterOutputStream(Streams.flushOnCloseStream(shardDeleteResults)), - DeflateCompressor.BUFFER_SIZE - ), - shardDeleteResults::size, - maxHeapSizeForSnapshotDeletion - ); - this.compressed = new OutputStreamStreamOutput(this.truncatedShardDeleteResultsOutputStream); - resources.add(compressed); - resources.add(LeakTracker.wrap((Releasable) shardDeleteResults)); + shardDeleteResults::size, + maxHeapSizeForSnapshotDeletion + ); + final var buffer = bigArrays.bytesRefRecycler().obtain(); + resources.addFirst(buffer); + this.compressed = new BufferedStreamOutput(this.truncatedShardDeleteResultsOutputStream, buffer.v()); + resources.addFirst(compressed); + success = true; + } finally { + if (success == false) { + close(); + } + } } synchronized void addShardDeleteResult(