Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 13 additions & 84 deletions src/main/java/com/facebook/presto/BlockBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,8 @@
import io.airlift.units.DataSize;
import io.airlift.units.DataSize.Unit;

import java.util.ArrayList;
import java.util.List;

import static com.facebook.presto.TupleInfo.Type.FIXED_INT_64;
import static com.facebook.presto.TupleInfo.Type.VARIABLE_BINARY;
import static com.google.common.base.Preconditions.*;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

public class BlockBuilder
{
Expand All @@ -22,10 +18,7 @@ public class BlockBuilder
private final DynamicSliceOutput sliceOutput;
private int count;

private final int fixedPartSize;

private int currentField;
private final List<Slice> variableLengthFields;
private TupleInfo.Builder tupleBuilder;

public BlockBuilder(long startPosition, TupleInfo tupleInfo)
{
Expand All @@ -42,23 +35,7 @@ public BlockBuilder(long startPosition, TupleInfo tupleInfo, DataSize blockSize)
maxBlockSize = (int) blockSize.toBytes();
sliceOutput = new DynamicSliceOutput((int) blockSize.toBytes());

int fixedPartSize = 0;
int variableLengthFieldCount = 0;
for (TupleInfo.Type type : tupleInfo.getTypes()) {
if (!type.isFixedSize()) {
if (variableLengthFieldCount > 0) {
fixedPartSize += SizeOf.SIZE_OF_SHORT;
}
variableLengthFieldCount++;
}
else {
fixedPartSize += type.getSize();
}
}
fixedPartSize += SizeOf.SIZE_OF_SHORT; // total tuple size

this.fixedPartSize = fixedPartSize;
variableLengthFields = new ArrayList<>(variableLengthFieldCount);
tupleBuilder = tupleInfo.builder(sliceOutput);
}

public boolean isFull()
Expand All @@ -70,90 +47,42 @@ public void append(long value)
{
flushTupleIfNecessary();

checkState(tupleInfo.getTypes().get(currentField) == FIXED_INT_64, "Expected FIXED_INT_64");

sliceOutput.writeLong(value);
currentField++;
tupleBuilder.append(value);
}

public void append(byte[] value)
{
flushTupleIfNecessary();

checkState(tupleInfo.getTypes().get(currentField) == VARIABLE_BINARY, "Expected VARIABLE_BINARY");

variableLengthFields.add(Slices.wrappedBuffer(value));
currentField++;
tupleBuilder.append(Slices.wrappedBuffer(value));
}

public void append(Slice value)
{
flushTupleIfNecessary();

checkState(tupleInfo.getTypes().get(currentField) == VARIABLE_BINARY, "Expected VARIABLE_BINARY");

variableLengthFields.add(value);
currentField++;
tupleBuilder.append(value);
}

public void append(Tuple tuple)
{
// TODO: optimization - single copy of block of fixed length fields

int index = 0;
for (TupleInfo.Type type : tuple.getTupleInfo().getTypes()) {
switch (type) {
case FIXED_INT_64:
append(tuple.getLong(index));
break;
case VARIABLE_BINARY:
append(tuple.getSlice(index));
break;
default:
throw new IllegalStateException("Type not yet supported: " + type);
}

index++;
}
flushTupleIfNecessary();

tupleBuilder.append(tuple);
}

private void flushTupleIfNecessary()
{
if (currentField < tupleInfo.getTypes().size()) {
return;
}

// write offsets
boolean isFirst = true;
int offset = fixedPartSize;
for (Slice field : variableLengthFields) {
if (!isFirst) {
sliceOutput.writeShort(offset);
}
offset += field.length();
isFirst = false;
}

if (!variableLengthFields.isEmpty()) {
sliceOutput.writeShort(offset); // total tuple length
if (tupleBuilder.isComplete()) {
tupleBuilder.finish();
count++;
}

// write values
for (Slice field : variableLengthFields) {
sliceOutput.writeBytes(field);
}

count++;
currentField = 0;
variableLengthFields.clear();
}

public ValueBlock build()
{
flushTupleIfNecessary();

Preconditions.checkState(currentField == 0, "Last tuple is incomplete");

if (count == 0) {
return EmptyValueBlock.INSTANCE;
}
Expand Down
139 changes: 139 additions & 0 deletions src/main/java/com/facebook/presto/TupleInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Ints;

import java.util.ArrayList;
import java.util.List;

import static com.facebook.presto.SizeOf.SIZE_OF_SHORT;
Expand Down Expand Up @@ -37,14 +38,18 @@ boolean isFixedSize()
{
return size != -1;
}

}

private final int size;

private final List<Type> types;
private final List<Integer> offsets;
private final int firstVariableLengthField;
private final int secondVariableLengthField;
private final int fixedSizePart;
private final int variableLengthFieldCount;
private final int variablePartOffset;

public TupleInfo(Type... types)
{
Expand Down Expand Up @@ -77,11 +82,13 @@ public TupleInfo(List<Type> types)
int firstVariableLengthField = -1;
int secondVariableLengthField = -1;

int variableLengthFieldCount = 0;
// process variable length fields
for (int i = 0; i < types.size(); i++) {
Type type = types.get(i);

if (!type.isFixedSize()) {
++variableLengthFieldCount;
offsets[i] = currentOffset;
if (hasVariableLengthFields) {
currentOffset += SIZE_OF_SHORT; // we use a short to encode the offset of a var length field
Expand Down Expand Up @@ -115,15 +122,41 @@ public TupleInfo(List<Type> types)

this.firstVariableLengthField = firstVariableLengthField;
this.secondVariableLengthField = secondVariableLengthField;
this.variableLengthFieldCount = variableLengthFieldCount;

this.offsets = ImmutableList.copyOf(Ints.asList(offsets));


// compute offset of variable sized part
int variablePartOffset = 0;
boolean isFirst = true;
for (TupleInfo.Type type : getTypes()) {
if (!type.isFixedSize()) {
if (!isFirst) { // skip offset field for first variable length field
variablePartOffset += SizeOf.SIZE_OF_SHORT;
}

isFirst = false;
}
else {
variablePartOffset += type.getSize();
}
}
variablePartOffset += SizeOf.SIZE_OF_SHORT; // total tuple size field

this.variablePartOffset = variablePartOffset;
}

public List<Type> getTypes()
{
return types;
}

public int getFieldCount()
{
return types.size();
}

public int size(Slice slice)
{
if (size != -1) {
Expand Down Expand Up @@ -167,6 +200,16 @@ private int getOffset(int index)
return offsets.get(index);
}

public Builder builder(SliceOutput sliceOutput)
{
return new Builder(sliceOutput);
}

public Builder builder()
{
return new Builder(new DynamicSliceOutput(0));
}

@Override
public boolean equals(Object o)
{
Expand Down Expand Up @@ -204,4 +247,100 @@ public String toString()
", fixedSizePart=" + fixedSizePart +
'}';
}


public class Builder
{
private final SliceOutput sliceOutput;
private final List<Slice> variableLengthFields;

private int currentField;

public Builder(SliceOutput sliceOutput)
{
this.sliceOutput = sliceOutput;
this.variableLengthFields = new ArrayList<>(variableLengthFieldCount);
}

public Builder append(long value)
{
checkState(TupleInfo.this.getTypes().get(currentField) == FIXED_INT_64, "Cannot append long. Current field (%s) is of type %s", currentField, TupleInfo.this.getTypes().get(currentField));

sliceOutput.writeLong(value);
currentField++;

return this;
}

public Builder append(Slice value)
{
checkState(TupleInfo.this.getTypes().get(currentField) == VARIABLE_BINARY, "Cannot append binary. Current field (%s) is of type %s", currentField, TupleInfo.this.getTypes().get(currentField));

variableLengthFields.add(value);
currentField++;

return this;
}

public void append(Tuple tuple)
{
// TODO: optimization - single copy of block of fixed length fields

int index = 0;
for (TupleInfo.Type type : tuple.getTupleInfo().getTypes()) {
switch (type) {
case FIXED_INT_64:
append(tuple.getLong(index));
break;
case VARIABLE_BINARY:
append(tuple.getSlice(index));
break;
default:
throw new IllegalStateException("Type not yet supported: " + type);
}

index++;
}
}

public boolean isComplete()
{
return currentField == types.size();
}

public void finish()
{
Preconditions.checkState(currentField == types.size(), "Tuple is incomplete");

// write offsets
boolean isFirst = true;
int offset = variablePartOffset;
for (Slice field : variableLengthFields) {
if (!isFirst) {
sliceOutput.writeShort(offset);
}
offset += field.length();
isFirst = false;
}

if (!variableLengthFields.isEmpty()) {
sliceOutput.writeShort(offset); // total tuple length
}

// write values
for (Slice field : variableLengthFields) {
sliceOutput.writeBytes(field);
}

currentField = 0;
variableLengthFields.clear();
}

public Tuple build()
{
finish();
return new Tuple(sliceOutput.slice(), TupleInfo.this);
}
}

}
22 changes: 11 additions & 11 deletions src/test/java/com/facebook/presto/TestCsvFileScanner.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.facebook.presto;

import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
import com.google.common.io.InputSupplier;
import org.testng.Assert;
Expand All @@ -12,7 +11,6 @@
import static com.facebook.presto.SizeOf.SIZE_OF_SHORT;
import static com.facebook.presto.TupleInfo.Type.FIXED_INT_64;
import static com.facebook.presto.TupleInfo.Type.VARIABLE_BINARY;
import static com.google.common.base.Charsets.*;
import static com.google.common.base.Charsets.UTF_8;
import static com.google.common.io.Resources.getResource;
import static com.google.common.io.Resources.newReaderSupplier;
Expand Down Expand Up @@ -54,20 +52,22 @@ public void testIterator()

private Tuple createTuple(String value)
{
byte[] bytes = value.getBytes(UTF_8);
Slice slice = Slices.allocate(bytes.length + SIZE_OF_SHORT);
slice.output()
.appendShort(bytes.length + 2)
.appendBytes(bytes);
TupleInfo tupleInfo = new TupleInfo(VARIABLE_BINARY);
Tuple tuple = tupleInfo.builder()
.append(Slices.wrappedBuffer(value.getBytes(UTF_8)))
.build();

return new Tuple(slice, new TupleInfo(VARIABLE_BINARY));
return tuple;
}

private Tuple createTuple(long value)
{
Slice slice = Slices.allocate(SIZE_OF_LONG);
slice.setLong(0, value);
return new Tuple(slice, new TupleInfo(FIXED_INT_64));
TupleInfo tupleInfo = new TupleInfo(FIXED_INT_64);
Tuple tuple = tupleInfo.builder()
.append(value)
.build();

return tuple;
}


Expand Down
Loading