Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions src/main/java/com/facebook/presto/BasicSliceOutput.java
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,41 @@ else if (nBytes < 4) {
}
}

/**
 * Appends a long by delegating to {@code writeLong(value)}.
 *
 * @param value the long value to write
 * @return this output, so that append calls can be chained
 */
@Override
public BasicSliceOutput appendLong(long value)
{
writeLong(value);
return this;
}

/**
 * Appends a short by delegating to {@code writeShort(value)}.
 *
 * @param value the value to write, passed as an int and handed unchanged to {@code writeShort}
 * @return this output, so that append calls can be chained
 */
@Override
public BasicSliceOutput appendShort(int value)
{
writeShort(value);
return this;
}

/**
 * Appends part of a byte array by delegating to {@code write(source, sourceIndex, length)}.
 *
 * @param source the array to read from
 * @param sourceIndex forwarded as the second argument of {@code write}
 *        (presumably the index of the first byte to copy; confirm against {@code write})
 * @param length forwarded as the third argument of {@code write}
 *        (presumably the number of bytes to copy; confirm against {@code write})
 * @return this output, so that append calls can be chained
 */
@Override
public BasicSliceOutput appendBytes(byte[] source, int sourceIndex, int length)
{
write(source, sourceIndex, length);
return this;
}

/**
 * Appends a byte array by delegating to {@code writeBytes(source)}.
 *
 * @param source the bytes to write
 * @return this output, so that append calls can be chained
 */
@Override
public BasicSliceOutput appendBytes(byte[] source)
{
writeBytes(source);
return this;
}

/**
 * Appends the contents of a slice by delegating to {@code writeBytes(slice)}.
 *
 * @param slice the slice to write
 * @return this output, so that append calls can be chained
 */
@Override
public BasicSliceOutput appendBytes(Slice slice)
{
writeBytes(slice);
return this;
}

