From 3829a849dae4f67242cd6587f51143b3b7e23aea Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Sat, 27 May 2017 15:16:15 -0500 Subject: [PATCH 01/10] Add first stream implementation --- .../transport/TCPTransportOutputStream.java | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 core/src/main/java/org/elasticsearch/transport/TCPTransportOutputStream.java diff --git a/core/src/main/java/org/elasticsearch/transport/TCPTransportOutputStream.java b/core/src/main/java/org/elasticsearch/transport/TCPTransportOutputStream.java new file mode 100644 index 0000000000000..102032a2ead03 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/transport/TCPTransportOutputStream.java @@ -0,0 +1,26 @@ + + +package org.elasticsearch.transport; + +import org.elasticsearch.common.compress.CompressorFactory; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.lease.Releasable; + +import java.io.IOException; + +public class TCPTransportOutputStream extends BytesStreamOutput implements Releasable { + + private final ReleasableBytesStreamOutput bytesStreamOutput; + private final boolean shouldCompress; + private StreamOutput stream; + + public TCPTransportOutputStream(ReleasableBytesStreamOutput bytesStreamOutput, boolean shouldCompress) throws IOException { + this.bytesStreamOutput = bytesStreamOutput; + this.shouldCompress = shouldCompress; + if (shouldCompress) { + this.stream = CompressorFactory.COMPRESSOR.streamOutput(bytesStreamOutput); + } + } +} From 5e73e97c31cd3cdbcc95b5b551492518d8d5aedc Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Sun, 28 May 2017 12:40:34 -0500 Subject: [PATCH 02/10] Use one stream in tcp transport --- .../CompressibleBytesOutputStream.java | 76 +++++++++++++++++++ .../transport/TCPTransportOutputStream.java | 26 ------- .../elasticsearch/transport/TcpTransport.java | 40 +++++----- 3 files changed, 94 insertions(+), 48 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/transport/CompressibleBytesOutputStream.java delete mode 100644 core/src/main/java/org/elasticsearch/transport/TCPTransportOutputStream.java diff --git a/core/src/main/java/org/elasticsearch/transport/CompressibleBytesOutputStream.java b/core/src/main/java/org/elasticsearch/transport/CompressibleBytesOutputStream.java new file mode 100644 index 0000000000000..68c0e71db8d69 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/transport/CompressibleBytesOutputStream.java @@ -0,0 +1,76 @@ + + +package org.elasticsearch.transport; + +import org.apache.lucene.util.IOUtils; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.compress.CompressorFactory; +import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.io.stream.BytesStream; +import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.util.BigArrays; + +import java.io.IOException; + +public class CompressibleBytesOutputStream extends BytesStream implements Releasable { + + private final StreamOutput stream; + private final ReleasableBytesStreamOutput bytesStreamOutput; + private final boolean shouldCompress; + private boolean finishCalled = false; + + public CompressibleBytesOutputStream(BigArrays bigArrays, boolean shouldCompress) throws IOException { + bytesStreamOutput = new ReleasableBytesStreamOutput(bigArrays); + this.shouldCompress = shouldCompress; + if (shouldCompress) { + this.stream = CompressorFactory.COMPRESSOR.streamOutput(Streams.flushOnCloseStream(bytesStreamOutput)); + } else { + stream = bytesStreamOutput; + } + } + + public void finishStream() throws IOException { + this.finishCalled = true; + + if (shouldCompress) { + stream.close(); + } + } + + @Override + public BytesReference bytes() { + assert finishCalled : "Must call finishStream() before accessing underlying bytes"; + return bytesStreamOutput.bytes(); + } + + @Override + public void writeByte(byte b) throws IOException { + stream.write(b); + } + + @Override + public void writeBytes(byte[] b, int offset, int length) throws IOException { + stream.writeBytes(b, offset, length); + } + + @Override + public void flush() throws IOException { + stream.flush(); + } + + @Override + public void close() { + // If we are not using compression stream == bytesStreamOutput + if (shouldCompress) { + IOUtils.closeWhileHandlingException(stream); + } + bytesStreamOutput.close(); + } + + @Override + public void reset() throws IOException { + stream.reset(); + } +} diff --git a/core/src/main/java/org/elasticsearch/transport/TCPTransportOutputStream.java b/core/src/main/java/org/elasticsearch/transport/TCPTransportOutputStream.java deleted file mode 100644 index 102032a2ead03..0000000000000 --- a/core/src/main/java/org/elasticsearch/transport/TCPTransportOutputStream.java +++ /dev/null @@ -1,26 +0,0 @@ - - -package org.elasticsearch.transport; - -import org.elasticsearch.common.compress.CompressorFactory; -import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.lease.Releasable; - -import java.io.IOException; - -public class TCPTransportOutputStream extends BytesStreamOutput implements Releasable { - - private final ReleasableBytesStreamOutput bytesStreamOutput; - private final boolean shouldCompress; - private StreamOutput stream; - - public TCPTransportOutputStream(ReleasableBytesStreamOutput bytesStreamOutput, boolean shouldCompress) throws IOException { - this.bytesStreamOutput = bytesStreamOutput; - this.shouldCompress = shouldCompress; - if (shouldCompress) { - this.stream = CompressorFactory.COMPRESSOR.streamOutput(bytesStreamOutput); - } - } -} diff --git a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java index 5713cc27c09c3..ed0b529b297a4 100644 --- a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -42,6 +42,7 @@ import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.compress.NotCompressedException; import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.io.stream.BytesStream; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -1032,15 +1033,13 @@ private void sendRequestToChannel(final DiscoveryNode node, final Channel target options = TransportRequestOptions.builder(options).withCompress(true).build(); } status = TransportStatus.setRequest(status); - ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays); + final CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bigArrays, options.compress()); boolean addedReleaseListener = false; - StreamOutput stream = Streams.flushOnCloseStream(bStream); try { // only compress if asked, and, the request is not bytes, since then only // the header part is compressed, and the "body" can't be extracted as compressed if (options.compress() && canCompress(request)) { status = TransportStatus.setCompress(status); - stream = CompressorFactory.COMPRESSOR.streamOutput(stream); } // we pick the smallest of the 2, to support both backward and forward compatibility @@ -1051,18 +1050,17 @@ private void sendRequestToChannel(final DiscoveryNode node, final Channel target stream.setVersion(version); threadPool.getThreadContext().writeTo(stream); stream.writeString(action); - BytesReference message = buildMessage(requestId, status, node.getVersion(), request, stream, bStream); + BytesReference message = buildMessage(requestId, status, node.getVersion(), request, stream); final TransportRequestOptions finalOptions = options; - final StreamOutput finalStream = stream; // this might be called in a different thread SendListener onRequestSent = new SendListener( - () -> IOUtils.closeWhileHandlingException(finalStream, bStream), + () -> IOUtils.closeWhileHandlingException(stream), () -> transportServiceAdapter.onRequestSent(node, requestId, action, request, finalOptions)); internalSendMessage(targetChannel, message, onRequestSent); addedReleaseListener = true; } finally { if (!addedReleaseListener) { - IOUtils.close(stream, bStream); + IOUtils.close(stream); } } } @@ -1124,28 +1122,25 @@ private void sendResponse(Version nodeVersion, Channel channel, final TransportR options = TransportResponseOptions.builder(options).withCompress(true).build(); } status = TransportStatus.setResponse(status); // TODO share some code with sendRequest - ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays); + CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bigArrays, options.compress()); boolean addedReleaseListener = false; - StreamOutput stream = Streams.flushOnCloseStream(bStream); try { if (options.compress()) { status = TransportStatus.setCompress(status); - stream = CompressorFactory.COMPRESSOR.streamOutput(stream); } threadPool.getThreadContext().writeTo(stream); stream.setVersion(nodeVersion); - BytesReference reference = buildMessage(requestId, status, nodeVersion, response, stream, bStream); + BytesReference reference = buildMessage(requestId, status, nodeVersion, response, stream); final TransportResponseOptions finalOptions = options; - final StreamOutput finalStream = stream; // this might be called in a different thread - SendListener listener = new SendListener(() -> IOUtils.closeWhileHandlingException(finalStream, bStream), + SendListener listener = new SendListener(() -> IOUtils.closeWhileHandlingException(stream), () -> transportServiceAdapter.onResponseSent(requestId, action, response, finalOptions)); internalSendMessage(channel, reference, listener); addedReleaseListener = true; } finally { if (!addedReleaseListener) { - IOUtils.close(stream, bStream); + IOUtils.close(stream); } } } @@ -1173,8 +1168,8 @@ final BytesReference buildHeader(long requestId, byte status, Version protocolVe /** * Serializes the given message into a bytes representation */ - private BytesReference buildMessage(long requestId, byte status, Version nodeVersion, TransportMessage message, StreamOutput stream, - ReleasableBytesStreamOutput writtenBytes) throws IOException { + private BytesReference buildMessage(long requestId, byte status, Version nodeVersion, TransportMessage message, + CompressibleBytesOutputStream stream) throws IOException { final BytesReference zeroCopyBuffer; if (message instanceof BytesTransportRequest) { // what a shitty optimization - we should use a direct send method instead BytesTransportRequest bRequest = (BytesTransportRequest) message; @@ -1185,12 +1180,13 @@ private BytesReference buildMessage(long requestId, byte status, Version nodeVer message.writeTo(stream); zeroCopyBuffer = BytesArray.EMPTY; } - // we have to close the stream here - flush is not enough since we might be compressing the content - // and if we do that the close method will write some marker bytes (EOS marker) and otherwise - // we barf on the decompressing end when we read past EOF on purpose in the #validateRequest method. - // this might be a problem in deflate after all but it's important to close it for now. - stream.close(); - final BytesReference messageBody = writtenBytes.bytes(); + // we have to call finishStream here before accessing the bytes. A CompressibleBytesOutputStream + // might be implementing compression. And finish stream ensures that some marker bytes (EOS marker) + // are written. Otherwise we barf on the decompressing end when we read past EOF on purpose in the + // #validateRequest method. this might be a problem in deflate after all but it's important to write + // the marker bytes. + stream.finishStream(); + final BytesReference messageBody = stream.bytes(); final BytesReference header = buildHeader(requestId, status, stream.getVersion(), messageBody.length() + zeroCopyBuffer.length()); return new CompositeBytesReference(header, messageBody, zeroCopyBuffer); } From 467585a0c0ea2905425c490529ef1c69aa03d592 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Sun, 28 May 2017 12:47:13 -0500 Subject: [PATCH 03/10] Slightly rework streams --- .../CompressibleBytesOutputStream.java | 22 ++++++++++--------- .../elasticsearch/transport/TcpTransport.java | 3 +-- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/transport/CompressibleBytesOutputStream.java b/core/src/main/java/org/elasticsearch/transport/CompressibleBytesOutputStream.java index 68c0e71db8d69..acb29dfe15502 100644 --- a/core/src/main/java/org/elasticsearch/transport/CompressibleBytesOutputStream.java +++ b/core/src/main/java/org/elasticsearch/transport/CompressibleBytesOutputStream.java @@ -14,12 +14,11 @@ import java.io.IOException; -public class CompressibleBytesOutputStream extends BytesStream implements Releasable { +public class CompressibleBytesOutputStream extends StreamOutput implements Releasable { private final StreamOutput stream; private final ReleasableBytesStreamOutput bytesStreamOutput; private final boolean shouldCompress; - private boolean finishCalled = false; public CompressibleBytesOutputStream(BigArrays bigArrays, boolean shouldCompress) throws IOException { bytesStreamOutput = new ReleasableBytesStreamOutput(bigArrays); @@ -31,18 +30,21 @@ public CompressibleBytesOutputStream(BigArrays bigArrays, boolean shouldCompress } } - public void finishStream() throws IOException { - this.finishCalled = true; - + /** + * This method ensures that compression is complete and returns the underlying bytes. + * + * @return bytes underlying the stream + * @throws IOException if an exception occurs when writing or flushing + */ + public BytesStream finishStream() throws IOException { + // If we are using compression the stream needs to be closed to ensure that EOS marker bytes are written. + // The actual ReleasableBytesStreamOutput will not be closed yet it is wrapped in flushOnCloseStream when + // passed to the deflater stream. if (shouldCompress) { stream.close(); } - } - @Override - public BytesReference bytes() { - assert finishCalled : "Must call finishStream() before accessing underlying bytes"; - return bytesStreamOutput.bytes(); + return bytesStreamOutput; } @Override diff --git a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java index ed0b529b297a4..ec50e6cd4efc5 100644 --- a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -1185,8 +1185,7 @@ private BytesReference buildMessage(long requestId, byte status, Version nodeVer // are written. Otherwise we barf on the decompressing end when we read past EOF on purpose in the // #validateRequest method. this might be a problem in deflate after all but it's important to write // the marker bytes. - stream.finishStream(); - final BytesReference messageBody = stream.bytes(); + final BytesReference messageBody = stream.finishStream().bytes(); final BytesReference header = buildHeader(requestId, status, stream.getVersion(), messageBody.length() + zeroCopyBuffer.length()); return new CompositeBytesReference(header, messageBody, zeroCopyBuffer); } From 85d42dc3c51d86f0a83ce515b15cc751982da0fc Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Sun, 28 May 2017 18:00:08 -0500 Subject: [PATCH 04/10] Fix checkstyle and compress boolean logic --- .../CompressibleBytesOutputStream.java | 22 ++++++++++++++++--- .../elasticsearch/transport/TcpTransport.java | 10 ++++----- 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/transport/CompressibleBytesOutputStream.java b/core/src/main/java/org/elasticsearch/transport/CompressibleBytesOutputStream.java index acb29dfe15502..0d6322d39b9a2 100644 --- a/core/src/main/java/org/elasticsearch/transport/CompressibleBytesOutputStream.java +++ b/core/src/main/java/org/elasticsearch/transport/CompressibleBytesOutputStream.java @@ -1,9 +1,25 @@ - +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.elasticsearch.transport; import org.apache.lucene.util.IOUtils; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.stream.BytesStream; @@ -38,7 +54,7 @@ public CompressibleBytesOutputStream(BigArrays bigArrays, boolean shouldCompress */ public BytesStream finishStream() throws IOException { // If we are using compression the stream needs to be closed to ensure that EOS marker bytes are written. - // The actual ReleasableBytesStreamOutput will not be closed yet it is wrapped in flushOnCloseStream when + // The actual ReleasableBytesStreamOutput will not be closed yet as it is wrapped in flushOnCloseStream when // passed to the deflater stream. if (shouldCompress) { stream.close(); diff --git a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java index ec50e6cd4efc5..99c212acfef36 100644 --- a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -1032,13 +1032,14 @@ private void sendRequestToChannel(final DiscoveryNode node, final Channel target if (compress) { options = TransportRequestOptions.builder(options).withCompress(true).build(); } + boolean compressMessage = options.compress() && canCompress(request); status = TransportStatus.setRequest(status); - final CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bigArrays, options.compress()); + final CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bigArrays, compressMessage); boolean addedReleaseListener = false; try { // only compress if asked, and, the request is not bytes, since then only // the header part is compressed, and the "body" can't be extracted as compressed - if (options.compress() && canCompress(request)) { + if (compressMessage) { status = TransportStatus.setCompress(status); } @@ -1053,8 +1054,7 @@ private void sendRequestToChannel(final DiscoveryNode node, final Channel target BytesReference message = buildMessage(requestId, status, node.getVersion(), request, stream); final TransportRequestOptions finalOptions = options; // this might be called in a different thread - SendListener onRequestSent = new SendListener( - () -> IOUtils.closeWhileHandlingException(stream), + SendListener onRequestSent = new SendListener(stream, () -> transportServiceAdapter.onRequestSent(node, requestId, action, request, finalOptions)); internalSendMessage(targetChannel, message, onRequestSent); addedReleaseListener = true; @@ -1134,7 +1134,7 @@ private void sendResponse(Version nodeVersion, Channel channel, final TransportR final TransportResponseOptions finalOptions = options; // this might be called in a different thread - SendListener listener = new SendListener(() -> IOUtils.closeWhileHandlingException(stream), + SendListener listener = new SendListener(stream, () -> transportServiceAdapter.onResponseSent(requestId, action, response, finalOptions)); internalSendMessage(channel, reference, listener); addedReleaseListener = true; From 54cee2f5386375be2778e9e1f11a7e4e1ed17b70 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Sun, 28 May 2017 18:14:36 -0500 Subject: [PATCH 05/10] Pass stream to constructor --- .../transport/CompressibleBytesOutputStream.java | 10 ++++------ .../java/org/elasticsearch/transport/TcpTransport.java | 6 ++++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/transport/CompressibleBytesOutputStream.java b/core/src/main/java/org/elasticsearch/transport/CompressibleBytesOutputStream.java index 0d6322d39b9a2..a21656aeda3a8 100644 --- a/core/src/main/java/org/elasticsearch/transport/CompressibleBytesOutputStream.java +++ b/core/src/main/java/org/elasticsearch/transport/CompressibleBytesOutputStream.java @@ -23,21 +23,19 @@ import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.stream.BytesStream; -import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lease.Releasable; -import org.elasticsearch.common.util.BigArrays; import java.io.IOException; public class CompressibleBytesOutputStream extends StreamOutput implements Releasable { private final StreamOutput stream; - private final ReleasableBytesStreamOutput bytesStreamOutput; + private final BytesStream bytesStreamOutput; private final boolean shouldCompress; - public CompressibleBytesOutputStream(BigArrays bigArrays, boolean shouldCompress) throws IOException { - bytesStreamOutput = new ReleasableBytesStreamOutput(bigArrays); + public CompressibleBytesOutputStream(BytesStream bytesStreamOutput, boolean shouldCompress) throws IOException { + this.bytesStreamOutput = bytesStreamOutput; this.shouldCompress = shouldCompress; if (shouldCompress) { this.stream = CompressorFactory.COMPRESSOR.streamOutput(Streams.flushOnCloseStream(bytesStreamOutput)); @@ -84,7 +82,7 @@ public void close() { if (shouldCompress) { IOUtils.closeWhileHandlingException(stream); } - bytesStreamOutput.close(); + IOUtils.closeWhileHandlingException(bytesStreamOutput); } @Override diff --git a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java index 99c212acfef36..6ce1bc3625049 100644 --- a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -1034,7 +1034,8 @@ private void sendRequestToChannel(final DiscoveryNode node, final Channel target } boolean compressMessage = options.compress() && canCompress(request); status = TransportStatus.setRequest(status); - final CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bigArrays, compressMessage); + ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays); + final CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, compressMessage); boolean addedReleaseListener = false; try { // only compress if asked, and, the request is not bytes, since then only @@ -1122,7 +1123,8 @@ private void sendResponse(Version nodeVersion, Channel channel, final TransportR options = TransportResponseOptions.builder(options).withCompress(true).build(); } status = TransportStatus.setResponse(status); // TODO share some code with sendRequest - CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bigArrays, options.compress()); + ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays); + CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, options.compress()); boolean addedReleaseListener = false; try { if (options.compress()) { From a84e7e39a6d330f627054f454ed48f718c815d18 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 29 May 2017 16:48:49 -0500 Subject: [PATCH 06/10] Make a number of changes based on review --- .../transport/CompressibleBytesOutputStream.java | 14 +++++++------- .../org/elasticsearch/transport/TcpTransport.java | 6 ++++-- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/transport/CompressibleBytesOutputStream.java b/core/src/main/java/org/elasticsearch/transport/CompressibleBytesOutputStream.java index a21656aeda3a8..2690a4a96aca7 100644 --- a/core/src/main/java/org/elasticsearch/transport/CompressibleBytesOutputStream.java +++ b/core/src/main/java/org/elasticsearch/transport/CompressibleBytesOutputStream.java @@ -28,19 +28,19 @@ import java.io.IOException; -public class CompressibleBytesOutputStream extends StreamOutput implements Releasable { +final class CompressibleBytesOutputStream extends StreamOutput implements Releasable { private final StreamOutput stream; private final BytesStream bytesStreamOutput; private final boolean shouldCompress; - public CompressibleBytesOutputStream(BytesStream bytesStreamOutput, boolean shouldCompress) throws IOException { + CompressibleBytesOutputStream(BytesStream bytesStreamOutput, boolean shouldCompress) throws IOException { this.bytesStreamOutput = bytesStreamOutput; this.shouldCompress = shouldCompress; if (shouldCompress) { this.stream = CompressorFactory.COMPRESSOR.streamOutput(Streams.flushOnCloseStream(bytesStreamOutput)); } else { - stream = bytesStreamOutput; + this.stream = bytesStreamOutput; } } @@ -50,7 +50,7 @@ public CompressibleBytesOutputStream(BytesStream bytesStreamOutput, boolean shou * @return bytes underlying the stream * @throws IOException if an exception occurs when writing or flushing */ - public BytesStream finishStream() throws IOException { + BytesStream finishStream() throws IOException { // If we are using compression the stream needs to be closed to ensure that EOS marker bytes are written. // The actual ReleasableBytesStreamOutput will not be closed yet as it is wrapped in flushOnCloseStream when // passed to the deflater stream. @@ -78,11 +78,11 @@ public void flush() throws IOException { @Override public void close() { - // If we are not using compression stream == bytesStreamOutput - if (shouldCompress) { + if (stream == bytesStreamOutput) { IOUtils.closeWhileHandlingException(stream); + } else { + IOUtils.closeWhileHandlingException(stream, bytesStreamOutput); } - IOUtils.closeWhileHandlingException(bytesStreamOutput); } @Override diff --git a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java index 3564a01839909..1c14c8ca4ac23 100644 --- a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -1032,14 +1032,16 @@ private void sendRequestToChannel(final DiscoveryNode node, final Channel target if (compress) { options = TransportRequestOptions.builder(options).withCompress(true).build(); } + + // only compress if asked and the request is not bytes. Otherwise only + // the header part is compressed, and the "body" can't be extracted as compressed boolean compressMessage = options.compress() && canCompress(request); + status = TransportStatus.setRequest(status); ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays); final CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, compressMessage); boolean addedReleaseListener = false; try { - // only compress if asked, and, the request is not bytes, since then only - // the header part is compressed, and the "body" can't be extracted as compressed if (compressMessage) { status = TransportStatus.setCompress(status); } From 43bd03d109481bafd21736118aabc8bba534b191 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 29 May 2017 19:01:44 -0500 Subject: [PATCH 07/10] Add test --- .../CompressibleBytesOutputStream.java | 5 +- .../elasticsearch/transport/TcpTransport.java | 4 +- .../CompressibleBytesOutputStreamTests.java | 120 ++++++++++++++++++ 3 files changed, 124 insertions(+), 5 deletions(-) create mode 100644 core/src/test/java/org/elasticsearch/transport/CompressibleBytesOutputStreamTests.java diff --git a/core/src/main/java/org/elasticsearch/transport/CompressibleBytesOutputStream.java b/core/src/main/java/org/elasticsearch/transport/CompressibleBytesOutputStream.java index 2690a4a96aca7..53025d5d2b886 100644 --- a/core/src/main/java/org/elasticsearch/transport/CompressibleBytesOutputStream.java +++ b/core/src/main/java/org/elasticsearch/transport/CompressibleBytesOutputStream.java @@ -20,6 +20,7 @@ package org.elasticsearch.transport; import org.apache.lucene.util.IOUtils; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.stream.BytesStream; @@ -50,7 +51,7 @@ final class CompressibleBytesOutputStream extends StreamOutput implements Releas * @return bytes underlying the stream * @throws IOException if an exception occurs when writing or flushing */ - BytesStream finishStream() throws IOException { + BytesReference materializeBytes() throws IOException { // If we are using compression the stream needs to be closed to ensure that EOS marker bytes are written. // The actual ReleasableBytesStreamOutput will not be closed yet as it is wrapped in flushOnCloseStream when // passed to the deflater stream. @@ -58,7 +59,7 @@ BytesStream finishStream() throws IOException { stream.close(); } - return bytesStreamOutput; + return bytesStreamOutput.bytes(); } @Override diff --git a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java index 1c14c8ca4ac23..b39c7aff7f482 100644 --- a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -41,8 +41,6 @@ import org.elasticsearch.common.compress.Compressor; import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.compress.NotCompressedException; -import org.elasticsearch.common.io.Streams; -import org.elasticsearch.common.io.stream.BytesStream; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -1189,7 +1187,7 @@ private BytesReference buildMessage(long requestId, byte status, Version nodeVer // are written. Otherwise we barf on the decompressing end when we read past EOF on purpose in the // #validateRequest method. this might be a problem in deflate after all but it's important to write // the marker bytes. - final BytesReference messageBody = stream.finishStream().bytes(); + final BytesReference messageBody = stream.materializeBytes(); final BytesReference header = buildHeader(requestId, status, stream.getVersion(), messageBody.length() + zeroCopyBuffer.length()); return new CompositeBytesReference(header, messageBody, zeroCopyBuffer); } diff --git a/core/src/test/java/org/elasticsearch/transport/CompressibleBytesOutputStreamTests.java b/core/src/test/java/org/elasticsearch/transport/CompressibleBytesOutputStreamTests.java new file mode 100644 index 0000000000000..6fb9b8198cb64 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/transport/CompressibleBytesOutputStreamTests.java @@ -0,0 +1,120 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport; + +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.compress.CompressorFactory; +import org.elasticsearch.common.io.stream.BytesStream; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.test.ESTestCase; + +import java.io.EOFException; +import java.io.IOException; + +public class CompressibleBytesOutputStreamTests extends ESTestCase { + + public void testStreamWithoutCompression() throws IOException { + BytesStream bStream = new ZeroOutOnCloseStream(); + CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, false); + + byte[] expectedBytes = randomBytes(randomInt(30)); + stream.write(expectedBytes); + + BytesReference bytesRef = stream.materializeBytes(); + + assertFalse(CompressorFactory.COMPRESSOR.isCompressed(bytesRef)); + + StreamInput streamInput = bytesRef.streamInput(); + byte[] actualBytes = new byte[expectedBytes.length]; + streamInput.readBytes(actualBytes, 0, expectedBytes.length); + + assertEquals(-1, streamInput.read()); + assertArrayEquals(expectedBytes, actualBytes); + stream.close(); + + // The bytes should be zeroed out on close + for (byte b : bytesRef.toBytesRef().bytes) { + assertEquals((byte) 0, b); + } + } + + public void testStreamWithCompression() throws IOException { + BytesStream bStream = new ZeroOutOnCloseStream(); + CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, true); + + byte[] expectedBytes = randomBytes(randomInt(30)); + stream.write(expectedBytes); + + BytesReference bytesRef = stream.materializeBytes(); + + assertTrue(CompressorFactory.COMPRESSOR.isCompressed(bytesRef)); + + StreamInput streamInput = CompressorFactory.COMPRESSOR.streamInput(bytesRef.streamInput()); + byte[] actualBytes = new byte[expectedBytes.length]; + streamInput.readBytes(actualBytes, 0, expectedBytes.length); + + assertEquals(-1, streamInput.read()); + assertArrayEquals(expectedBytes, actualBytes); + stream.close(); + + // The bytes should be zeroed out on close + for (byte b : bytesRef.toBytesRef().bytes) { + assertEquals((byte) 0, b); + } + } + + public void testCompressionWithCallingMaterializeFails() throws IOException { + BytesStream bStream = new ZeroOutOnCloseStream(); + CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, true); + + byte[] expectedBytes = randomBytes(randomInt(30)); + stream.write(expectedBytes); + + + try { + StreamInput streamInput = CompressorFactory.COMPRESSOR.streamInput(bStream.bytes().streamInput()); + byte[] actualBytes = new byte[expectedBytes.length]; + streamInput.readBytes(actualBytes, 0, expectedBytes.length); + fail("Expected to receive EOFException"); + } catch (EOFException e) { + assertEquals("Unexpected end of ZLIB input stream", e.getMessage()); + } + + stream.close(); + } + + private static byte[] randomBytes(int length) { + byte[] bytes = new byte[length]; + for (int i = 0; i < bytes.length; ++i) { + bytes[i] = randomByte(); + } + return bytes; + } + + private static class ZeroOutOnCloseStream extends BytesStreamOutput { + + @Override + public void close() { + int size = (int) bytes.size(); + bytes.set(0, new byte[size], 0, size); + } + } +} From d121346764168ce6c2cbb7c73183fb14bd17b81e Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 30 May 2017 11:04:48 -0500 Subject: [PATCH 08/10] Make changes based on review --- .../CompressibleBytesOutputStream.java | 17 ++++++++++++++++- .../elasticsearch/transport/TcpTransport.java | 6 +++--- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/transport/CompressibleBytesOutputStream.java b/core/src/main/java/org/elasticsearch/transport/CompressibleBytesOutputStream.java index 53025d5d2b886..87a8b287a711a 100644 --- a/core/src/main/java/org/elasticsearch/transport/CompressibleBytesOutputStream.java +++ b/core/src/main/java/org/elasticsearch/transport/CompressibleBytesOutputStream.java @@ -28,7 +28,21 @@ import org.elasticsearch.common.lease.Releasable; import java.io.IOException; +import java.util.zip.DeflaterOutputStream; +/** + * This class exists to provide a stream with optional compression. This is useful as using compression + * requires that the underlying {@link DeflaterOutputStream} be closed to write EOS bytes. However, the + * {@link BytesStream} should not be closed yet, as we have not used the bytes. This class handles these + * intricacies. + * + * {@link CompressibleBytesOutputStream#materializeBytes()} should be called when all the bytes have been + * written to this stream. If compression is enabled, the proper EOS bytes will be written at that point. + * The underlying {@link BytesReference} will be returned. + * + * {@link CompressibleBytesOutputStream#close()} should be called when the bytes are no longer needed and + * can be safely released. + */ final class CompressibleBytesOutputStream extends StreamOutput implements Releasable { private final StreamOutput stream; @@ -79,7 +93,8 @@ public void flush() throws IOException { @Override public void close() { - if (stream == bytesStreamOutput) { + // If we are not using compression stream == bytesStreamOutput + if (shouldCompress == false) { IOUtils.closeWhileHandlingException(stream); } else { IOUtils.closeWhileHandlingException(stream, bytesStreamOutput); diff --git a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java index b39c7aff7f482..209127b1cddde 100644 --- a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -1033,7 +1033,7 @@ private void sendRequestToChannel(final DiscoveryNode node, final Channel target // only compress if asked and the request is not bytes. Otherwise only // the header part is compressed, and the "body" can't be extracted as compressed - boolean compressMessage = options.compress() && canCompress(request); + final boolean compressMessage = options.compress() && canCompress(request); status = TransportStatus.setRequest(status); ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays); @@ -1182,8 +1182,8 @@ private BytesReference buildMessage(long requestId, byte status, Version nodeVer message.writeTo(stream); zeroCopyBuffer = BytesArray.EMPTY; } - // we have to call finishStream here before accessing the bytes. A CompressibleBytesOutputStream - // might be implementing compression. And finish stream ensures that some marker bytes (EOS marker) + // we have to call materializeBytes() here before accessing the bytes. A CompressibleBytesOutputStream + // might be implementing compression. And materializeBytes() ensures that some marker bytes (EOS marker) // are written. Otherwise we barf on the decompressing end when we read past EOF on purpose in the // #validateRequest method. this might be a problem in deflate after all but it's important to write // the marker bytes. From fdb58f0c1d6e7fe9a4c0e261fa9777c4f2dfba61 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 30 May 2017 11:10:03 -0500 Subject: [PATCH 09/10] Use expectThrows --- .../CompressibleBytesOutputStreamTests.java | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/transport/CompressibleBytesOutputStreamTests.java b/core/src/test/java/org/elasticsearch/transport/CompressibleBytesOutputStreamTests.java index 6fb9b8198cb64..721f53f10f9cb 100644 --- a/core/src/test/java/org/elasticsearch/transport/CompressibleBytesOutputStreamTests.java +++ b/core/src/test/java/org/elasticsearch/transport/CompressibleBytesOutputStreamTests.java @@ -89,14 +89,10 @@ public void testCompressionWithCallingMaterializeFails() throws IOException { stream.write(expectedBytes); - try { - StreamInput streamInput = CompressorFactory.COMPRESSOR.streamInput(bStream.bytes().streamInput()); - byte[] actualBytes = new byte[expectedBytes.length]; - streamInput.readBytes(actualBytes, 0, expectedBytes.length); - fail("Expected to receive EOFException"); - } catch (EOFException e) { - assertEquals("Unexpected end of ZLIB input stream", e.getMessage()); - } + StreamInput streamInput = CompressorFactory.COMPRESSOR.streamInput(bStream.bytes().streamInput()); + byte[] actualBytes = new byte[expectedBytes.length]; + EOFException e = expectThrows(EOFException.class, () -> streamInput.readBytes(actualBytes, 0, expectedBytes.length)); + assertEquals("Unexpected end of ZLIB input stream", e.getMessage()); stream.close(); } From 7f1af8de22a6ac91c7ee5ad76757e678646d2de4 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 31 May 2017 10:11:27 -0500 Subject: [PATCH 10/10] Add assertions --- .../transport/CompressibleBytesOutputStream.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/transport/CompressibleBytesOutputStream.java b/core/src/main/java/org/elasticsearch/transport/CompressibleBytesOutputStream.java index 87a8b287a711a..8e5d5b027bec4 100644 --- a/core/src/main/java/org/elasticsearch/transport/CompressibleBytesOutputStream.java +++ b/core/src/main/java/org/elasticsearch/transport/CompressibleBytesOutputStream.java @@ -93,10 +93,11 @@ public void flush() throws IOException { @Override public void close() { - // If we are not using compression stream == bytesStreamOutput - if (shouldCompress == false) { + if (stream == bytesStreamOutput) { + assert shouldCompress == false : "If the streams are the same we should not be compressing"; IOUtils.closeWhileHandlingException(stream); } else { + assert shouldCompress : "If the streams are different we should be compressing"; IOUtils.closeWhileHandlingException(stream, bytesStreamOutput); } }