|
18 | 18 | */ |
19 | 19 | package org.elasticsearch.transport; |
20 | 20 |
|
21 | | -import org.elasticsearch.core.internal.io.IOUtils; |
22 | 21 | import org.elasticsearch.Version; |
23 | 22 | import org.elasticsearch.action.ActionListener; |
24 | 23 | import org.elasticsearch.cluster.node.DiscoveryNode; |
25 | 24 | import org.elasticsearch.common.bytes.BytesReference; |
26 | 25 | import org.elasticsearch.common.io.stream.BytesStreamOutput; |
27 | 26 | import org.elasticsearch.common.io.stream.InputStreamStreamInput; |
28 | 27 | import org.elasticsearch.common.io.stream.NamedWriteableRegistry; |
| 28 | +import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; |
29 | 29 | import org.elasticsearch.common.io.stream.StreamInput; |
30 | 30 | import org.elasticsearch.common.network.NetworkService; |
31 | 31 | import org.elasticsearch.common.settings.Settings; |
|
35 | 35 | import org.elasticsearch.common.util.CancellableThreads; |
36 | 36 | import org.elasticsearch.common.util.concurrent.AbstractRunnable; |
37 | 37 | import org.elasticsearch.common.util.concurrent.EsExecutors; |
| 38 | +import org.elasticsearch.core.internal.io.IOUtils; |
38 | 39 | import org.elasticsearch.indices.breaker.CircuitBreakerService; |
39 | 40 | import org.elasticsearch.mocksocket.MockServerSocket; |
40 | 41 | import org.elasticsearch.mocksocket.MockSocket; |
@@ -153,19 +154,20 @@ private void readMessage(MockChannel mockChannel, StreamInput input) throws IOEx |
153 | 154 | if (msgSize == -1) { |
154 | 155 | socket.getOutputStream().flush(); |
155 | 156 | } else { |
156 | | - BytesStreamOutput output = new BytesStreamOutput(); |
157 | | - final byte[] buffer = new byte[msgSize]; |
158 | | - input.readFully(buffer); |
159 | | - output.write(minimalHeader); |
160 | | - output.writeInt(msgSize); |
161 | | - output.write(buffer); |
162 | | - final BytesReference bytes = output.bytes(); |
163 | | - if (TcpTransport.validateMessageHeader(bytes)) { |
164 | | - InetSocketAddress remoteAddress = (InetSocketAddress) socket.getRemoteSocketAddress(); |
165 | | - messageReceived(bytes.slice(TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE, msgSize), |
166 | | - mockChannel, mockChannel.profile, remoteAddress, msgSize); |
167 | | - } else { |
168 | | - // ping message - we just drop all stuff |
| 157 | + try (BytesStreamOutput output = new ReleasableBytesStreamOutput(msgSize, bigArrays)) { |
| 158 | + final byte[] buffer = new byte[msgSize]; |
| 159 | + input.readFully(buffer); |
| 160 | + output.write(minimalHeader); |
| 161 | + output.writeInt(msgSize); |
| 162 | + output.write(buffer); |
| 163 | + final BytesReference bytes = output.bytes(); |
| 164 | + if (TcpTransport.validateMessageHeader(bytes)) { |
| 165 | + InetSocketAddress remoteAddress = (InetSocketAddress) socket.getRemoteSocketAddress(); |
| 166 | + messageReceived(bytes.slice(TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE, msgSize), |
| 167 | + mockChannel, mockChannel.profile, remoteAddress, msgSize); |
| 168 | + } else { |
| 169 | + // ping message - we just drop all stuff |
| 170 | + } |
169 | 171 | } |
170 | 172 | } |
171 | 173 | } |
|
0 commit comments