Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
Expand Down Expand Up @@ -392,11 +391,7 @@ private ReleasableBytesReference maybeSerializeClusterState(
assert clusterState.nodes().isLocalNodeElectedMaster();

try (var bytesStream = transportService.newNetworkBytesStream()) {
try (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm surprised no tests needed updating? I would have assumed there was a test for the branch where this try block failed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't follow. This is just removing the now-unnecessary OutputStreamStreamOutput wrapper, there's no change in failure handling or anything here is there?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original code was:

try (var bytesStream = transportService.newNetworkBytesStream()) {
    try (
        var stream = new OutputStreamStreamOutput(
            CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.flushOnCloseStream(bytesStream))
        )
    ) {
        stream.setTransportVersion(version);
        clusterState.writeTo(stream);
    } catch (IOException e) {
        throw new ElasticsearchException("failed to serialize cluster state for publishing to node {}", e, discoveryNode);
    }
    ...

I would have expected there to be a test for the ElasticsearchException that used mocklog or equivalent to throw an exception when the OutputStreamStreamOutput was created. Since we've changed this now to:

var stream = CompressorFactory.COMPRESSOR.threadLocalStreamOutput(Streams.flushOnCloseStream(bytesStream)))

I suspected that any test similar to the above would need to be updated. Checking JoinValidationServiceTests here yields no test for this code branch

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Creating an OutputStreamStreamOutput throws no exceptions?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My mistake - I didn't check the constructor for OutputStreamStreamOutput. I didn't realise this try/catch block was only for auto-closable, I thought it might be protecting against an IOException too.

As a note, I still don't think the ElasticsearchException is tested, since I cannot find any reference to it in the JoinValidationServiceTests, nor the message failed to serialize cluster state for publishing to node anywhere else in the Elasticsearch codebase, but that isn't relevant to this change

var stream = new OutputStreamStreamOutput(
CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.flushOnCloseStream(bytesStream))
)
) {
try (var stream = CompressorFactory.COMPRESSOR.threadLocalStreamOutput(Streams.flushOnCloseStream(bytesStream))) {
stream.setTransportVersion(version);
clusterState.writeTo(stream);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.PositionTrackingOutputStreamStreamOutput;
import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -252,11 +251,7 @@ public PublicationContext newPublicationContext(ClusterStatePublicationEvent clu
private ReleasableBytesReference serializeFullClusterState(ClusterState clusterState, DiscoveryNode node, TransportVersion version) {
try (RecyclerBytesStreamOutput bytesStream = transportService.newNetworkBytesStream()) {
final long uncompressedBytes;
try (
StreamOutput stream = new PositionTrackingOutputStreamStreamOutput(
CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.flushOnCloseStream(bytesStream))
)
) {
try (StreamOutput stream = CompressorFactory.COMPRESSOR.threadLocalStreamOutput(Streams.flushOnCloseStream(bytesStream))) {
stream.setTransportVersion(version);
stream.writeBoolean(true);
clusterState.writeTo(stream);
Expand Down Expand Up @@ -285,11 +280,7 @@ private ReleasableBytesReference serializeDiffClusterState(
final long clusterStateVersion = newState.version();
try (RecyclerBytesStreamOutput bytesStream = transportService.newNetworkBytesStream()) {
final long uncompressedBytes;
try (
StreamOutput stream = new PositionTrackingOutputStreamStreamOutput(
CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.flushOnCloseStream(bytesStream))
)
) {
try (StreamOutput stream = CompressorFactory.COMPRESSOR.threadLocalStreamOutput(Streams.flushOnCloseStream(bytesStream))) {
stream.setTransportVersion(version);
stream.writeBoolean(false);
diff.writeTo(stream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public CompressedXContent(ToXContent xcontent, ToXContent.Params params) throws
BytesStreamOutput bStream = baos.get();
try {
OutputStream checkedStream = new DigestOutputStream(
CompressorFactory.COMPRESSOR.threadLocalOutputStream(bStream),
CompressorFactory.COMPRESSOR.threadLocalStreamOutput(bStream),
messageDigest
);
try (XContentBuilder builder = XContentFactory.jsonBuilder(checkedStream)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.BufferedInputStream;
import java.io.IOException;
Expand Down Expand Up @@ -48,12 +49,12 @@ public int read() throws IOException {
InputStream threadLocalInputStream(InputStream in) throws IOException;

/**
* Creates a new output stream that compresses the contents and writes to the provided output stream.
* Closing the returned {@link OutputStream} will close the provided output stream.
* Creates a new {@link StreamOutput} that compresses the contents and writes to the provided output stream.
* Closing the returned {@link StreamOutput} will close the delegate {@code out} stream.
* Note: The returned stream may only be used on the thread that created it as it might use thread-local resources and must be safely
* closed after use
*/
OutputStream threadLocalOutputStream(OutputStream out) throws IOException;
StreamOutput threadLocalStreamOutput(OutputStream out) throws IOException;

/**
* Decompress bytes into a newly allocated buffer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@

package org.elasticsearch.common.compress;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BufferedStreamOutput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Assertions;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Streams;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand Down Expand Up @@ -171,7 +173,7 @@ public void close() throws IOException {
}

@Override
public OutputStream threadLocalOutputStream(OutputStream out) throws IOException {
public StreamOutput threadLocalStreamOutput(OutputStream out) throws IOException {
out.write(HEADER);
final ReleasableReference<Deflater> current = deflaterForStreamRef.get();
final Releasable releasable;
Expand All @@ -191,13 +193,13 @@ public void close() throws IOException {
try {
super.close();
} finally {
// We are ensured to only call this once since we wrap this stream in a BufferedOutputStream that will only close
// We are ensured to only call this once since we wrap this stream in a BufferedStreamOutput that will only close
// its delegate once below
releasable.close();
}
}
};
return new BufferedOutputStream(deflaterOutputStream, BUFFER_SIZE);
return new BufferedStreamOutput(deflaterOutputStream, new BytesRef(new byte[BUFFER_SIZE], 0, BUFFER_SIZE));
}

private static final ThreadLocal<BytesStreamOutput> baos = ThreadLocal.withInitial(BytesStreamOutput::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class BufferedStreamOutput extends StreamOutput {
private final int endPosition;
private int position;
private long flushedBytes;
private boolean isClosed;

/**
* Wrap the given stream, using the given {@link BytesRef} for the buffer. It is the caller's responsibility to make sure that nothing
Expand Down Expand Up @@ -129,8 +130,11 @@ private boolean assertTrashBuffer() {

@Override
public void close() throws IOException {
flush();
delegate.close();
if (isClosed == false) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a change of behaviour. Can it be documented in a Javadoc please? (Either at the method or class level)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually the expected behaviour of close() - see java.io.Closeable#close. Turns out we hardly use or test that fact.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, interesting! Is this worth adding to a github issue or other relevant documentation to track this testing gap?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we'd ever get around to acting on such an issue tbh, it's too broad. I think it's enough to have assertions etc in the places where it actually matters.

isClosed = true;
flush();
delegate.close();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public abstract class BytesStream extends StreamOutput {
public void writeWithSizePrefix(Writeable writeable) throws IOException {
long pos = position();
seek(pos + Integer.BYTES);
try (var out = new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.noCloseStream(this)))) {
try (var out = CompressorFactory.COMPRESSOR.threadLocalStreamOutput(Streams.noCloseStream(this))) {
out.setTransportVersion(getTransportVersion());
writeable.writeTo(out);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public T expand() {
public Serialized<T> asSerialized(Reader<T> reader, NamedWriteableRegistry registry) {
// TODO: this path is currently not used in production code, if it ever is this should start using pooled buffers
BytesStreamOutput buffer = new BytesStreamOutput();
try (var out = new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(buffer))) {
try (var out = CompressorFactory.COMPRESSOR.threadLocalStreamOutput(buffer)) {
out.setTransportVersion(TransportVersion.current());
reference.writeTo(out);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public void legacyWriteWithSizePrefix(Writeable writeable) throws IOException {
*/
public void writeWithSizePrefix(Writeable writeable) throws IOException {
final BytesStreamOutput tmp = new BytesStreamOutput();
try (var o = new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(tmp))) {
try (var o = CompressorFactory.COMPRESSOR.threadLocalStreamOutput(tmp)) {
o.setTransportVersion(version);
writeable.writeTo(o);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1273,7 +1273,7 @@ private void addGlobalMetadataDocuments(Metadata metadata) throws IOException {
private void writePages(ToXContent metadata, PageWriter pageWriter) throws IOException {
try (
PageWriterOutputStream paginatedStream = new PageWriterOutputStream(documentBuffer, pageWriter);
OutputStream compressedStream = CompressorFactory.COMPRESSOR.threadLocalOutputStream(paginatedStream);
OutputStream compressedStream = CompressorFactory.COMPRESSOR.threadLocalStreamOutput(paginatedStream);
XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.SMILE, compressedStream)
) {
xContentBuilder.startObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ public void close() {
};
XContentBuilder builder = XContentFactory.contentBuilder(
XContentType.SMILE,
compress ? CompressorFactory.COMPRESSOR.threadLocalOutputStream(indexOutputOutputStream) : indexOutputOutputStream
compress ? CompressorFactory.COMPRESSOR.threadLocalStreamOutput(indexOutputOutputStream) : indexOutputOutputStream
)
) {
ToXContent.Params params = extraParams.isEmpty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,9 +368,7 @@ private static BytesReference serializeMessageBody(
private static StreamOutput wrapCompressed(Compression.Scheme compressionScheme, RecyclerBytesStreamOutput bytesStream)
throws IOException {
if (compressionScheme == Compression.Scheme.DEFLATE) {
return new OutputStreamStreamOutput(
CompressorFactory.COMPRESSOR.threadLocalOutputStream(org.elasticsearch.core.Streams.noCloseStream(bytesStream))
);
return CompressorFactory.COMPRESSOR.threadLocalStreamOutput(Streams.noCloseStream(bytesStream));
} else if (compressionScheme == Compression.Scheme.LZ4) {
return new OutputStreamStreamOutput(Compression.Scheme.lz4OutputStream(Streams.noCloseStream(bytesStream)));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistryTests;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
Expand Down Expand Up @@ -427,9 +426,7 @@ public void testJoinValidationRejectsMismatchedClusterUUID() throws IOException
private static BytesTransportRequest serializeClusterState(ClusterState clusterState) {
try (
var bytesStream = new BytesStreamOutput();
var compressedStream = new OutputStreamStreamOutput(
CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.flushOnCloseStream(bytesStream))
)
var compressedStream = CompressorFactory.COMPRESSOR.threadLocalStreamOutput(Streams.flushOnCloseStream(bytesStream))
) {
compressedStream.setTransportVersion(TransportVersion.current());
clusterState.writeTo(compressedStream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ private void doTest(byte bytes[]) throws IOException {
int postpadding = r.nextInt(70000);
byte[] buffer = new byte[prepadding + bufferSize + postpadding];
int len;
try (OutputStream os = c.threadLocalOutputStream(bos)) {
try (OutputStream os = c.threadLocalStreamOutput(bos)) {
r.nextBytes(buffer); // fill block completely with junk
while ((len = rawIn.read(buffer, prepadding, bufferSize)) != -1) {
os.write(buffer, prepadding, len);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,15 @@ public void testRandom() throws IOException {
public void testDifferentCompressedRepresentation() throws Exception {
byte[] b = "---\nf:abcdefghijabcdefghij".getBytes(StandardCharsets.UTF_8);
BytesStreamOutput bout = new BytesStreamOutput();
try (OutputStream out = compressor.threadLocalOutputStream(bout)) {
try (OutputStream out = compressor.threadLocalStreamOutput(bout)) {
out.write(b);
out.flush();
out.write(b);
}
final BytesReference b1 = bout.bytes();

bout = new BytesStreamOutput();
try (OutputStream out = compressor.threadLocalOutputStream(bout)) {
try (OutputStream out = compressor.threadLocalStreamOutput(bout)) {
out.write(b);
out.write(b);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
import org.elasticsearch.test.ESTestCase;

import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

Expand Down Expand Up @@ -136,4 +139,35 @@ public void write(byte[] b, int off, int len) {
}
assertArrayEquals("wrote out of bounds", bufferPoolCopy, bufferPool);
}

public void testDoubleClose() throws IOException {
var buffered = new BufferedStreamOutput(new AssertClosedOnceOutputStream(), new BytesRef(new byte[10], 0, 10));
buffered.close();
buffered.close();
}

public void testDoubleCloseInTryWithResources() throws IOException {
try (
var buffered = new BufferedStreamOutput(new AssertClosedOnceOutputStream(), new BytesRef(new byte[10], 0, 10));
var wrapper = new FilterOutputStream(buffered)
) {
if (randomBoolean()) {
wrapper.write(0);
}
// {wrapper} is closed first, and propagates the close to {buffered}, but it's a separate resource so {buffered} is then closed
// again
}
}

private static class AssertClosedOnceOutputStream extends OutputStream {
private final AtomicBoolean isClosed = new AtomicBoolean();

@Override
public void write(int b) {}

@Override
public void close() {
assertTrue(isClosed.compareAndSet(false, true));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public void testStoredValue() throws IOException {

// case 2: a value that looks compressed: this used to fail in 1.x
BytesStreamOutput out = new BytesStreamOutput();
try (OutputStream compressed = CompressorFactory.COMPRESSOR.threadLocalOutputStream(out)) {
try (OutputStream compressed = CompressorFactory.COMPRESSOR.threadLocalStreamOutput(out)) {
new BytesArray(binaryValue1).writeTo(compressed);
}
final byte[] binaryValue2 = BytesReference.toBytes(out.bytes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.recycler.Recycler;
Expand All @@ -35,7 +34,7 @@ public class DeflateTransportDecompressorTests extends ESTestCase {
public void testSimpleCompression() throws IOException {
try (BytesStreamOutput output = new BytesStreamOutput()) {
byte randomByte = randomByte();
try (OutputStream deflateStream = CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.flushOnCloseStream(output))) {
try (OutputStream deflateStream = CompressorFactory.COMPRESSOR.threadLocalStreamOutput(Streams.flushOnCloseStream(output))) {
deflateStream.write(randomByte);
}

Expand All @@ -54,11 +53,7 @@ public void testSimpleCompression() throws IOException {

public void testMultiPageCompression() throws IOException {
try (BytesStreamOutput output = new BytesStreamOutput()) {
try (
StreamOutput deflateStream = new OutputStreamStreamOutput(
CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.flushOnCloseStream(output))
)
) {
try (StreamOutput deflateStream = CompressorFactory.COMPRESSOR.threadLocalStreamOutput(Streams.flushOnCloseStream(output))) {
for (int i = 0; i < 10000; ++i) {
deflateStream.writeInt(i);
}
Expand Down Expand Up @@ -86,11 +81,7 @@ public void testMultiPageCompression() throws IOException {

public void testIncrementalMultiPageCompression() throws IOException {
try (BytesStreamOutput output = new BytesStreamOutput()) {
try (
StreamOutput deflateStream = new OutputStreamStreamOutput(
CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.flushOnCloseStream(output))
)
) {
try (StreamOutput deflateStream = CompressorFactory.COMPRESSOR.threadLocalStreamOutput(Streams.flushOnCloseStream(output))) {
for (int i = 0; i < 10000; ++i) {
deflateStream.writeInt(i);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,8 +326,7 @@ private void addResultFieldAndFinish(Writeable response, XContentBuilder source)
os = Streams.noCloseStream(os);
TransportVersion minNodeVersion = clusterService.state().getMinTransportVersion();
TransportVersion.writeVersion(minNodeVersion, new OutputStreamStreamOutput(os));
os = CompressorFactory.COMPRESSOR.threadLocalOutputStream(os);
try (OutputStreamStreamOutput out = new OutputStreamStreamOutput(os)) {
try (var out = CompressorFactory.COMPRESSOR.threadLocalStreamOutput(os)) {
out.setTransportVersion(minNodeVersion);
response.writeTo(out);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void doAdd(Collection<MonitoringDoc> docs) throws ExportException {
if (docs != null && docs.isEmpty() == false) {
final BytesStreamOutput scratch = new BytesStreamOutput();
final CountingOutputStream countingStream;
try (OutputStream payload = CompressorFactory.COMPRESSOR.threadLocalOutputStream(scratch)) {
try (OutputStream payload = CompressorFactory.COMPRESSOR.threadLocalStreamOutput(scratch)) {
countingStream = new CountingOutputStream(payload);
for (MonitoringDoc monitoringDoc : docs) {
writeDocument(monitoringDoc, countingStream);
Expand Down
Loading