@@ -28,6 +28,34 @@
* be acquired or allocated up-front. Apart from the costs of the buffer creation & release a {@link BufferedStreamOutput} is likely
* more performant than an {@link OutputStreamStreamOutput} because it writes all fields directly to its local buffer and only copies data
* to the underlying stream when the buffer fills up.
*
* <hr>
*
* <h2>Wrapping a {@linkplain java.io.ByteArrayOutputStream}</h2>
*
* A {@link java.io.ByteArrayOutputStream} collects data in an underlying {@code byte[]} collector, which typically doubles in size
* whenever a write operation exhausts the remaining space in the existing collector. This works well when wrapped with a
* {@link BufferedStreamOutput}, especially if the object is expected to be small enough to fit entirely into the buffer, because then
* there's only one slightly-oversized {@code byte[]} allocation to create the collector, plus another right-sized {@code byte[]} allocation
* to extract the result, assuming the buffer is not allocated afresh each time. However, subsequent flushes may need to allocate a larger
* collector, copying over the existing data to the new collector, so this can perform badly for larger objects. For objects larger than
* half the G1 region size (8MiB on a 32GiB heap) each reallocation will require a
* <a href="https://www.oracle.com/technical-resources/articles/java/g1gc.html">humongous allocation</a> which can be stressful for the
* garbage collector, so it's better to split the bytes into several smaller pages using utilities such as {@linkplain BytesStreamOutput},
* {@linkplain ReleasableBytesStreamOutput} or {@linkplain RecyclerBytesStreamOutput} in those cases.
* <p>
* A {@link BufferedStreamOutput} around a {@link java.io.ByteArrayOutputStream} is also good if the in-memory serialized representation
* will have a long lifetime, because the resulting {@code byte[]} is exactly the correct size. When using other approaches such as a
* {@linkplain BytesStreamOutput}, {@linkplain ReleasableBytesStreamOutput} or {@linkplain RecyclerBytesStreamOutput} there will be some
* amount of unused overhead bytes which may be particularly undesirable for long-lived objects.
* <p>
* An {@link OutputStreamStreamOutput} wrapper is almost certainly worse than a {@link BufferedStreamOutput} because it will make the
* {@link java.io.ByteArrayOutputStream} perform significantly more allocations and copies until the collecting buffer gets large enough.
* Most writes to a {@link OutputStreamStreamOutput} use a thread-local intermediate buffer (itself somewhat expensive) and then copy that
* intermediate buffer directly to the output.
* <p>
* Any memory allocated in this way is untracked by the {@link org.elasticsearch.common.breaker} subsystem unless the caller takes steps to
* add this tracking themselves.
*/
public class BufferedStreamOutput extends StreamOutput {

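For illustration, a minimal sketch of the ByteArrayOutputStream pattern described above. The two-argument BufferedStreamOutput constructor shown here is a hypothetical stand-in, as is the example class; consult the actual class for its real API:

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.elasticsearch.common.io.stream.BufferedStreamOutput;
import org.elasticsearch.common.io.stream.Writeable;

class BufferedSerializationExample {
    // Serialize a Writeable into an exactly-sized byte[], reusing the caller's buffer
    // rather than allocating one afresh for each serialization.
    static byte[] toByteArray(Writeable writeable, byte[] reusableBuffer) throws IOException {
        var collector = new ByteArrayOutputStream();
        try (var out = new BufferedStreamOutput(collector, reusableBuffer)) { // hypothetical constructor
            writeable.writeTo(out);
        } // close() flushes any remaining buffered bytes into the collector
        return collector.toByteArray(); // right-sized copy of the collected bytes
    }
}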
@@ -23,8 +23,28 @@
import java.util.Objects;

/**
* A {@link StreamOutput} that accumulates the resulting data in memory, using {@link BigArrays} to avoid frequent reallocation &amp;
* copying of the internal data as it grows, whilst avoiding excessive overhead in the final result for small
* objects.
* <p>
* A {@link BytesStreamOutput} accumulates data using a non-recycling {@link BigArrays} and, as with an {@link OutputStreamStreamOutput}, it
* uses a thread-locally-cached buffer for some of its writes and pushes data to the underlying array in small chunks, causing frequent
* calls to {@link BigArrays#resize}. If the array is large enough (≥16kiB) then the resize operations happen in-place, allocating a new
* 16kiB {@code byte[]} and appending it to the array, but for smaller arrays these resize operations allocate a completely fresh
* {@code byte[]} into which they copy the entire contents of the old one.
* <p>
* {@link BigArrays#resize} grows smaller arrays more slowly than a {@link ByteArrayOutputStream}, with a target of 12.5% overhead rather
* than 100%, which means that a sequence of smaller writes causes more allocations and copying overall. It may be worth adding a
* {@link BufferedStreamOutput} wrapper to reduce the frequency of the resize operations, especially if a suitable buffer is already
* allocated and available.
* <p>
* The resulting {@link BytesReference} is a view over the underlying {@code byte[]} pages and involves no significant extra allocation to
* obtain. It is oversized: the worst case for overhead is when the data is one byte more than a 16kiB page, and therefore the result must
* retain two pages even though all but one byte of the second page is unused. For smaller objects the overhead will be 12.5%.
* <p>
* Any memory allocated in this way is untracked by the {@link org.elasticsearch.common.breaker} subsystem unless the caller takes steps to
* add this tracking themselves.
*/
public class BytesStreamOutput extends BytesStream {

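A hedged usage sketch for BytesStreamOutput, using only methods referenced in the docs above: accumulate writes, then take the result as a BytesReference view with no extra copying:

import java.io.IOException;

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;

class BytesStreamExample {
    static BytesReference serialize() throws IOException {
        try (var out = new BytesStreamOutput()) {
            out.writeString("hello");
            out.writeVInt(42);
            // bytes() returns a view over the internal pages; since this class uses a
            // non-recycling BigArrays the view remains valid after the stream is closed
            return out.bytes();
        }
    }
}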
@@ -15,6 +15,7 @@
import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
@@ -26,9 +27,32 @@
import java.util.Objects;

/**
* A {@link StreamOutput} that uses a {@code Recycler<BytesRef>} to acquire pages of bytes, which avoids frequent reallocation &amp;
* copying of the internal data. When {@link #close()} is called, the bytes will be released.
* <p>
* Best only used for outputs which are either short-lived or large, because the resulting {@link ReleasableBytesReference} retains whole
* pages and this overhead may be significant on small and long-lived objects.
* <p>
* A {@link RecyclerBytesStreamOutput} obtains pages (16kiB slices of a larger {@code byte[]}) from a {@code Recycler<BytesRef>} rather than
* using the {@link BigArrays} abstraction that {@link BytesStreamOutput} and {@link ReleasableBytesStreamOutput} both use. This means it
* can access the underlying {@code byte[]} directly and therefore avoids the intermediate buffer and the copy for almost all writes (the
* exception being occasional writes that get too close to the end of a page).
* <p>
* It does not attempt to grow its collector slowly in the same way that {@link BigArrays#resize} does. Instead, it always obtains from the
* recycler a whole new 16kiB page when the need arises. This works best when the serialized data has a short lifespan (e.g. it is an
* outbound network message) so the overhead has limited impact and the savings on allocations and copying (due to the absence of resize
* operations) are significant.
* <p>
* The resulting {@link ReleasableBytesReference} is a view over the underlying {@code byte[]} pages and involves no significant extra
* allocation to obtain. It is oversized: the worst case for overhead is when the data is a single byte, since this takes up a whole 16kiB
* page, almost all of which is overhead. Nonetheless, if recycled pages are available then it may still be preferable to use them via a
* {@link RecyclerBytesStreamOutput}. If the result is large then the overhead is less important, and if the result will only be used for
* a short time, for instance soon being written to the network or to disk, then the imminent recycling of these pages may mean it is ok to
* keep it as-is. For results which are both small and long-lived it may be better to copy them into a freshly-allocated {@code byte[]}.
* <p>
* Any memory allocated in this way is not tracked by the {@link org.elasticsearch.common.breaker} subsystem, even if the
* {@code Recycler<BytesRef>} was obtained from {@link BigArrays#bytesRefRecycler()}, unless the caller takes steps to add this tracking
* themselves.
*/
public class RecyclerBytesStreamOutput extends BytesStream implements Releasable {

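A hedged sketch of typical RecyclerBytesStreamOutput usage. BytesRefRecycler.NON_RECYCLING_INSTANCE (documented below) keeps the example self-contained; production code would pass a genuinely recycling Recycler<BytesRef>:

import java.io.IOException;

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesRefRecycler;
import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;

class RecyclerStreamExample {
    static void writeAndSend() throws IOException {
        try (var out = new RecyclerBytesStreamOutput(BytesRefRecycler.NON_RECYCLING_INSTANCE)) {
            out.writeString("payload");
            BytesReference bytes = out.bytes(); // view over the acquired pages
            // consume bytes here: close() releases the pages back to the recycler,
            // so they must not be used after this block
        }
    }
}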
@@ -16,13 +16,34 @@
import org.elasticsearch.core.Releasables;

/**
* A {@link StreamOutput} that accumulates the resulting data in memory, using {@link BigArrays} to avoid frequent reallocation &amp;
* copying of the internal data as it grows, whilst avoiding excessive overhead in the final result for small
* objects.
* <p>
* A {@link ReleasableBytesStreamOutput} behaves similarly to a {@link BytesStreamOutput} except that it accumulates data using the provided
* {@link BigArrays}, which typically should be a recycling instance and thus the resulting bytes must be explicitly released when no
* longer needed. As with the {@link BytesStreamOutput} it uses a thread-locally-cached buffer for some of its
* writes and pushes data to the underlying array in small chunks, causing frequent calls to {@link BigArrays#resize}. If the array is large
* enough (≥8kiB) then the resize operations happen in-place, obtaining a recycled 16kiB page and appending it to the array, but for smaller
* arrays these resize operations allocate a completely fresh {@code byte[]} into which they copy the entire contents of the old one.
* <p>
* As above, smaller arrays grow slowly into freshly-allocated {@code byte[]} arrays with a target of 12.5% overhead. It may be worth adding
* a {@link BufferedStreamOutput} wrapper to reduce the frequency of the resize operations, especially if a suitable buffer is already
* allocated and available.
* <p>
* This is different from a {@link RecyclerBytesStreamOutput} which <i>only</i> uses recycled 16kiB pages and never itself allocates a raw
* {@code byte[]}.
* <p>
* The resulting {@link ReleasableBytesReference} is a view over the underlying {@code byte[]} pages and involves no significant extra
* allocation to obtain. It is oversized: the worst case for overhead is when the data is one byte more than a 16kiB page, and therefore the
* result must retain two pages even though all but one byte of the second page is unused. The recycling {@link BigArrays} also switches to
* using recycled pages at half a page (8kiB) which also carries around 50% overhead. For smaller objects the overhead will be 12.5%.
* <p>
* Any memory allocated in this way is tracked by the {@link org.elasticsearch.common.breaker} subsystem if using a suitably-configured
* {@link BigArrays}.
* <p>
* Please note, closing this stream will release the bytes that are in use by any {@link ReleasableBytesReference} returned from
* {@link #bytes()}, so this stream should only be closed after the bytes have been output or copied elsewhere.
*/
public class ReleasableBytesStreamOutput extends BytesStreamOutput implements Releasable {

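A hedged sketch of the release discipline described above. BigArrays.NON_RECYCLING_INSTANCE keeps the example self-contained; in production this would usually be a recycling, circuit-breaker-aware BigArrays:

import java.io.IOException;

import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.util.BigArrays;

class ReleasableStreamExample {
    static void writeAndSend() throws IOException {
        try (var out = new ReleasableBytesStreamOutput(BigArrays.NON_RECYCLING_INSTANCE)) {
            out.writeVLong(42L);
            var bytes = out.bytes();
            // copy or fully consume the bytes here: closing the stream releases them
        }
    }
}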
@@ -52,15 +52,29 @@
import static java.util.Map.entry;

/**
* A stream into which a structured {@linkplain Writeable} may be written, e.g. for sending over a transport connection to a remote node.
* <p>
* This class's methods are optimized so you can put the methods that read and write a class next to each other and you can scan them
* visually for differences. That means that most variables should be read and written in a single line so even large objects fit both
* reading and writing on the screen. It also means that the methods on this class are named very similarly to {@link StreamInput}. Finally
* it means that the "barrier to entry" for adding new methods to this class is relatively low even though it is a shared class with code
* everywhere. That being said, this class deals primarily with {@code List}s rather than arrays. For the most part callers should adapt to
* lists, either by storing {@code List}s internally or just converting to and from a {@code List} when calling. This comment is repeated
* on {@link StreamInput}.
* <hr>
* It is possible to use a {@linkplain StreamOutput} as an adapter to send a {@linkplain Writeable} to a raw-bytes {@linkplain OutputStream}
* such as a file or a compressing stream, for instance using {@link OutputStreamStreamOutput} or {@link BufferedStreamOutput}. Often,
* however, we want to capture the serialized representation of an object in memory as a {@code byte[]} or more generally a
* {@link BytesReference} (a sequence of slices of {@code byte[]}s). For example, this is how the {@link org.elasticsearch.transport}
* subsystem prepares a network message for transmission. There are several different ways to achieve this objective, with different
* performance characteristics, as described in the documentation for the following classes.
*
* <ul>
* <li>{@link BytesStreamOutput}</li>
* <li>{@link ReleasableBytesStreamOutput}</li>
* <li>{@link RecyclerBytesStreamOutput}</li>
* <li>{@link BufferedStreamOutput}</li>
* </ul>
*/
public abstract class StreamOutput extends OutputStream {

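To illustrate the read/write symmetry described above: a hypothetical Writeable whose reading constructor and writeTo method mirror each other line for line, so the two can be scanned visually for differences:

import java.io.IOException;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;

// Hypothetical example type, not part of the codebase
record Point(int x, int y) implements Writeable {
    Point(StreamInput in) throws IOException {
        this(in.readVInt(), in.readVInt()); // reads mirror the writes below, line for line
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeVInt(x);
        out.writeVInt(y);
    }
}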
@@ -15,6 +15,11 @@

public class BytesRefRecycler implements Recycler<BytesRef> {

/**
* A non-recycling {@link BytesRefRecycler} which simply allocates a fresh 16kiB {@code byte[]} each time. This is only really
* appropriate for use in tests.
*/
// TODO move to test framework?
public static final BytesRefRecycler NON_RECYCLING_INSTANCE = new BytesRefRecycler(PageCacheRecycler.NON_RECYCLING_INSTANCE);

private final PageCacheRecycler recycler;
@@ -0,0 +1,87 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.common.io.stream;

import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.core.Releasable;

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

import static org.elasticsearch.test.ESTestCase.between;
import static org.elasticsearch.test.ESTestCase.randomBoolean;
import static org.elasticsearch.test.ESTestCase.randomByte;
import static org.elasticsearch.test.ESTestCase.randomByteArrayOfLength;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;

/**
* A {@link Recycler} for {@link BytesRef} pages which returns pages with nontrivial offsets and verifies that the surrounding buffer pool
* is left untouched.
*/
public class MockBytesRefRecycler implements Recycler<BytesRef>, Releasable {

private final AtomicInteger activePageCount = new AtomicInteger();

@Override
public V<BytesRef> obtain() {
activePageCount.incrementAndGet();
final var bufferPool = randomByteArrayOfLength(between(pageSize(), pageSize() * 4));
final var offset = randomBoolean()
// align to a multiple of pageSize(), which is the usual case in production
? between(0, bufferPool.length / pageSize() - 1) * pageSize()
// no alignment, to detect cases where alignment is assumed
: between(0, bufferPool.length - pageSize());
final var bytesRef = new BytesRef(bufferPool, offset, pageSize());

final var bufferPoolCopy = ArrayUtil.copyArray(bufferPool); // keep for a later check for out-of-bounds writes
final var dummyByte = randomByte();
Arrays.fill(bufferPoolCopy, offset, offset + pageSize(), dummyByte);

return new V<>() {
@Override
public BytesRef v() {
return bytesRef;
}

@Override
public boolean isRecycled() {
throw new AssertionError("shouldn't matter");
}

@Override
public void close() {
// page must not be changed
assertSame(bufferPool, bytesRef.bytes);
assertEquals(offset, bytesRef.offset);
assertEquals(pageSize(), bytesRef.length);

Arrays.fill(bufferPool, offset, offset + pageSize(), dummyByte); // overwrite buffer contents to detect use-after-free
assertArrayEquals(bufferPoolCopy, bufferPool); // remainder of pool must be unmodified

activePageCount.decrementAndGet();
}
};
}

@Override
public int pageSize() {
return PageCacheRecycler.BYTE_PAGE_SIZE;
}

@Override
public void close() {
assertEquals(0, activePageCount.get());
}
}
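A sketch of how a test might use this recycler (a hypothetical test method, assuming java.io.IOException is imported; both classes live in org.elasticsearch.common.io.stream, so no further imports are needed). Close order matters: the stream must release its pages before the recycler checks them:

void assertStreamReleasesItsPages() throws IOException {
    try (var recycler = new MockBytesRefRecycler(); var out = new RecyclerBytesStreamOutput(recycler)) {
        out.writeString("payload");
        // exercise out.bytes() here; each page release checks for out-of-bounds
        // writes, and recycler.close() then verifies that no pages leaked
    }
}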