diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/bytes/RecyclerBytesStreamOutputBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/bytes/RecyclerBytesStreamOutputBenchmark.java index 99ff455a360c4..c4188e8069e72 100644 --- a/benchmarks/src/main/java/org/elasticsearch/benchmark/bytes/RecyclerBytesStreamOutputBenchmark.java +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/bytes/RecyclerBytesStreamOutputBenchmark.java @@ -37,7 +37,8 @@ @Fork(value = 1) public class RecyclerBytesStreamOutputBenchmark { - private final AtomicReference bytesRef = new AtomicReference<>(new BytesRef(16384)); + private final AtomicReference bytesRef = new AtomicReference<>(new BytesRef(new byte[16384], 0, 16384)); + private RecyclerBytesStreamOutput streamOutput; private String shortString; private String longString; @@ -156,7 +157,7 @@ public V obtain() { localBytesRef = recycledBytesRef; } else { recycled = false; - localBytesRef = new BytesRef(16384); + localBytesRef = new BytesRef(new byte[16384], 0, 16384); } return new V<>() { @Override diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/RecyclerBytesStreamOutput.java b/server/src/main/java/org/elasticsearch/common/io/stream/RecyclerBytesStreamOutput.java index c6e8a0bc798e3..daba4ef3dfb59 100644 --- a/server/src/main/java/org/elasticsearch/common/io/stream/RecyclerBytesStreamOutput.java +++ b/server/src/main/java/org/elasticsearch/common/io/stream/RecyclerBytesStreamOutput.java @@ -38,13 +38,30 @@ public class RecyclerBytesStreamOutput extends BytesStream implements Releasable private int pageIndex = -1; private int currentCapacity = 0; - private BytesRef currentBytesRef; - private int currentPageOffset; + /** + * Pool from which current buffer is sliced. + */ + private byte[] currentBufferPool; + + /** + * Current absolute offset within currentBufferPool. + */ + private int currentOffset; + + /** + * Max permitted offset within currentBufferPool. + */ + private int maxOffset; + + /** + * Position in stream corresponding (conceptually at least) with start of currentBufferPool. + */ + private long positionOffset; public RecyclerBytesStreamOutput(Recycler recycler) { this.recycler = recycler; this.pageSize = recycler.pageSize(); - this.currentPageOffset = pageSize; + this.currentOffset = this.maxOffset = pageSize; // Always start with a page. This is because if we don't have a page, one of the hot write paths would be forced to go through // a slow path. We prefer to only execute that path if we need to expand. ensureCapacityFromPosition(1); @@ -53,21 +70,18 @@ public RecyclerBytesStreamOutput(Recycler recycler) { @Override public long position() { - return ((long) pageSize * pageIndex) + currentPageOffset; + return positionOffset + currentOffset; } @Override public void writeByte(byte b) { - int currentPageOffset = this.currentPageOffset; - if (1 > pageSize - currentPageOffset) { + int currentOffset = this.currentOffset; + if (currentOffset >= maxOffset) { ensureCapacity(1); - nextPage(); - currentPageOffset = 0; + currentOffset = nextPage(); } - final BytesRef currentPage = currentBytesRef; - final int destOffset = currentPage.offset + currentPageOffset; - currentPage.bytes[destOffset] = b; - this.currentPageOffset = currentPageOffset + 1; + this.currentBufferPool[currentOffset] = b; + this.currentOffset = currentOffset + 1; } @Override @@ -87,47 +101,63 @@ public void writeBytes(byte[] b, int offset, int length) { return; } - Objects.checkFromIndexSize(offset, length, b.length); - - int currentPageOffset = this.currentPageOffset; - BytesRef currentPage = currentBytesRef; - if (length > pageSize - currentPageOffset) { - ensureCapacity(length); + int currentOffset = this.currentOffset; + int maxOffset = this.maxOffset; + if (length <= maxOffset - currentOffset) { + System.arraycopy(b, offset, this.currentBufferPool, currentOffset, length); + this.currentOffset = currentOffset + length; + } else { + writeBytesMultiPage(b, offset, length, this.currentBufferPool, currentOffset, maxOffset); } + } - int bytesToCopy = length; - int srcOff = offset; + private void writeBytesMultiPage( + byte[] sourceBufferPool, + int sourceOffset, + int lengthToCopy, + byte[] targetBufferPool, + int targetOffset, + int maxTargetOffset + ) { + Objects.checkFromIndexSize(sourceOffset, lengthToCopy, sourceBufferPool.length); + ensureCapacity(lengthToCopy); + + int pageIndex = this.pageIndex; + int pageStart = 0; while (true) { - final int toCopyThisLoop = Math.min(pageSize - currentPageOffset, bytesToCopy); - final int destOffset = currentPage.offset + currentPageOffset; - System.arraycopy(b, srcOff, currentPage.bytes, destOffset, toCopyThisLoop); - srcOff += toCopyThisLoop; - bytesToCopy -= toCopyThisLoop; - if (bytesToCopy > 0) { - currentPageOffset = 0; - currentPage = pages.get(++pageIndex).v(); + final int toCopyThisLoop = Math.min(maxTargetOffset - targetOffset, lengthToCopy); + System.arraycopy(sourceBufferPool, sourceOffset, targetBufferPool, targetOffset, toCopyThisLoop); + sourceOffset += toCopyThisLoop; + lengthToCopy -= toCopyThisLoop; + if (lengthToCopy > 0) { + final var nextPage = pages.get(++pageIndex).v(); + targetBufferPool = nextPage.bytes; + targetOffset = pageStart = nextPage.offset; + maxTargetOffset = nextPage.offset + nextPage.length; } else { - currentPageOffset += toCopyThisLoop; + targetOffset += toCopyThisLoop; break; } } - this.currentPageOffset = currentPageOffset; - this.currentBytesRef = currentPage; + this.pageIndex = pageIndex; + this.currentBufferPool = targetBufferPool; + this.currentOffset = targetOffset; + this.maxOffset = maxTargetOffset; + this.positionOffset = ((long) pageIndex) * pageSize - pageStart; } @Override public void writeVInt(int i) throws IOException { - final int currentPageOffset = this.currentPageOffset; - final int remainingBytesInPage = pageSize - currentPageOffset; + int currentOffset = this.currentOffset; + final int remainingBytesInPage = maxOffset - currentOffset; // Single byte values (most common) if ((i & 0xFFFFFF80) == 0) { if (1 > remainingBytesInPage) { super.writeVInt(i); } else { - BytesRef currentPage = currentBytesRef; - currentPage.bytes[currentPage.offset + currentPageOffset] = (byte) i; - this.currentPageOffset = currentPageOffset + 1; + this.currentBufferPool[currentOffset] = (byte) i; + this.currentOffset = currentOffset + 1; } return; } @@ -136,9 +166,7 @@ public void writeVInt(int i) throws IOException { if (bytesNeeded > remainingBytesInPage) { super.writeVInt(i); } else { - BytesRef currentPage = currentBytesRef; - putVInt(i, bytesNeeded, currentPage.bytes, currentPage.offset + currentPageOffset); - this.currentPageOffset = currentPageOffset + bytesNeeded; + this.currentOffset = currentOffset + StreamOutputHelper.putMultiByteVInt(this.currentBufferPool, i, currentOffset); } } @@ -166,49 +194,45 @@ private void putVInt(int i, int bytesNeeded, byte[] page, int offset) { @Override public void writeInt(int i) throws IOException { - final int currentPageOffset = this.currentPageOffset; - if (4 > (pageSize - currentPageOffset)) { + int currentOffset = this.currentOffset; + if (4 > (maxOffset - currentOffset)) { super.writeInt(i); } else { - BytesRef currentPage = currentBytesRef; - ByteUtils.writeIntBE(i, currentPage.bytes, currentPage.offset + currentPageOffset); - this.currentPageOffset = currentPageOffset + 4; + ByteUtils.writeIntBE(i, currentBufferPool, currentOffset); + this.currentOffset = currentOffset + 4; } } @Override public void writeIntLE(int i) throws IOException { - final int currentPageOffset = this.currentPageOffset; - if (4 > (pageSize - currentPageOffset)) { + int currentOffset = this.currentOffset; + if (4 > (maxOffset - currentOffset)) { super.writeIntLE(i); } else { - BytesRef currentPage = currentBytesRef; - ByteUtils.writeIntLE(i, currentPage.bytes, currentPage.offset + currentPageOffset); - this.currentPageOffset = currentPageOffset + 4; + ByteUtils.writeIntLE(i, currentBufferPool, currentOffset); + this.currentOffset = currentOffset + 4; } } @Override public void writeLong(long i) throws IOException { - final int currentPageOffset = this.currentPageOffset; - if (8 > (pageSize - currentPageOffset)) { + int currentOffset = this.currentOffset; + if (8 > (maxOffset - currentOffset)) { super.writeLong(i); } else { - BytesRef currentPage = currentBytesRef; - ByteUtils.writeLongBE(i, currentPage.bytes, currentPage.offset + currentPageOffset); - this.currentPageOffset = currentPageOffset + 8; + ByteUtils.writeLongBE(i, currentBufferPool, currentOffset); + this.currentOffset = currentOffset + 8; } } @Override public void writeLongLE(long i) throws IOException { - final int currentPageOffset = this.currentPageOffset; - if (8 > (pageSize - currentPageOffset)) { + int currentOffset = this.currentOffset; + if (8 > (maxOffset - currentOffset)) { super.writeLongLE(i); } else { - BytesRef currentPage = currentBytesRef; - ByteUtils.writeLongLE(i, currentPage.bytes, currentPage.offset + currentPageOffset); - this.currentPageOffset = currentPageOffset + 8; + ByteUtils.writeLongLE(i, currentBufferPool, currentOffset); + this.currentOffset = currentOffset + 8; } } @@ -243,12 +267,10 @@ public void legacyWriteWithSizePrefix(Writeable writeable) throws IOException { * @return a direct page if there is enough space in current page, otherwise null */ public BytesRef tryGetPageForWrite(int bytes) { - final int beforePageOffset = this.currentPageOffset; - if (bytes <= (pageSize - beforePageOffset)) { - BytesRef currentPage = currentBytesRef; - BytesRef bytesRef = new BytesRef(currentPage.bytes, currentPage.offset + beforePageOffset, bytes); - this.currentPageOffset = beforePageOffset + bytes; - return bytesRef; + final int currentOffset = this.currentOffset; + if (bytes <= maxOffset - currentOffset) { + this.currentOffset = currentOffset + bytes; + return new BytesRef(this.currentBufferPool, currentOffset, bytes); } else { return null; } @@ -258,37 +280,36 @@ public BytesRef tryGetPageForWrite(int bytes) { // intermediary buffers @Override public void writeString(String str) throws IOException { - final int currentPageOffset = this.currentPageOffset; + int currentOffset = this.currentOffset; final int charCount = str.length(); int bytesNeededForVInt = vIntLength(charCount); // maximum serialized length is 3 bytes per char + n bytes for the vint - if (charCount * 3 + bytesNeededForVInt > (pageSize - currentPageOffset)) { + if (charCount * 3 + bytesNeededForVInt > maxOffset - currentOffset) { // Technically no need for scratch buffer here, we can do the same thing directly on the pages just with bounds checks -- TODO StreamOutputHelper.writeString(str, this); return; } - BytesRef currentPage = currentBytesRef; - int offset = currentPage.offset + currentPageOffset; - byte[] buffer = currentPage.bytes; + int offset = currentOffset; + byte[] currentBufferPool = this.currentBufferPool; // mostly duplicated from StreamOutput.writeString to to get more reliable compilation of this very hot loop - putVInt(charCount, bytesNeededForVInt, currentPage.bytes, offset); + putVInt(charCount, bytesNeededForVInt, currentBufferPool, offset); offset += bytesNeededForVInt; for (int i = 0; i < charCount; i++) { final int c = str.charAt(i); if (c <= 0x007F) { - buffer[offset++] = ((byte) c); + currentBufferPool[offset++] = ((byte) c); } else if (c > 0x07FF) { - buffer[offset++] = ((byte) (0xE0 | c >> 12 & 0x0F)); - buffer[offset++] = ((byte) (0x80 | c >> 6 & 0x3F)); - buffer[offset++] = ((byte) (0x80 | c >> 0 & 0x3F)); + currentBufferPool[offset++] = ((byte) (0xE0 | c >> 12 & 0x0F)); + currentBufferPool[offset++] = ((byte) (0x80 | c >> 6 & 0x3F)); + currentBufferPool[offset++] = ((byte) (0x80 | c >> 0 & 0x3F)); } else { - buffer[offset++] = ((byte) (0xC0 | c >> 6 & 0x1F)); - buffer[offset++] = ((byte) (0x80 | c >> 0 & 0x3F)); + currentBufferPool[offset++] = ((byte) (0xC0 | c >> 6 & 0x1F)); + currentBufferPool[offset++] = ((byte) (0x80 | c >> 0 & 0x3F)); } } - this.currentPageOffset = offset - currentPage.offset; + this.currentOffset = offset; } @Override @@ -316,27 +337,33 @@ public void flush() { public void seek(long position) { ensureCapacityFromPosition(position); if (position > 0) { - int offsetInPage = (int) (position % pageSize); - int pageIndex = (int) position / pageSize; - // Special handling for seeking to the first index in a new page, which is handled as a seeking to one-after the last index // in the previous case. This is done so that seeking to the first index of a new page does not cause a page allocation while // still allowing a fast check via (pageSize - currentPageOffset) on the remaining size in the current page in all other // methods. - if (offsetInPage == 0) { - this.pageIndex = pageIndex - 1; - this.currentPageOffset = pageSize; - } else { - this.pageIndex = pageIndex; - this.currentPageOffset = offsetInPage; - } + long prevPosition = position - 1; + int offsetInPage = (int) (prevPosition % pageSize); + int pageIndex = (int) prevPosition / pageSize; + innerSeek(pageIndex, offsetInPage + 1, position); } else { // We always have an initial page so special handling for seeking to 0. assert position == 0; - this.pageIndex = 0; - this.currentPageOffset = 0; + innerSeek(0, 0, 0); + } + } + + private void innerSeek(int pageIndex, int offsetInPage, long position) { + if (this.pageIndex == pageIndex) { + this.currentOffset = (int) (position - this.positionOffset); + } else { + this.pageIndex = pageIndex; + final var page = pages.get(pageIndex).v(); + this.currentBufferPool = page.bytes; + final var pageOffset = page.offset; + this.currentOffset = pageOffset + offsetInPage; + this.maxOffset = pageOffset + page.length; + this.positionOffset = ((long) pageIndex) * pageSize - pageOffset; } - this.currentBytesRef = pages.get(pageIndex).v(); } public void skip(int length) { @@ -368,9 +395,11 @@ public ReleasableBytesReference moveToBytesReference() { private void closeFields() { this.pages = null; - this.currentBytesRef = null; + this.currentBufferPool = null; this.pageIndex = -1; - this.currentPageOffset = pageSize; + this.currentOffset = 0; + this.maxOffset = 0; + this.positionOffset = 0L; this.currentCapacity = 0; } @@ -418,7 +447,7 @@ public BytesReference bytes() { } private void ensureCapacity(int bytesNeeded) { - assert bytesNeeded > pageSize - currentPageOffset; + assert bytesNeeded > maxOffset - currentOffset; ensureCapacityFromPosition(position() + bytesNeeded); } @@ -445,9 +474,14 @@ private void ensureCapacityFromPosition(long newPosition) { } } - private void nextPage() { + private int nextPage() { pageIndex++; - currentPageOffset = 0; - currentBytesRef = pages.get(pageIndex).v(); + final var page = pages.get(pageIndex).v(); + this.currentBufferPool = page.bytes; + final var pageOffset = page.offset; + this.currentOffset = pageOffset; + this.maxOffset = pageOffset + page.length; + this.positionOffset = ((long) pageIndex) * pageSize - pageOffset; + return pageOffset; } }