Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@

import org.apache.parquet.ShouldNeverHappenException;

/*
Changes implemented:
All of the functionality of LittleEndianDataInputStream has been merged into ByteBufferInputStream and its child
classes. This has resulted in measurable performance improvements for the following reasons:
- Elimination of at least one layer of abstraction / method call overhead
- Enabling support for intrinsics for readInt, readLong, etc.
- Eliminating the need for the JIT to infer whether it can inline methods from BytesUtils and
the InputStream.read() calls made by BytesUtils.
*/

public class ByteBufferInputStream extends InputStream {

// Used to maintain the deprecated behavior of instantiating ByteBufferInputStream directly
Expand All @@ -49,6 +59,19 @@ public static ByteBufferInputStream wrap(List<ByteBuffer> buffers) {
}
}

/**
 * Creates a stream over a single {@link ByteBuffer} by constructing a
 * {@code SingleBufferInputStream} with the given arguments passed through unchanged.
 *
 * NOTE(review): {@code offset} and {@code count} presumably select the start index and the
 * number of readable bytes; confirm against the SingleBufferInputStream constructor.
 *
 * @param buffer the buffer to read from
 * @param offset forwarded to the SingleBufferInputStream constructor
 * @param count forwarded to the SingleBufferInputStream constructor
 * @return a new SingleBufferInputStream
 */
public static ByteBufferInputStream wrap(ByteBuffer buffer, int offset, int count) {
return new SingleBufferInputStream(buffer, offset, count);
}

/**
 * Creates a stream over a byte array by constructing a {@code SingleBufferInputStream}
 * from it.
 *
 * @param buf the array to read from
 * @return a new SingleBufferInputStream
 */
public static ByteBufferInputStream wrap(byte[] buf) {
return new SingleBufferInputStream(buf);
}

/**
 * Creates a stream over a byte array by constructing a {@code SingleBufferInputStream}
 * with the given arguments passed through unchanged.
 *
 * NOTE(review): {@code start} and {@code length} presumably describe the readable region of
 * {@code buf}; confirm against the SingleBufferInputStream constructor.
 *
 * @param buf the array to read from
 * @param start forwarded to the SingleBufferInputStream constructor
 * @param length forwarded to the SingleBufferInputStream constructor
 * @return a new SingleBufferInputStream
 */
public static ByteBufferInputStream wrap(byte[] buf, int start, int length) {
return new SingleBufferInputStream(buf, start, length);
}


/**
 * Package-private no-argument constructor that leaves {@code delegate} set to null.
 *
 * NOTE(review): presumably intended for subclasses that implement the stream operations
 * themselves instead of forwarding to a delegate; confirm against the subclasses.
 */
