Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,9 @@
import org.elasticsearch.common.compress.DeflateCompressor;
import org.elasticsearch.common.compress.NotXContentException;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.BufferedStreamOutput;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.TruncatedOutputStream;
Expand Down Expand Up @@ -148,14 +147,14 @@
import org.elasticsearch.xcontent.XContentType;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.NoSuchFileException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -1718,28 +1717,35 @@ void writeTo(StreamOutput out) throws IOException {
* need no further synchronization.
* </p>
*/
private final BytesStreamOutput shardDeleteResults;
private final RecyclerBytesStreamOutput shardDeleteResults;
private final TruncatedOutputStream truncatedShardDeleteResultsOutputStream;
private final StreamOutput compressed;

private int resultsCount = 0;
private int leakedBlobsCount = 0;
private final ArrayList<Closeable> resources = new ArrayList<>();
private final ArrayDeque<Closeable> resources = new ArrayDeque<>(3);
private final ShardGenerations.Builder shardGenerationsBuilder = ShardGenerations.builder();

ShardBlobsToDelete() {
this.shardDeleteResults = new ReleasableBytesStreamOutput(bigArrays);
this.truncatedShardDeleteResultsOutputStream = new TruncatedOutputStream(
new BufferedOutputStream(
boolean success = false;
try {
this.shardDeleteResults = new RecyclerBytesStreamOutput(bigArrays.bytesRefRecycler());
resources.addFirst(LeakTracker.wrap(shardDeleteResults));
this.truncatedShardDeleteResultsOutputStream = new TruncatedOutputStream(
new DeflaterOutputStream(Streams.flushOnCloseStream(shardDeleteResults)),
DeflateCompressor.BUFFER_SIZE
),
shardDeleteResults::size,
maxHeapSizeForSnapshotDeletion
);
this.compressed = new OutputStreamStreamOutput(this.truncatedShardDeleteResultsOutputStream);
resources.add(compressed);
resources.add(LeakTracker.wrap((Releasable) shardDeleteResults));
shardDeleteResults::size,
maxHeapSizeForSnapshotDeletion
);
final var buffer = bigArrays.bytesRefRecycler().obtain();
resources.addFirst(buffer);
this.compressed = new BufferedStreamOutput(this.truncatedShardDeleteResultsOutputStream, buffer.v());
resources.addFirst(compressed);
success = true;
} finally {
if (success == false) {
close();
}
}
}

synchronized void addShardDeleteResult(
Expand Down