Skip to content

Commit de6ed75

Browse files
authored
Add CompressibleBytesOutputStream for compression
This is a follow-up to #23941. Currently there are a number of complexities related to compression. The raw DeflaterOutputStream must be closed prior to sending bytes to ensure that EOS bytes are written. But the underlying ReleasableBytesStreamOutput cannot be closed until the bytes are sent to ensure that the bytes are not reused. Right now we have three different stream references hanging around in TCPTransport to handle this complexity. This commit introduces CompressibleBytesOutputStream to be one stream implemenation that will behave properly with or without compression enabled. Relates #27540
1 parent 7fd8494 commit de6ed75

File tree

3 files changed

+257
-30
lines changed

3 files changed

+257
-30
lines changed
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.transport;
21+
22+
import org.apache.lucene.util.IOUtils;
23+
import org.elasticsearch.common.bytes.BytesReference;
24+
import org.elasticsearch.common.compress.CompressorFactory;
25+
import org.elasticsearch.common.io.Streams;
26+
import org.elasticsearch.common.io.stream.BytesStream;
27+
import org.elasticsearch.common.io.stream.StreamOutput;
28+
import org.elasticsearch.common.lease.Releasable;
29+
30+
import java.io.IOException;
31+
import java.util.zip.DeflaterOutputStream;
32+
33+
/**
34+
* This class exists to provide a stream with optional compression. This is useful as using compression
35+
* requires that the underlying {@link DeflaterOutputStream} be closed to write EOS bytes. However, the
36+
* {@link BytesStream} should not be closed yet, as we have not used the bytes. This class handles these
37+
* intricacies.
38+
*
39+
* {@link CompressibleBytesOutputStream#materializeBytes()} should be called when all the bytes have been
40+
* written to this stream. If compression is enabled, the proper EOS bytes will be written at that point.
41+
* The underlying {@link BytesReference} will be returned.
42+
*
43+
* {@link CompressibleBytesOutputStream#close()} should be called when the bytes are no longer needed and
44+
* can be safely released.
45+
*/
46+
final class CompressibleBytesOutputStream extends StreamOutput implements Releasable {
47+
48+
private final StreamOutput stream;
49+
private final BytesStream bytesStreamOutput;
50+
private final boolean shouldCompress;
51+
52+
CompressibleBytesOutputStream(BytesStream bytesStreamOutput, boolean shouldCompress) throws IOException {
53+
this.bytesStreamOutput = bytesStreamOutput;
54+
this.shouldCompress = shouldCompress;
55+
if (shouldCompress) {
56+
this.stream = CompressorFactory.COMPRESSOR.streamOutput(Streams.flushOnCloseStream(bytesStreamOutput));
57+
} else {
58+
this.stream = bytesStreamOutput;
59+
}
60+
}
61+
62+
/**
63+
* This method ensures that compression is complete and returns the underlying bytes.
64+
*
65+
* @return bytes underlying the stream
66+
* @throws IOException if an exception occurs when writing or flushing
67+
*/
68+
BytesReference materializeBytes() throws IOException {
69+
// If we are using compression the stream needs to be closed to ensure that EOS marker bytes are written.
70+
// The actual ReleasableBytesStreamOutput will not be closed yet as it is wrapped in flushOnCloseStream when
71+
// passed to the deflater stream.
72+
if (shouldCompress) {
73+
stream.close();
74+
}
75+
76+
return bytesStreamOutput.bytes();
77+
}
78+
79+
@Override
80+
public void writeByte(byte b) throws IOException {
81+
stream.write(b);
82+
}
83+
84+
@Override
85+
public void writeBytes(byte[] b, int offset, int length) throws IOException {
86+
stream.writeBytes(b, offset, length);
87+
}
88+
89+
@Override
90+
public void flush() throws IOException {
91+
stream.flush();
92+
}
93+
94+
@Override
95+
public void close() {
96+
if (stream == bytesStreamOutput) {
97+
assert shouldCompress == false : "If the streams are the same we should not be compressing";
98+
IOUtils.closeWhileHandlingException(stream);
99+
} else {
100+
assert shouldCompress : "If the streams are different we should be compressing";
101+
IOUtils.closeWhileHandlingException(stream, bytesStreamOutput);
102+
}
103+
}
104+
105+
@Override
106+
public void reset() throws IOException {
107+
stream.reset();
108+
}
109+
}

