Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,7 @@ public long position() {
}

public void skipFully(long n) throws IOException {
long skipped = skip(n);
if (skipped < n) {
throw new EOFException(
"Not enough bytes to skip: " + skipped + " < " + n);
}
delegate.skipFully(n);
}

public int read(ByteBuffer out) {
Expand All @@ -119,15 +115,15 @@ public List<ByteBuffer> sliceBuffers(long length) throws EOFException {
}

public ByteBufferInputStream sliceStream(long length) throws EOFException {
return ByteBufferInputStream.wrap(sliceBuffers(length));
return delegate.sliceStream(length);
}

public List<ByteBuffer> remainingBuffers() {
return delegate.remainingBuffers();
}

public ByteBufferInputStream remainingStream() {
return ByteBufferInputStream.wrap(remainingBuffers());
return delegate.remainingStream();
}

public int read() throws IOException {
Expand All @@ -138,6 +134,18 @@ public int read(byte[] b, int off, int len) throws IOException {
return delegate.read(b, off, len);
}

public int read(byte[] b) throws IOException {
return read(b, 0, b.length);
}

public void readFully(byte[] b) throws IOException {
readFully(b, 0, b.length);
}

public void readFully(byte b[], int off, int len) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't see where it is used. Don't know why it is 'public'.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is what the method signatures for DataInputStream.readFully look like.

I also have a whole bunch of other performance improvements I want to contribute (https://docs.google.com/document/d/1fBGpF_LgtfaeHnPD5CFEIpA2Ga_lTITmFdFIcO9Af-g/edit?usp=sharing), and I think this might get used in some of that code.

I'm very soon going to publish an open preview of all of my proposed changes to a branch of my own fork, so we'll be able to check this out.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delegate.readFully(b, off, len);
}

public long skip(long n) {
return delegate.skip(n);
}
Expand All @@ -157,4 +165,80 @@ public void reset() throws IOException {
public boolean markSupported() {
return delegate.markSupported();
}

public boolean readBoolean() throws IOException {
return readByte() != 0;
}

public byte readByte() throws IOException {
return delegate.readByte();
}

public int readUnsignedByte() throws IOException {
return delegate.readUnsignedByte();
}

public short readShort() throws IOException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the idea to provide these read functions to enable larger read. BTW, is there any use case to read a batch of shorts (and other numeric types)?

Copy link
Contributor Author

@theosib-amazon theosib-amazon Nov 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I use the new batch read methods heavily in some optimizations I made to Trino. As for short, I can't say I recall any uses in Trino of readShorts(). readShort() is used indirectly through a method that reads a variable sized representation.

return delegate.readShort();
}

public int readUnsignedShort() throws IOException {
return delegate.readUnsignedShort();
}

public int readInt() throws IOException {
return delegate.readInt();
}

public long readLong() throws IOException {
return delegate.readLong();
}

public float readFloat() throws IOException {
return Float.intBitsToFloat(readInt());
}

public double readDouble() throws IOException {
return Double.longBitsToDouble(readLong());
}

public int readIntLittleEndianOnThreeBytes() throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it copied from BytesUtils.java? I wonder why we don't use that directly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my other comments.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my other comment on this. These two methods have the the same outcome, but mine is faster. I believe this is warranted for a performance critical path.

int ch1 = readUnsignedByte();
int ch2 = readUnsignedByte();
int ch3 = readUnsignedByte();
return ((ch3 << 16) + (ch2 << 8) + (ch1 << 0));
}

public int readIntLittleEndianPaddedOnBitWidth(int bitWidth)
Copy link
Contributor

@shangxinli shangxinli Jul 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it copied from BytesUtils.java? I wonder why we don't use that directly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The one that reads three bytes may or may not be a win. A level of abstraction is eliminated by doing this. It's hard to say whether or not the JIT will be smart enough to do that automatically.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know about that method. The BytesUtils code always reads one byte at a time. My version will read a whole word at a time for short and int. This is faster.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait. Are you referring to readIntLittleEndianPaddedOnBitWidth or readIntLittleEndianOnThreeBytes?

The former is definitely faster. An argument could be made to remove the latter, although it'll take longer for the JIT to hide the extra layers of virtual calls.

throws IOException {

int bytesWidth = BytesUtils.paddedByteCountFromBits(bitWidth);
switch (bytesWidth) {
case 0:
return 0;
case 1:
return readUnsignedByte();
case 2:
return readUnsignedShort();
case 3:
return readIntLittleEndianOnThreeBytes();
case 4:
return readInt();
default:
throw new IOException(
String.format("Encountered bitWidth (%d) that requires more than 4 bytes", bitWidth));
}
}

public int readUnsignedVarInt() throws IOException {
Copy link
Contributor

@shangxinli shangxinli Jul 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it copied from BytesUtils.java? I wonder why we don't use that directly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not exactly. The one in BytesUtils calls methods that read one byte at a time. This one can take advantage of faster methods that read whole words at a time. This is a critical-path method, so it's a performance win to eliminate the extra level of abstraction and all the extra overhead fetching individual bytes and shifting.

int value = 0;
int i = 0;
int b;
while (((b = readUnsignedByte()) & 0x80) != 0) {
value |= (b & 0x7F) << i;
i += 7;
}
return value | (b << i);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.parquet.bytes;

import org.apache.parquet.ShouldNeverHappenException;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
Expand All @@ -27,6 +28,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.nio.BufferUnderflowException;

class MultiBufferInputStream extends ByteBufferInputStream {
private static final ByteBuffer EMPTY = ByteBuffer.allocate(0);
Expand Down Expand Up @@ -89,6 +91,15 @@ public long skip(long n) {
return bytesSkipped;
}

@Override
public void skipFully(long n) throws IOException {
if (current == null || n > length) {
throw new EOFException("Not enough bytes to skip: " + length + " < " + n);
}

skip(n);
}

@Override
public int read(ByteBuffer out) {
int len = out.remaining();
Expand Down Expand Up @@ -193,6 +204,10 @@ public List<ByteBuffer> sliceBuffers(long len) throws EOFException {
return buffers;
}

public ByteBufferInputStream sliceStream(long length) throws EOFException {
return ByteBufferInputStream.wrap(sliceBuffers(length));
}

@Override
public List<ByteBuffer> remainingBuffers() {
if (position >= length) {
Expand All @@ -208,6 +223,10 @@ public List<ByteBuffer> remainingBuffers() {
}
}

public ByteBufferInputStream remainingStream() {
return ByteBufferInputStream.wrap(remainingBuffers());
}

@Override
public int read(byte[] bytes, int off, int len) {
if (len <= 0) {
Expand Down Expand Up @@ -238,27 +257,38 @@ public int read(byte[] bytes, int off, int len) {
}

@Override
public int read(byte[] bytes) {
return read(bytes, 0, bytes.length);
}
public void readFully(byte[] bytes, int off, int len) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you cast a light why we need to add the implementation readFully() here? For performance improvement?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are situations where we need to read an exact number of bytes and throw an exception if not enough are available. This is faster than reading maybe enough and then checking, and this is a performance critical path.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't we just track the remaining bytes of the stream on the client side and check before reading any bytes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Normally, the user of the class would read exactly the right number of bytes. These checks and exceptions exist only to catch bugs elsewhere. This is one reason why it's important to minimize the overhead of these checks in such performance-critical methods.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The difference between this method and read() is mainly to precheck if there is enough remaining length. I believe this can be done by wrapping up the read() method and adding the prechecks. Duplicating the code makes it harder to maintain.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll make these changes if you insist. But those prechecks are expensive, which is why I'm trying to avoid them when possible in a performance critical path.

if (len <= 0) {
if (len < 0) {
throw new IndexOutOfBoundsException("Read length must be greater than 0: " + len);
}

return;
}

@Override
public int read() throws IOException {
if (current == null) {
if (current == null || len > length) {
throw new EOFException();
}

while (true) {
int bytesRead = 0;
while (bytesRead < len) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems duplicate with above line 244.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two key differences that make it hard to combine them without hurting performance for one, the other, or both, and they're both performance critical.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicating code is hard to maintain.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, my objective here is to maximize performance. So we have to decide between maintainability and performance. Let's deliberate over this a bit more, and I'll do what you think is best.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, my objective here is to maximize performance. So we have to decide between maintainability and performance. Let's deliberate over this a bit more, and I'll do what you think is best.

I deeply understand this is a difficult trade-off. Do you have any evidence on the performance penalty if we wrap read and readFully methods to share some common logic? If the penalty is acceptable, we should definitely go for maintainability.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did test a lot of tradeoffs, but I don't think I tested this one thing directly. It's also been quite a while since I did this, so I don't think I'd be able to figure out which spreadsheets have the relevant data.

if (current.remaining() > 0) {
this.position += 1;
return current.get() & 0xFF; // as unsigned
int bytesToRead = Math.min(len - bytesRead, current.remaining());
current.get(bytes, off + bytesRead, bytesToRead);
bytesRead += bytesToRead;
this.position += bytesToRead;
} else if (!nextBuffer()) {
// there are no more buffers
throw new EOFException();
throw new ShouldNeverHappenException();
}
}
}

@Override
public int read() throws IOException {
return readUnsignedByte();
}

@Override
public int available() {
long remaining = length - position;
Expand Down Expand Up @@ -313,6 +343,8 @@ private boolean nextBuffer() {
}

this.current = iterator.next().duplicate();
// Have to put the buffer in little endian mode, because it defaults to big endian
this.current.order(java.nio.ByteOrder.LITTLE_ENDIAN);

if (mark >= 0) {
if (position < markLimit) {
Expand Down Expand Up @@ -379,4 +411,120 @@ public void remove() {
second.remove();
}
}

@Override
public byte readByte() throws IOException {
return (byte) readUnsignedByte();
}

@Override
public int readUnsignedByte() throws IOException {
if (current == null) {
throw new EOFException();
}

this.position += 1;
while (true) {
try {
return current.get() & 0xFF;
} catch (BufferUnderflowException e) {
if (!nextBuffer()) {
// there are no more buffers
throw new EOFException();
}
}
}
}

/**
* When reading a short will cross a buffer boundary, read one byte at a time.
* @return a short value
* @throws IOException
*/
private int getShortSlow() throws IOException {
int c0 = readUnsignedByte();
int c1 = readUnsignedByte();
return ((c0 << 0) + (c1 << 8));
}

public short readShort() throws IOException {
if (current == null) {
throw new EOFException();
}

if (current.remaining() >= Short.BYTES) {
// If the whole short can be read from the current buffer, use intrinsics
this.position += Short.BYTES;
return current.getShort();
} else {
// Otherwise get the short one byte at a time
return (short) getShortSlow();
}
}

public int readUnsignedShort() throws IOException {
return readShort() & 0xffff;
}

/**
* When reading an int will cross a buffer boundary, read one byte at a time.
* @return an int value
* @throws IOException
*/
private int getIntSlow() throws IOException {
int c0 = readUnsignedByte();
int c1 = readUnsignedByte();
int c2 = readUnsignedByte();
int c3 = readUnsignedByte();
return ((c0 << 0) + (c1 << 8)) + ((c2 << 16) + (c3 << 24));
}

@Override
public int readInt() throws IOException {
if (current == null) {
throw new EOFException();
}

if (current.remaining() >= Integer.BYTES) {
// If the whole int can be read from the current buffer, use intrinsics
this.position += Integer.BYTES;
return current.getInt();
} else {
// Otherwise get the int one byte at a time
return getIntSlow();
}
}

/**
* When reading a long will cross a buffer boundary, read one byte at a time.
* @return a long value
* @throws IOException
*/
private long getLongSlow() throws IOException {
long ch0 = (long) readUnsignedByte() << 0;
long ch1 = (long) readUnsignedByte() << 8;
long ch2 = (long) readUnsignedByte() << 16;
long ch3 = (long) readUnsignedByte() << 24;
long ch4 = (long) readUnsignedByte() << 32;
long ch5 = (long) readUnsignedByte() << 40;
long ch6 = (long) readUnsignedByte() << 48;
long ch7 = (long) readUnsignedByte() << 56;
return ((ch0 + ch1) + (ch2 + ch3)) + ((ch4 + ch5) + (ch6 + ch7));
}

@Override
public long readLong() throws IOException {
if (current == null) {
throw new EOFException();
}

if (current.remaining() >= Long.BYTES) {
// If the whole short can be read from the current buffer, use intrinsics
this.position += Long.BYTES;
return current.getLong();
} else {
// Otherwise get the long one byte at a time
return getLongSlow();
}
}
}
Loading