Skip to content

Commit afd45c1

Browse files
committed
Revert "Closing a ReleasableBytesStreamOutput closes the underlying BigArray (elastic#23572)"
This reverts commit 6bfecdf.
1 parent d31d2ca commit afd45c1

File tree

17 files changed

+88
-265
lines changed

17 files changed

+88
-265
lines changed

core/src/main/java/org/elasticsearch/common/bytes/ReleasablePagedBytesReference.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,17 +30,13 @@
3030
*/
3131
public final class ReleasablePagedBytesReference extends PagedBytesReference implements Releasable {
3232

33-
private final Releasable releasable;
34-
35-
public ReleasablePagedBytesReference(BigArrays bigarrays, ByteArray byteArray, int length,
36-
Releasable releasable) {
33+
public ReleasablePagedBytesReference(BigArrays bigarrays, ByteArray byteArray, int length) {
3734
super(bigarrays, byteArray, length);
38-
this.releasable = releasable;
3935
}
4036

4137
@Override
4238
public void close() {
43-
Releasables.close(releasable);
39+
Releasables.close(byteArray);
4440
}
4541

4642
}

core/src/main/java/org/elasticsearch/common/compress/Compressor.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
package org.elasticsearch.common.compress;
2121

2222
import org.elasticsearch.common.bytes.BytesReference;
23-
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
2423
import org.elasticsearch.common.io.stream.StreamInput;
2524
import org.elasticsearch.common.io.stream.StreamOutput;
2625

@@ -32,9 +31,5 @@ public interface Compressor {
3231

3332
StreamInput streamInput(StreamInput in) throws IOException;
3433

35-
/**
36-
* Creates a new stream output that compresses the contents and writes to the provided stream
37-
* output. Closing the returned {@link StreamOutput} will close the provided stream output.
38-
*/
3934
StreamOutput streamOutput(StreamOutput out) throws IOException;
4035
}

core/src/main/java/org/elasticsearch/common/compress/DeflateCompressor.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.elasticsearch.common.compress;
2121

2222
import org.elasticsearch.common.bytes.BytesReference;
23+
import org.elasticsearch.common.compress.Compressor;
2324
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
2425
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
2526
import org.elasticsearch.common.io.stream.StreamInput;
@@ -46,7 +47,7 @@ public class DeflateCompressor implements Compressor {
4647
// It needs to be different from other compressors and to not be specific
4748
// enough so that no stream starting with these bytes could be detected as
4849
// a XContent
49-
private static final byte[] HEADER = new byte[]{'D', 'F', 'L', '\0'};
50+
private static final byte[] HEADER = new byte[] { 'D', 'F', 'L', '\0' };
5051
// 3 is a good trade-off between speed and compression ratio
5152
private static final int LEVEL = 3;
5253
// We use buffering on the input and output of in/def-laters in order to
@@ -87,7 +88,6 @@ public StreamInput streamInput(StreamInput in) throws IOException {
8788
decompressedIn = new BufferedInputStream(decompressedIn, BUFFER_SIZE);
8889
return new InputStreamStreamInput(decompressedIn) {
8990
final AtomicBoolean closed = new AtomicBoolean(false);
90-
9191
public void close() throws IOException {
9292
try {
9393
super.close();
@@ -107,11 +107,10 @@ public StreamOutput streamOutput(StreamOutput out) throws IOException {
107107
final boolean nowrap = true;
108108
final Deflater deflater = new Deflater(LEVEL, nowrap);
109109
final boolean syncFlush = true;
110-
DeflaterOutputStream deflaterOutputStream = new DeflaterOutputStream(out, deflater, BUFFER_SIZE, syncFlush);
111-
OutputStream compressedOut = new BufferedOutputStream(deflaterOutputStream, BUFFER_SIZE);
110+
OutputStream compressedOut = new DeflaterOutputStream(out, deflater, BUFFER_SIZE, syncFlush);
111+
compressedOut = new BufferedOutputStream(compressedOut, BUFFER_SIZE);
112112
return new OutputStreamStreamOutput(compressedOut) {
113113
final AtomicBoolean closed = new AtomicBoolean(false);
114-
115114
public void close() throws IOException {
116115
try {
117116
super.close();

core/src/main/java/org/elasticsearch/common/io/stream/BytesStream.java renamed to core/src/main/java/org/elasticsearch/common/io/BytesStream.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@
1717
* under the License.
1818
*/
1919

20-
package org.elasticsearch.common.io.stream;
20+
package org.elasticsearch.common.io;
2121

2222
import org.elasticsearch.common.bytes.BytesReference;
2323

24-
public abstract class BytesStream extends StreamOutput {
24+
public interface BytesStream {
2525

26-
public abstract BytesReference bytes();
27-
}
26+
BytesReference bytes();
27+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.common.io;
21+
22+
import org.elasticsearch.common.bytes.ReleasablePagedBytesReference;
23+
24+
/**
25+
* A bytes stream that requires its bytes to be released once no longer used.
26+
*/
27+
public interface ReleasableBytesStream extends BytesStream {
28+
29+
@Override
30+
ReleasablePagedBytesReference bytes();
31+
32+
}

core/src/main/java/org/elasticsearch/common/io/Streams.java

Lines changed: 0 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,6 @@
2020
package org.elasticsearch.common.io;
2121

2222
import org.apache.lucene.util.IOUtils;
23-
import org.elasticsearch.common.bytes.BytesReference;
24-
import org.elasticsearch.common.io.stream.BytesStream;
25-
import org.elasticsearch.common.io.stream.StreamOutput;
2623
import org.elasticsearch.common.util.Callback;
2724

2825
import java.io.BufferedReader;
@@ -239,56 +236,4 @@ public static void readAllLines(InputStream input, Callback<String> callback) th
239236
}
240237
}
241238
}
242-
243-
/**
244-
* Wraps the given {@link BytesStream} in a {@link StreamOutput} that simply flushes when
245-
* close is called.
246-
*/
247-
public static BytesStream flushOnCloseStream(BytesStream os) {
248-
return new FlushOnCloseOutputStream(os);
249-
}
250-
251-
/**
252-
* A wrapper around a {@link BytesStream} that makes the close operation a flush. This is
253-
* needed as sometimes a stream will be closed but the bytes that the stream holds still need
254-
* to be used and the stream cannot be closed until the bytes have been consumed.
255-
*/
256-
private static class FlushOnCloseOutputStream extends BytesStream {
257-
258-
private final BytesStream delegate;
259-
260-
private FlushOnCloseOutputStream(BytesStream bytesStreamOutput) {
261-
this.delegate = bytesStreamOutput;
262-
}
263-
264-
@Override
265-
public void writeByte(byte b) throws IOException {
266-
delegate.writeByte(b);
267-
}
268-
269-
@Override
270-
public void writeBytes(byte[] b, int offset, int length) throws IOException {
271-
delegate.writeBytes(b, offset, length);
272-
}
273-
274-
@Override
275-
public void flush() throws IOException {
276-
delegate.flush();
277-
}
278-
279-
@Override
280-
public void close() throws IOException {
281-
flush();
282-
}
283-
284-
@Override
285-
public void reset() throws IOException {
286-
delegate.reset();
287-
}
288-
289-
@Override
290-
public BytesReference bytes() {
291-
return delegate.bytes();
292-
}
293-
}
294239
}

core/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.elasticsearch.common.bytes.BytesReference;
2323
import org.elasticsearch.common.bytes.PagedBytesReference;
24+
import org.elasticsearch.common.io.BytesStream;
2425
import org.elasticsearch.common.util.BigArrays;
2526
import org.elasticsearch.common.util.ByteArray;
2627

@@ -30,7 +31,7 @@
3031
* A @link {@link StreamOutput} that uses {@link BigArrays} to acquire pages of
3132
* bytes, which avoids frequent reallocation &amp; copying of the internal data.
3233
*/
33-
public class BytesStreamOutput extends BytesStream {
34+
public class BytesStreamOutput extends StreamOutput implements BytesStream {
3435

3536
protected final BigArrays bigArrays;
3637

@@ -49,7 +50,7 @@ public BytesStreamOutput() {
4950
/**
5051
* Create a non recycling {@link BytesStreamOutput} with enough initial pages acquired
5152
* to satisfy the capacity given by expected size.
52-
*
53+
*
5354
* @param expectedSize the expected maximum size of the stream in bytes.
5455
*/
5556
public BytesStreamOutput(int expectedSize) {
@@ -128,7 +129,7 @@ public void close() {
128129

129130
/**
130131
* Returns the current size of the buffer.
131-
*
132+
*
132133
* @return the value of the <code>count</code> field, which is the number of valid
133134
* bytes in this output stream.
134135
* @see java.io.ByteArrayOutputStream#count
@@ -150,7 +151,7 @@ public long ramBytesUsed() {
150151
return bytes.ramBytesUsed();
151152
}
152153

153-
void ensureCapacity(long offset) {
154+
private void ensureCapacity(long offset) {
154155
if (offset > Integer.MAX_VALUE) {
155156
throw new IllegalArgumentException(getClass().getSimpleName() + " cannot hold more than 2GB of data");
156157
}

core/src/main/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutput.java

Lines changed: 6 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -20,56 +20,29 @@
2020
package org.elasticsearch.common.io.stream;
2121

2222
import org.elasticsearch.common.bytes.ReleasablePagedBytesReference;
23-
import org.elasticsearch.common.lease.Releasable;
24-
import org.elasticsearch.common.lease.Releasables;
23+
import org.elasticsearch.common.io.ReleasableBytesStream;
2524
import org.elasticsearch.common.util.BigArrays;
26-
import org.elasticsearch.common.util.ByteArray;
2725

2826
/**
2927
* An bytes stream output that allows providing a {@link BigArrays} instance
3028
* expecting it to require releasing its content ({@link #bytes()}) once done.
3129
* <p>
32-
* Please note, closing this stream will release the bytes that are in use by any
33-
* {@link ReleasablePagedBytesReference} returned from {@link #bytes()}, so this
34-
* stream should only be closed after the bytes have been output or copied
35-
* elsewhere.
30+
* Please note, its is the responsibility of the caller to make sure the bytes
31+
* reference do not "escape" and are released only once.
3632
*/
37-
public class ReleasableBytesStreamOutput extends BytesStreamOutput
38-
implements Releasable {
39-
40-
private Releasable releasable;
33+
public class ReleasableBytesStreamOutput extends BytesStreamOutput implements ReleasableBytesStream {
4134

4235
public ReleasableBytesStreamOutput(BigArrays bigarrays) {
43-
this(BigArrays.PAGE_SIZE_IN_BYTES, bigarrays);
36+
super(BigArrays.PAGE_SIZE_IN_BYTES, bigarrays);
4437
}
4538

4639
public ReleasableBytesStreamOutput(int expectedSize, BigArrays bigArrays) {
4740
super(expectedSize, bigArrays);
48-
this.releasable = Releasables.releaseOnce(this.bytes);
4941
}
5042

51-
/**
52-
* Returns a {@link Releasable} implementation of a
53-
* {@link org.elasticsearch.common.bytes.BytesReference} that represents the current state of
54-
* the bytes in the stream.
55-
*/
5643
@Override
5744
public ReleasablePagedBytesReference bytes() {
58-
return new ReleasablePagedBytesReference(bigArrays, bytes, count, releasable);
59-
}
60-
61-
@Override
62-
public void close() {
63-
Releasables.close(releasable);
45+
return new ReleasablePagedBytesReference(bigArrays, bytes, count);
6446
}
6547

66-
@Override
67-
void ensureCapacity(long offset) {
68-
final ByteArray prevBytes = this.bytes;
69-
super.ensureCapacity(offset);
70-
if (prevBytes != this.bytes) {
71-
// re-create the releasable with the new reference
72-
releasable = Releasables.releaseOnce(this.bytes);
73-
}
74-
}
7548
}

core/src/main/java/org/elasticsearch/common/xcontent/XContentBuilder.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import org.apache.lucene.util.BytesRef;
2323
import org.elasticsearch.common.bytes.BytesReference;
2424
import org.elasticsearch.common.geo.GeoPoint;
25-
import org.elasticsearch.common.io.stream.BytesStream;
25+
import org.elasticsearch.common.io.BytesStream;
2626
import org.elasticsearch.common.io.stream.BytesStreamOutput;
2727
import org.elasticsearch.common.lease.Releasable;
2828
import org.elasticsearch.common.text.Text;
@@ -53,7 +53,7 @@
5353
/**
5454
* A utility to build XContent (ie json).
5555
*/
56-
public final class XContentBuilder implements Releasable, Flushable {
56+
public final class XContentBuilder implements BytesStream, Releasable, Flushable {
5757

5858
/**
5959
* Create a new {@link XContentBuilder} using the given {@link XContent} content.
@@ -1041,6 +1041,7 @@ public XContentGenerator generator() {
10411041
return this.generator;
10421042
}
10431043

1044+
@Override
10441045
public BytesReference bytes() {
10451046
close();
10461047
return ((BytesStream) bos).bytes();

core/src/main/java/org/elasticsearch/index/translog/Translog.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -439,7 +439,7 @@ public Location add(final Operation operation) throws IOException {
439439
}
440440
throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e);
441441
} finally {
442-
Releasables.close(out);
442+
Releasables.close(out.bytes());
443443
}
444444
}
445445

@@ -1332,7 +1332,7 @@ public static void writeOperations(StreamOutput outStream, List<Operation> toWri
13321332
bytes.writeTo(outStream);
13331333
}
13341334
} finally {
1335-
Releasables.close(out);
1335+
Releasables.close(out.bytes());
13361336
}
13371337

13381338
}

0 commit comments

Comments
 (0)