core/src/main/java/org/elasticsearch/transport/TcpTransport.java

Lines changed: 32 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
import org.elasticsearch.common.compress.Compressor;
4141
import org.elasticsearch.common.compress.CompressorFactory;
4242
import org.elasticsearch.common.compress.NotCompressedException;
43-
import org.elasticsearch.common.io.Streams;
4443
import org.elasticsearch.common.io.stream.BytesStreamOutput;
4544
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
4645
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@@ -1094,18 +1093,18 @@ private void sendRequestToChannel(final DiscoveryNode node, final Channel target
10941093
if (compress) {
10951094
options = TransportRequestOptions.builder(options).withCompress(true).build();
10961095
}
1096+
1097+
// only compress if asked and the request is not bytes. Otherwise only
1098+
// the header part is compressed, and the "body" can't be extracted as compressed
1099+
final boolean compressMessage = options.compress() && canCompress(request);
1100+
10971101
status = TransportStatus.setRequest(status);
10981102
ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays);
1099-
// we wrap this in a release once since if the onRequestSent callback throws an exception
1100-
// we might release things twice and this should be prevented
1101-
final Releasable toRelease = Releasables.releaseOnce(() -> Releasables.close(bStream.bytes()));
1102-
StreamOutput stream = Streams.flushOnCloseStream(bStream);
1103+
final CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, compressMessage);
1104+
boolean addedReleaseListener = false;
11031105
try {
1104-
// only compress if asked, and, the request is not bytes, since then only
1105-
// the header part is compressed, and the "body" can't be extracted as compressed
1106-
if (options.compress() && canCompress(request)) {
1106+
if (compressMessage) {
11071107
status = TransportStatus.setCompress(status);
1108-
stream = CompressorFactory.COMPRESSOR.streamOutput(stream);
11091108
}
11101109

11111110
// we pick the smallest of the 2, to support both backward and forward compatibility
@@ -1116,14 +1115,17 @@ private void sendRequestToChannel(final DiscoveryNode node, final Channel target
11161115
stream.setVersion(version);
11171116
threadPool.getThreadContext().writeTo(stream);
11181117
stream.writeString(action);
1119-
BytesReference message = buildMessage(requestId, status, node.getVersion(), request, stream, bStream);
1118+
BytesReference message = buildMessage(requestId, status, node.getVersion(), request, stream);
11201119
final TransportRequestOptions finalOptions = options;
11211120
// this might be called in a different thread
1122-
SendListener onRequestSent = new SendListener(toRelease,
1123-
() -> transportServiceAdapter.onRequestSent(node, requestId, action, request, finalOptions));
1121+
SendListener onRequestSent = new SendListener(stream,
1122+
() -> transportServiceAdapter.onRequestSent(node, requestId, action, request, finalOptions));
11241123
internalSendMessage(targetChannel, message, onRequestSent);
1124+
addedReleaseListener = true;
11251125
} finally {
1126-
IOUtils.close(stream);
1126+
if (!addedReleaseListener) {
1127+
IOUtils.close(stream);
1128+
}
11271129
}
11281130
}
11291131

@@ -1185,26 +1187,26 @@ private void sendResponse(Version nodeVersion, Channel channel, final TransportR
11851187
}
11861188
status = TransportStatus.setResponse(status); // TODO share some code with sendRequest
11871189
ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays);
1188-
// we wrap this in a release once since if the onRequestSent callback throws an exception
1189-
// we might release things twice and this should be prevented
1190-
final Releasable toRelease = Releasables.releaseOnce(() -> Releasables.close(bStream.bytes()));
1191-
StreamOutput stream = Streams.flushOnCloseStream(bStream);
1190+
CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, options.compress());
1191+
boolean addedReleaseListener = false;
11921192
try {
11931193
if (options.compress()) {
11941194
status = TransportStatus.setCompress(status);
1195-
stream = CompressorFactory.COMPRESSOR.streamOutput(stream);
11961195
}
11971196
threadPool.getThreadContext().writeTo(stream);
11981197
stream.setVersion(nodeVersion);
1199-
BytesReference reference = buildMessage(requestId, status, nodeVersion, response, stream, bStream);
1198+
BytesReference reference = buildMessage(requestId, status, nodeVersion, response, stream);
12001199

12011200
final TransportResponseOptions finalOptions = options;
12021201
// this might be called in a different thread
1203-
SendListener listener = new SendListener(toRelease,
1204-
() -> transportServiceAdapter.onResponseSent(requestId, action, response, finalOptions));
1202+
SendListener listener = new SendListener(stream,
1203+
() -> transportServiceAdapter.onResponseSent(requestId, action, response, finalOptions));
12051204
internalSendMessage(channel, reference, listener);
1205+
addedReleaseListener = true;
12061206
} finally {
1207-
IOUtils.close(stream);
1207+
if (!addedReleaseListener) {
1208+
IOUtils.close(stream);
1209+
}
12081210
}
12091211
}
12101212