ByteBufferInputStream() {
delegate = null;
}
Expand All @@ -74,11 +97,26 @@ public ByteBufferInputStream(ByteBuffer buffer) {
*/
@Deprecated
public ByteBufferInputStream(ByteBuffer buffer, int offset, int count) {
  // The duplicate/position/slice/limit sequence below is what keeps
  // "TestDeprecatedBufferInputStream" passing, so it is preserved as is.
  // Working on a duplicate leaves the caller's position and limit untouched.
  final ByteBuffer positioned = buffer.duplicate();
  positioned.position(offset);

  // slice() re-bases the view so that its index 0 is the byte at `offset`;
  // limiting the slice to `count` then bounds the readable window.
  final ByteBuffer window = positioned.slice();
  window.limit(count);

  delegate = wrap(window);
  // Possible faster alternative that skips the intermediate views
  // (not enabled here): delegate = wrap(buffer, offset, count);
}

/**
 * Creates a stream that forwards to the stream returned by {@code wrap(inBuf)}.
 *
 * @param inBuf the array to read from
 */
public ByteBufferInputStream(byte[] inBuf) {
delegate = wrap(inBuf);
}

/**
 * Creates a stream that forwards to the stream returned by
 * {@code wrap(inBuf, start, length)}.
 *
 * @param inBuf the array to read from
 * @param start forwarded to {@code wrap}
 * @param length forwarded to {@code wrap}
 */
public ByteBufferInputStream(byte[] inBuf, int start, int length) {
delegate = wrap(inBuf, start, length);
}

/**
 * Creates a stream that forwards to the stream returned by {@code wrap(inBufs)}.
 *
 * @param inBufs the buffers to read from
 */
public ByteBufferInputStream(List<ByteBuffer> inBufs) {
delegate = wrap(inBufs);
}

/**
Expand All @@ -98,12 +136,12 @@ public long position() {
return delegate.position();
}

/**
 * Not supported by this class: always throws.
 *
 * NOTE(review): subclasses may provide a real implementation; confirm.
 *
 * @param pos ignored by this implementation
 * @throws UnsupportedOperationException always
 */
public void position(int pos) {
throw new UnsupportedOperationException();
}

/**
 * Skips exactly {@code n} bytes by forwarding to the wrapped stream.
 *
 * <p>The bytes are skipped once only: this method must not also call {@link #skip(long)},
 * because that would advance the underlying stream a second time.
 *
 * NOTE(review): end-of-stream detection is the delegate's responsibility here; confirm that
 * every delegate implementation raises an EOFException on a short skip.
 *
 * @param n the number of bytes to skip
 * @throws IOException if the delegate fails to skip {@code n} bytes
 */
public void skipFully(long n) throws IOException {
  delegate.skipFully(n);
}

public int read(ByteBuffer out) {
Expand All @@ -119,15 +157,20 @@ public List<ByteBuffer> sliceBuffers(long length) throws EOFException {
}

/**
 * Returns a stream over the next {@code length} bytes by forwarding to the wrapped stream.
 *
 * <p>There is a single return path: a second {@code return} after the first would be an
 * unreachable statement and fail to compile.
 *
 * @param length the number of bytes requested for the slice
 * @return the stream produced by {@code delegate.sliceStream(length)}
 * @throws EOFException propagated from the delegate
 */
public ByteBufferInputStream sliceStream(long length) throws EOFException {
  return delegate.sliceStream(length);
}

/**
 * Forwards to {@code delegate.remainingBuffers()}.
 *
 * @return the list of buffers returned by the delegate
 */
public List<ByteBuffer> remainingBuffers() {
return delegate.remainingBuffers();
}

/**
 * Returns a stream over the remaining data by forwarding to the wrapped stream.
 *
 * <p>There is a single return path: a second {@code return} after the first would be an
 * unreachable statement and fail to compile.
 *
 * @return the stream produced by {@code delegate.remainingStream()}
 */
public ByteBufferInputStream remainingStream() {
  return delegate.remainingStream();
}

/**
 * Forwards to {@code delegate.duplicate()}.
 *
 * NOTE(review): presumably yields an independent stream over the same data; confirm
 * against the delegate implementations.
 *
 * @return the stream returned by the delegate
 */
public ByteBufferInputStream duplicate() {
return delegate.duplicate();
}

public int read() throws IOException {
Expand All @@ -138,14 +181,34 @@ public int read(byte[] b, int off, int len) throws IOException {
return delegate.read(b, off, len);
}

/**
 * Reads into the whole of {@code b}; equivalent to {@code read(b, 0, b.length)}.
 *
 * @param b the destination array
 * @return the value returned by {@code read(b, 0, b.length)}
 * @throws IOException propagated from the three-argument read
 */
@Override
public int read(byte[] b) throws IOException {
  return read(b, 0, b.length);
}

/**
 * Fills the whole of {@code b}; shorthand for {@code readFully(b, 0, b.length)}.
 *
 * @param b the destination array
 * @throws IOException propagated from the three-argument readFully
 */
public void readFully(byte[] b) throws IOException {
  final int wanted = b.length;
  readFully(b, 0, wanted);
}

public void readFully(byte b[], int off, int len) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't see where it is used. Don't know why it is 'public'.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is what the method signatures for DataInputStream.readFully look like.

I also have a whole bunch of other performance improvements I want to contribute (https://docs.google.com/document/d/1fBGpF_LgtfaeHnPD5CFEIpA2Ga_lTITmFdFIcO9Af-g/edit?usp=sharing), and I think this might get used in some of that code.

I'm very soon going to publish an open preview of all of my proposed changes to a branch of my own fork, so we'll be able to check this out.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delegate.readFully(b, off, len);
}

/**
 * Forwards to {@code delegate.skip(n)}.
 *
 * @param n the number of bytes requested to be skipped
 * @return the value returned by the delegate
 */
@Override
public long skip(long n) {
  return delegate.skip(n);
}

/**
 * Skips up to {@code n} bytes through {@link #skip(long)} and reports the result as an int.
 *
 * @param n the number of bytes requested to be skipped
 * @return the value returned by {@code skip(n)}, narrowed to int
 */
public int skipBytes(int n) {
  final long skipped = skip(n);
  return (int) skipped;
}

/**
 * Forwards to {@code delegate.available()}.
 *
 * @return the value returned by the delegate
 */
@Override
public int available() {
  return delegate.available();
}

/**
 * Alias for {@code available()}.
 *
 * @return the value returned by {@code available()}
 */
public int remaining() {
return available();
}

/**
 * Forwards to {@code delegate.mark(readlimit)}.
 *
 * @param readlimit passed through to the delegate unchanged
 */
@Override
public void mark(int readlimit) {
  delegate.mark(readlimit);
}
Expand All @@ -157,4 +220,83 @@ public void reset() throws IOException {
/**
 * Forwards to {@code delegate.markSupported()}.
 *
 * @return the value returned by the delegate
 */
@Override
public boolean markSupported() {
  return delegate.markSupported();
}

/**
 * Does nothing: this implementation releases no resources and does not touch the delegate.
 *
 * @throws IOException never thrown by this implementation; declared to match
 *     {@link java.io.InputStream#close()}
 */
@Override
public void close() throws IOException {
  // Intentionally empty.
}

/**
 * Reads one byte through {@code readByte()} and interprets it as a boolean.
 *
 * @return {@code true} if the byte read is non-zero, {@code false} if it is zero
 * @throws IOException propagated from {@code readByte()}
 */
public boolean readBoolean() throws IOException {
  final byte raw = readByte();
  return raw != 0;
}

/**
 * Forwards to {@code delegate.readByte()}.
 *
 * @return the byte returned by the delegate
 * @throws IOException propagated from the delegate
 */
public byte readByte() throws IOException {
return delegate.readByte();
}

/**
 * Forwards to {@code delegate.readUnsignedByte()}.
 *
 * @return the value returned by the delegate
 * @throws IOException propagated from the delegate
 */
public int readUnsignedByte() throws IOException {
return delegate.readUnsignedByte();
}

public short readShort() throws IOException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the idea of providing these read functions to enable larger reads. BTW, is there any use case for reading a batch of shorts (and other numeric types)?

Copy link
Contributor Author

@theosib-amazon theosib-amazon Nov 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I use the new batch read methods heavily in some optimizations I made to Trino. As for short, I can't say I recall any uses in Trino of readShorts(). readShort() is used indirectly through a method that reads a variable sized representation.

return delegate.readShort();
}

/**
 * Forwards to {@code delegate.readUnsignedShort()}.
 *
 * NOTE(review): byte order is presumably little-endian, per the file header's note about
 * merging LittleEndianDataInputStream; confirm against the delegate implementations.
 *
 * @return the value returned by the delegate
 * @throws IOException propagated from the delegate
 */
public int readUnsignedShort() throws IOException {
return delegate.readUnsignedShort();
}

/**
 * Forwards to {@code delegate.readInt()}.
 *
 * NOTE(review): byte order is presumably little-endian, per the file header's note about
 * merging LittleEndianDataInputStream; confirm against the delegate implementations.
 *
 * @return the int returned by the delegate
 * @throws IOException propagated from the delegate
 */
public int readInt() throws IOException {
return delegate.readInt();
}

/**
 * Forwards to {@code delegate.readLong()}.
 *
 * NOTE(review): byte order is presumably little-endian, per the file header's note about
 * merging LittleEndianDataInputStream; confirm against the delegate implementations.
 *
 * @return the long returned by the delegate
 * @throws IOException propagated from the delegate
 */
public long readLong() throws IOException {
return delegate.readLong();
}

/**
 * Reads an int through {@code readInt()} and reinterprets its bits as a float.
 *
 * @return the float whose bit pattern is the int that was read
 * @throws IOException propagated from {@code readInt()}
 */
public float readFloat() throws IOException {
  final int bits = readInt();
  return Float.intBitsToFloat(bits);
}

/**
 * Reads a long through {@code readLong()} and reinterprets its bits as a double.
 *
 * @return the double whose bit pattern is the long that was read
 * @throws IOException propagated from {@code readLong()}
 */
public double readDouble() throws IOException {
  final long bits = readLong();
  return Double.longBitsToDouble(bits);
}

public int readIntLittleEndianOnThreeBytes() throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it copied from BytesUtils.java? I wonder why we don't use that directly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my other comments.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my other comment on this. These two methods have the same outcome, but mine is faster. I believe this is warranted for a performance-critical path.

int ch1 = readUnsignedByte();
int ch2 = readUnsignedByte();
int ch3 = readUnsignedByte();
return ((ch3 << 16) + (ch2 << 8) + (ch1 << 0));
}

public int readIntLittleEndianPaddedOnBitWidth(int bitWidth)
Copy link
Contributor

@shangxinli shangxinli Jul 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it copied from BytesUtils.java? I wonder why we don't use that directly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The one that reads three bytes may or may not be a win. A level of abstraction is eliminated by doing this. It's hard to say whether or not the JIT will be smart enough to do that automatically.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know about that method. The BytesUtils code always reads one byte at a time. My version will read a whole word at a time for short and int. This is faster.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait. Are you referring to readIntLittleEndianPaddedOnBitWidth or readIntLittleEndianOnThreeBytes?

The former is definitely faster. An argument could be made to remove the latter, although it'll take longer for the JIT to hide the extra layers of virtual calls.

throws IOException {

int bytesWidth = BytesUtils.paddedByteCountFromBits(bitWidth);
switch (bytesWidth) {
case 0:
return 0;
case 1:
return readUnsignedByte();
case 2:
return readUnsignedShort();
case 3:
return readIntLittleEndianOnThreeBytes();
case 4:
return readInt();
default:
throw new IOException(
String.format("Encountered bitWidth (%d) that requires more than 4 bytes", bitWidth));
}
}

public int readUnsignedVarInt() throws IOException {
Copy link
Contributor

@shangxinli shangxinli Jul 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it copied from BytesUtils.java? I wonder why we don't use that directly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not exactly. The one in BytesUtils calls methods that read one byte at a time. This one can take advantage of faster methods that read whole words at a time. This is a critical-path method, so it's a performance win to eliminate the extra level of abstraction and all the extra overhead fetching individual bytes and shifting.

int value = 0;
int i = 0;
int b;
while (((b = readUnsignedByte()) & 0x80) != 0) {
value |= (b & 0x7F) << i;
i += 7;
}
return value | (b << i);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
/**
* Based on DataInputStream but little endian and without the String/char methods
*/
@Deprecated
public final class LittleEndianDataInputStream extends InputStream {

private final InputStream in;
Expand Down
Loading