@Override
public Slice slice()
{
Expand Down
124 changes: 108 additions & 16 deletions src/main/java/com/facebook/presto/BlockBuilder.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
package com.facebook.presto;

import com.google.common.base.Preconditions;
import com.google.common.collect.Ranges;
import io.airlift.units.DataSize;
import io.airlift.units.DataSize.Unit;

import java.util.ArrayList;
import java.util.List;

import static com.facebook.presto.TupleInfo.Type.FIXED_INT_64;
import static com.facebook.presto.TupleInfo.Type.VARIABLE_BINARY;
import static com.google.common.base.Preconditions.*;

public class BlockBuilder
{
private static final DataSize DEFAULT_MAX_BLOCK_SIZE = new DataSize(64, Unit.KILOBYTE);
Expand All @@ -12,6 +20,12 @@ public class BlockBuilder
private final TupleInfo tupleInfo;
private final int maxBlockSize;
private final DynamicSliceOutput sliceOutput;
private int count;

private final int fixedPartSize;

private int currentField;
private final List<Slice> variableLengthFields;

public BlockBuilder(long startPosition, TupleInfo tupleInfo)
{
Expand All @@ -20,52 +34,130 @@ public BlockBuilder(long startPosition, TupleInfo tupleInfo)

/**
 * Creates a builder whose blocks start at {@code startPosition} and hold tuples
 * described by {@code tupleInfo}.
 *
 * <p>The constructor also pre-computes the size of the fixed part of a tuple: the
 * sizes of all fixed-size fields, one short offset for every variable-length field
 * after the first, and one short for the total tuple size.
 *
 * @param startPosition position of the first tuple in the block; must not be negative
 * @param tupleInfo description of the field types of the tuples to be appended
 * @param blockSize size used both as the "full" threshold and as the size passed to
 *        the underlying {@code DynamicSliceOutput}
 * @throws IllegalArgumentException if {@code startPosition} is negative
 * @throws NullPointerException if {@code blockSize} or {@code tupleInfo} is null
 */
public BlockBuilder(long startPosition, TupleInfo tupleInfo, DataSize blockSize)
{
    checkArgument(startPosition >= 0, "startPosition is negative");
    checkNotNull(blockSize, "blockSize is null");
    // fail with a clear message instead of a bare NPE from the loop below
    checkNotNull(tupleInfo, "tupleInfo is null");

    this.startPosition = startPosition;
    this.tupleInfo = tupleInfo;

    int blockSizeInBytes = (int) blockSize.toBytes();
    maxBlockSize = blockSizeInBytes;
    sliceOutput = new DynamicSliceOutput(blockSizeInBytes);

    int fixedPartSize = 0;
    int variableLengthFieldCount = 0;
    for (TupleInfo.Type type : tupleInfo.getTypes()) {
        if (type.isFixedSize()) {
            fixedPartSize += type.getSize();
        }
        else {
            // the first variable-length field needs no offset entry of its own
            if (variableLengthFieldCount > 0) {
                fixedPartSize += SizeOf.SIZE_OF_SHORT;
            }
            variableLengthFieldCount++;
        }
    }
    fixedPartSize += SizeOf.SIZE_OF_SHORT; // total tuple size

    this.fixedPartSize = fixedPartSize;
    variableLengthFields = new ArrayList<>(variableLengthFieldCount);
}

public boolean isFull()
{
return sliceOutput.size() > maxBlockSize;
}

public void append(byte value)
public void append(long value)
{
sliceOutput.write(value);
}
flushTupleIfNecessary();

public void append(int value)
{
sliceOutput.writeInt(value);
}
checkState(tupleInfo.getTypes().get(currentField) == FIXED_INT_64, "Expected FIXED_INT_64");

public void append(long value)
{
sliceOutput.writeLong(value);
currentField++;
}

/**
 * Appends a variable-length binary value as the next field of the current tuple.
 *
 * <p>The bytes are not written to the output immediately. They are buffered and
 * emitted by {@code flushTupleIfNecessary()}, which writes the offsets first and
 * the buffered values after them. Writing the bytes here as well would emit them
 * twice and ahead of the offsets.
 *
 * @param value the bytes of the field
 * @throws IllegalStateException if the next field of the tuple is not {@code VARIABLE_BINARY}
 */
public void append(byte[] value)
{
    // finish the previous tuple first if it already has all of its fields
    flushTupleIfNecessary();

    checkState(tupleInfo.getTypes().get(currentField) == VARIABLE_BINARY, "Expected VARIABLE_BINARY");

    variableLengthFields.add(Slices.wrappedBuffer(value));
    currentField++;
}

/**
 * Appends a variable-length binary value, given as a slice, as the next field of
 * the current tuple.
 *
 * <p>The slice is not written to the output immediately. It is buffered and
 * emitted by {@code flushTupleIfNecessary()}, which writes the offsets first and
 * the buffered values after them. Writing the slice here as well would emit it
 * twice and ahead of the offsets.
 *
 * @param value the slice holding the bytes of the field
 * @throws IllegalStateException if the next field of the tuple is not {@code VARIABLE_BINARY}
 */
public void append(Slice value)
{
    // finish the previous tuple first if it already has all of its fields
    flushTupleIfNecessary();

    checkState(tupleInfo.getTypes().get(currentField) == VARIABLE_BINARY, "Expected VARIABLE_BINARY");

    variableLengthFields.add(value);
    currentField++;
}

/**
 * Appends every field of {@code tuple}, in order, through the typed
 * {@code append} overloads.
 *
 * <p>The tuple is copied field by field only. It is not additionally written to
 * the output as a whole, which would duplicate its data and bypass the per-field
 * type checks and tuple counting done by the typed overloads.
 *
 * @param tuple the tuple whose fields are appended
 * @throws IllegalStateException if the tuple contains a field type other than
 *         {@code FIXED_INT_64} or {@code VARIABLE_BINARY}, or if a typed
 *         {@code append} rejects a field
 */
public void append(Tuple tuple)
{
    // TODO: optimization - single copy of block of fixed length fields

    int index = 0;
    for (TupleInfo.Type type : tuple.getTupleInfo().getTypes()) {
        switch (type) {
            case FIXED_INT_64:
                append(tuple.getLong(index));
                break;
            case VARIABLE_BINARY:
                append(tuple.getSlice(index));
                break;
            default:
                throw new IllegalStateException("Type not yet supported: " + type);
        }

        index++;
    }
}

/**
 * Completes the tuple being assembled once all of its fields have been appended.
 *
 * <p>Does nothing while fields are still missing. Otherwise it writes one short
 * per buffered variable-length field, then the buffered values themselves, and
 * finally counts the tuple and resets the per-tuple state.
 */
private void flushTupleIfNecessary()
{
    // the current tuple still lacks fields
    if (currentField < tupleInfo.getTypes().size()) {
        return;
    }

    // Offsets: each variable-length field contributes the offset at which it ends.
    // For every field but the last this is the start of the next field; for the
    // last field it is the total tuple length.
    int endOffset = fixedPartSize;
    for (Slice pending : variableLengthFields) {
        endOffset += pending.length();
        sliceOutput.writeShort(endOffset);
    }

    // Values, in the order they were appended.
    for (Slice pending : variableLengthFields) {
        sliceOutput.writeBytes(pending);
    }

    // start a fresh tuple
    variableLengthFields.clear();
    currentField = 0;
    count++;
}

public UncompressedValueBlock build()
public ValueBlock build()
{
return new UncompressedValueBlock(startPosition, tupleInfo, sliceOutput.slice());
flushTupleIfNecessary();

Preconditions.checkState(currentField == 0, "Last tuple is incomplete");

if (count == 0) {
return EmptyValueBlock.INSTANCE;
}

return new UncompressedValueBlock(Ranges.closed(startPosition, startPosition + count - 1), tupleInfo, sliceOutput.slice());
}
}
34 changes: 22 additions & 12 deletions src/main/java/com/facebook/presto/CsvFileScanner.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,40 +12,45 @@
import java.io.InputStreamReader;
import java.util.Iterator;

public class CsvFileScanner implements Iterable<UncompressedValueBlock>
import static com.google.common.base.Charsets.*;
import static com.google.common.base.Charsets.UTF_8;

public class CsvFileScanner implements Iterable<ValueBlock>
{
private final InputSupplier<InputStreamReader> inputSupplier;
private final Splitter columnSplitter;
private final int columnIndex;
private final TupleInfo tupleInfo;
private final TupleInfo.Type columnType;

public CsvFileScanner(InputSupplier<InputStreamReader> inputSupplier, int columnIndex, char columnSeparator, TupleInfo tupleInfo)
public CsvFileScanner(InputSupplier<InputStreamReader> inputSupplier, int columnIndex, char columnSeparator, TupleInfo.Type columnType)
{
this.columnType = columnType;
Preconditions.checkNotNull(inputSupplier, "inputSupplier is null");

this.columnIndex = columnIndex;
this.tupleInfo = tupleInfo;
this.inputSupplier = inputSupplier;
columnSplitter = Splitter.on(columnSeparator);
}

@Override
public Iterator<UncompressedValueBlock> iterator()
public Iterator<ValueBlock> iterator()
{
return new ColumnIterator(inputSupplier, columnIndex, columnSplitter, tupleInfo);
return new ColumnIterator(inputSupplier, columnIndex, columnSplitter, columnType);
}

private static class ColumnIterator extends AbstractIterator<UncompressedValueBlock>
private static class ColumnIterator extends AbstractIterator<ValueBlock>
{
private final LineReader reader;
private final TupleInfo tupleInfo;
private final int columnIndex;
private final Splitter columnSplitter;
private long position;
private final TupleInfo.Type columnType;

public ColumnIterator(InputSupplier<InputStreamReader> inputSupplier, int columnIndex, Splitter columnSplitter, TupleInfo tupleInfo)
public ColumnIterator(InputSupplier<InputStreamReader> inputSupplier, int columnIndex, Splitter columnSplitter, TupleInfo.Type columnType)
{
this.tupleInfo = tupleInfo;
this.columnType = columnType;
this.tupleInfo = new TupleInfo(columnType);
try {
this.reader = new LineReader(inputSupplier.getInput());
}
Expand All @@ -57,7 +62,7 @@ public ColumnIterator(InputSupplier<InputStreamReader> inputSupplier, int column
}

@Override
protected UncompressedValueBlock computeNext()
protected ValueBlock computeNext()
{
String line = nextLine();
if (line == null) {
Expand All @@ -72,15 +77,20 @@ protected UncompressedValueBlock computeNext()

// calculate final value for this group
// todo add support for other column types
blockBuilder.append(Long.valueOf(value));
if (columnType == TupleInfo.Type.FIXED_INT_64) {
blockBuilder.append(Long.valueOf(value));
}
else {
blockBuilder.append(value.getBytes(UTF_8));
}

if (blockBuilder.isFull()) {
break;
}
line = nextLine();
} while (line != null);

UncompressedValueBlock block = blockBuilder.build();
ValueBlock block = blockBuilder.build();
position += block.getCount();
return block;
}
Expand Down
35 changes: 35 additions & 0 deletions src/main/java/com/facebook/presto/DynamicSliceOutput.java
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,41 @@ else if (nBytes < 4) {
}
}

/**
 * Appends a long by delegating to {@code writeLong(value)}.
 *
 * @param value the long value to write
 * @return this output, so that append calls can be chained
 */
@Override
public DynamicSliceOutput appendLong(long value)
{
writeLong(value);
return this;
}

/**
 * Appends a short by delegating to {@code writeShort(value)}.
 *
 * @param value the value to write, passed as an int and handed unchanged to {@code writeShort}
 * @return this output, so that append calls can be chained
 */
@Override
public DynamicSliceOutput appendShort(int value)
{
writeShort(value);
return this;
}

/**
 * Appends part of a byte array by delegating to {@code write(source, sourceIndex, length)}.
 *
 * @param source the array to read from
 * @param sourceIndex forwarded as the second argument of {@code write}
 *        (presumably the index of the first byte to copy; confirm against {@code write})
 * @param length forwarded as the third argument of {@code write}
 *        (presumably the number of bytes to copy; confirm against {@code write})
 * @return this output, so that append calls can be chained
 */
@Override
public DynamicSliceOutput appendBytes(byte[] source, int sourceIndex, int length)
{
write(source, sourceIndex, length);
return this;
}

/**
 * Appends a byte array by delegating to {@code writeBytes(source)}.
 *
 * @param source the bytes to write
 * @return this output, so that append calls can be chained
 */
@Override
public DynamicSliceOutput appendBytes(byte[] source)
{
writeBytes(source);
return this;
}

/**
 * Appends the contents of a slice by delegating to {@code writeBytes(slice)}.
 *
 * @param slice the slice to write
 * @return this output, so that append calls can be chained
 */
@Override
public DynamicSliceOutput appendBytes(Slice slice)
{
writeBytes(slice);
return this;
}

@Override
public Slice slice()
{
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/com/facebook/presto/EmptyValueBlock.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
public class EmptyValueBlock
implements ValueBlock
{
/** Shared instance of the empty block, created once when the class is initialized. */
public static final EmptyValueBlock INSTANCE = new EmptyValueBlock();

/** Private: code outside this class cannot call this constructor and uses {@code INSTANCE} instead. */
private EmptyValueBlock()
{
}

@Override
public PositionBlock selectPositions(Predicate<Tuple> predicate)
{
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/facebook/presto/Merge.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ protected ValueBlock computeNext()
}
} while (Iterables.all(sources, hasNextPredicate()));

UncompressedValueBlock block = blockBuilder.build();
ValueBlock block = blockBuilder.build();
position += block.getCount();
return block;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ protected ValueBlock computeNext()
while (!builder.isFull() && groupBySource.hasNext());

// build an output block
UncompressedValueBlock block = builder.build();
ValueBlock block = builder.build();
position += block.getCount();
return block;
}
Expand Down
Loading