@@ -1231,8 +1233,8 @@ final BytesReference buildHeader(long requestId, byte status, Version protocolVe
12311233
/**
12321234
* Serializes the given message into a bytes representation
12331235
*/
1234-
private BytesReference buildMessage(long requestId, byte status, Version nodeVersion, TransportMessage message, StreamOutput stream,
1235-
ReleasableBytesStreamOutput writtenBytes) throws IOException {
1236+
private BytesReference buildMessage(long requestId, byte status, Version nodeVersion, TransportMessage message,
1237+
CompressibleBytesOutputStream stream) throws IOException {
12361238
final BytesReference zeroCopyBuffer;
12371239
if (message instanceof BytesTransportRequest) { // what a shitty optimization - we should use a direct send method instead
12381240
BytesTransportRequest bRequest = (BytesTransportRequest) message;
@@ -1243,12 +1245,12 @@ private BytesReference buildMessage(long requestId, byte status, Version nodeVer
12431245
message.writeTo(stream);
12441246
zeroCopyBuffer = BytesArray.EMPTY;
12451247
}
1246-
// we have to close the stream here - flush is not enough since we might be compressing the content
1247-
// and if we do that the close method will write some marker bytes (EOS marker) and otherwise
1248-
// we barf on the decompressing end when we read past EOF on purpose in the #validateRequest method.
1249-
// this might be a problem in deflate after all but it's important to close it for now.
1250-
stream.close();
1251-
final BytesReference messageBody = writtenBytes.bytes();
1248+
// we have to call materializeBytes() here before accessing the bytes. A CompressibleBytesOutputStream
1249+
// might be implementing compression. And materializeBytes() ensures that some marker bytes (EOS marker)
1250+
// are written. Otherwise we barf on the decompressing end when we read past EOF on purpose in the
1251+
// #validateRequest method. this might be a problem in deflate after all but it's important to write
1252+
// the marker bytes.
1253+
final BytesReference messageBody = stream.materializeBytes();
12521254
final BytesReference header = buildHeader(requestId, status, stream.getVersion(), messageBody.length() + zeroCopyBuffer.length());
12531255
return new CompositeBytesReference(header, messageBody, zeroCopyBuffer);
12541256
}
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.transport;
21+
22+
import org.elasticsearch.common.bytes.BytesReference;
23+
import org.elasticsearch.common.compress.CompressorFactory;
24+
import org.elasticsearch.common.io.stream.BytesStream;
25+
import org.elasticsearch.common.io.stream.BytesStreamOutput;
26+
import org.elasticsearch.common.io.stream.StreamInput;
27+
import org.elasticsearch.test.ESTestCase;
28+
29+
import java.io.EOFException;
30+
import java.io.IOException;
31+
32+
public class CompressibleBytesOutputStreamTests extends ESTestCase {
33+
34+
public void testStreamWithoutCompression() throws IOException {
35+
BytesStream bStream = new ZeroOutOnCloseStream();
36+
CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, false);
37+
38+
byte[] expectedBytes = randomBytes(randomInt(30));
39+
stream.write(expectedBytes);
40+
41+
BytesReference bytesRef = stream.materializeBytes();
42+
43+
assertFalse(CompressorFactory.COMPRESSOR.isCompressed(bytesRef));
44+
45+
StreamInput streamInput = bytesRef.streamInput();
46+
byte[] actualBytes = new byte[expectedBytes.length];
47+
streamInput.readBytes(actualBytes, 0, expectedBytes.length);
48+
49+
assertEquals(-1, streamInput.read());
50+
assertArrayEquals(expectedBytes, actualBytes);
51+
stream.close();
52+
53+
// The bytes should be zeroed out on close
54+
for (byte b : bytesRef.toBytesRef().bytes) {
55+
assertEquals((byte) 0, b);
56+
}
57+
}
58+
59+
public void testStreamWithCompression() throws IOException {
60+
BytesStream bStream = new ZeroOutOnCloseStream();
61+
CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, true);
62+
63+
byte[] expectedBytes = randomBytes(randomInt(30));
64+
stream.write(expectedBytes);
65+
66+
BytesReference bytesRef = stream.materializeBytes();
67+
68+
assertTrue(CompressorFactory.COMPRESSOR.isCompressed(bytesRef));
69+
70+
StreamInput streamInput = CompressorFactory.COMPRESSOR.streamInput(bytesRef.streamInput());
71+
byte[] actualBytes = new byte[expectedBytes.length];
72+
streamInput.readBytes(actualBytes, 0, expectedBytes.length);
73+
74+
assertEquals(-1, streamInput.read());
75+
assertArrayEquals(expectedBytes, actualBytes);
76+
stream.close();
77+
78+
// The bytes should be zeroed out on close
79+
for (byte b : bytesRef.toBytesRef().bytes) {
80+
assertEquals((byte) 0, b);
81+
}
82+
}
83+
84+
public void testCompressionWithCallingMaterializeFails() throws IOException {
85+
BytesStream bStream = new ZeroOutOnCloseStream();
86+
CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, true);
87+
88+
byte[] expectedBytes = randomBytes(randomInt(30));
89+
stream.write(expectedBytes);
90+
91+
92+
StreamInput streamInput = CompressorFactory.COMPRESSOR.streamInput(bStream.bytes().streamInput());
93+
byte[] actualBytes = new byte[expectedBytes.length];
94+
EOFException e = expectThrows(EOFException.class, () -> streamInput.readBytes(actualBytes, 0, expectedBytes.length));
95+
assertEquals("Unexpected end of ZLIB input stream", e.getMessage());
96+
97+
stream.close();
98+
}
99+
100+
private static byte[] randomBytes(int length) {
101+
byte[] bytes = new byte[length];
102+
for (int i = 0; i < bytes.length; ++i) {
103+
bytes[i] = randomByte();
104+
}
105+
return bytes;
106+
}
107+
108+
private static class ZeroOutOnCloseStream extends BytesStreamOutput {
109+
110+
@Override
111+
public void close() {
112+
int size = (int) bytes.size();
113+
bytes.set(0, new byte[size], 0, size);
114+
}
115+
}
116+
}

0 commit comments

Comments
 (0)