Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,37 +23,31 @@ public class LongInputStreamV1
implements LongInputStream
{
private static final int MIN_REPEAT_SIZE = 3;
private static final int MAX_LITERAL_SIZE = 128;

private final OrcInputStream input;
private final boolean signed;
private final long[] literals = new long[MAX_LITERAL_SIZE];
private int numLiterals;
private long repeatBase;
private int numValuesInRun;
private int delta;
private int used;
private boolean repeat;
private long lastReadInputCheckpoint;

public LongInputStreamV1(OrcInputStream input, boolean signed)
{
this.input = input;
this.signed = signed;
lastReadInputCheckpoint = input.getCheckpoint();
}

// This comes from the Apache Hive ORC code
private void readValues()
private void readHeader()
throws IOException
{
lastReadInputCheckpoint = input.getCheckpoint();

int control = input.read();
if (control == -1) {
throw new OrcCorruptionException(input.getOrcDataSourceId(), "Read past end of RLE integer");
}

if (control < 0x80) {
numLiterals = control + MIN_REPEAT_SIZE;
numValuesInRun = control + MIN_REPEAT_SIZE;
used = 0;
repeat = true;
delta = input.read();
Expand All @@ -64,15 +58,12 @@ private void readValues()
// convert from 0 to 255 to -128 to 127 by converting to a signed byte
// noinspection SillyAssignment
delta = (byte) delta;
literals[0] = LongDecode.readVInt(signed, input);
repeatBase = input.readVarint(signed);
}
else {
numLiterals = 0x100 - control;
numValuesInRun = 0x100 - control;
used = 0;
repeat = false;
for (int i = 0; i < numLiterals; ++i) {
literals[i] = LongDecode.readVInt(signed, input);
}
}
}

Expand All @@ -82,15 +73,16 @@ public long next()
throws IOException
{
long result;
if (used == numLiterals) {
readValues();
if (used == numValuesInRun) {
readHeader();
}
if (repeat) {
result = literals[0] + (used++) * delta;
result = repeatBase + used * delta;
}
else {
result = literals[used++];
result = input.readVarint(signed);
}
used++;
return result;
}

Expand All @@ -105,29 +97,25 @@ public void seekToCheckpoint(LongStreamCheckpoint checkpoint)
throws IOException
{
LongStreamV1Checkpoint v1Checkpoint = (LongStreamV1Checkpoint) checkpoint;

// if the checkpoint is within the current buffer, just adjust the pointer
if (lastReadInputCheckpoint == v1Checkpoint.getInputStreamCheckpoint() && v1Checkpoint.getOffset() <= numLiterals) {
used = v1Checkpoint.getOffset();
}
else {
// otherwise, discard the buffer and start over
input.seekToCheckpoint(v1Checkpoint.getInputStreamCheckpoint());
numLiterals = 0;
used = 0;
skip(v1Checkpoint.getOffset());
}
// Discard the buffer and start over
input.seekToCheckpoint(v1Checkpoint.getInputStreamCheckpoint());
numValuesInRun = 0;
used = 0;
skip(v1Checkpoint.getOffset());
}

@Override
public void skip(long items)
throws IOException
{
while (items > 0) {
if (used == numLiterals) {
readValues();
if (used == numValuesInRun) {
readHeader();
}
long consume = Math.min(items, numValuesInRun - used);
if (!repeat) {
input.skipVarints(consume);
}
long consume = Math.min(items, numLiterals - used);
used += consume;
items -= consume;
}
Expand Down