diff --git a/src/main/java/com/facebook/presto/BasicSliceOutput.java b/src/main/java/com/facebook/presto/BasicSliceOutput.java index 83f703930f650..da6b6363cefb6 100644 --- a/src/main/java/com/facebook/presto/BasicSliceOutput.java +++ b/src/main/java/com/facebook/presto/BasicSliceOutput.java @@ -190,6 +190,41 @@ else if (nBytes < 4) { } } + @Override + public BasicSliceOutput appendLong(long value) + { + writeLong(value); + return this; + } + + @Override + public BasicSliceOutput appendShort(int value) + { + writeShort(value); + return this; + } + + @Override + public BasicSliceOutput appendBytes(byte[] source, int sourceIndex, int length) + { + write(source, sourceIndex, length); + return this; + } + + @Override + public BasicSliceOutput appendBytes(byte[] source) + { + writeBytes(source); + return this; + } + + @Override + public BasicSliceOutput appendBytes(Slice slice) + { + writeBytes(slice); + return this; + } + @Override public Slice slice() { diff --git a/src/main/java/com/facebook/presto/BlockBuilder.java b/src/main/java/com/facebook/presto/BlockBuilder.java index d931d0cb0d126..9bbc726bfbf82 100644 --- a/src/main/java/com/facebook/presto/BlockBuilder.java +++ b/src/main/java/com/facebook/presto/BlockBuilder.java @@ -1,9 +1,17 @@ package com.facebook.presto; import com.google.common.base.Preconditions; +import com.google.common.collect.Ranges; import io.airlift.units.DataSize; import io.airlift.units.DataSize.Unit; +import java.util.ArrayList; +import java.util.List; + +import static com.facebook.presto.TupleInfo.Type.FIXED_INT_64; +import static com.facebook.presto.TupleInfo.Type.VARIABLE_BINARY; +import static com.google.common.base.Preconditions.*; + public class BlockBuilder { private static final DataSize DEFAULT_MAX_BLOCK_SIZE = new DataSize(64, Unit.KILOBYTE); @@ -12,6 +20,12 @@ public class BlockBuilder private final TupleInfo tupleInfo; private final int maxBlockSize; private final DynamicSliceOutput sliceOutput; + private int count; + + private final int fixedPartSize; + + private int currentField; + private final List variableLengthFields; public BlockBuilder(long startPosition, TupleInfo tupleInfo) { @@ -20,13 +34,31 @@ public BlockBuilder(long startPosition, TupleInfo tupleInfo) public BlockBuilder(long startPosition, TupleInfo tupleInfo, DataSize blockSize) { - Preconditions.checkArgument(startPosition >= 0, "startPosition is negative"); - Preconditions.checkNotNull(blockSize, "blockSize is null"); + checkArgument(startPosition >= 0, "startPosition is negative"); + checkNotNull(blockSize, "blockSize is null"); this.startPosition = startPosition; this.tupleInfo = tupleInfo; maxBlockSize = (int) blockSize.toBytes(); sliceOutput = new DynamicSliceOutput((int) blockSize.toBytes()); + + int fixedPartSize = 0; + int variableLengthFieldCount = 0; + for (TupleInfo.Type type : tupleInfo.getTypes()) { + if (!type.isFixedSize()) { + if (variableLengthFieldCount > 0) { + fixedPartSize += SizeOf.SIZE_OF_SHORT; + } + variableLengthFieldCount++; + } + else { + fixedPartSize += type.getSize(); + } + } + fixedPartSize += SizeOf.SIZE_OF_SHORT; // total tuple size + + this.fixedPartSize = fixedPartSize; + variableLengthFields = new ArrayList<>(variableLengthFieldCount); } public boolean isFull() @@ -34,38 +66,98 @@ public boolean isFull() return sliceOutput.size() > maxBlockSize; } - public void append(byte value) + public void append(long value) { - sliceOutput.write(value); - } + flushTupleIfNecessary(); - public void append(int value) - { - sliceOutput.writeInt(value); - } + checkState(tupleInfo.getTypes().get(currentField) == FIXED_INT_64, "Expected FIXED_INT_64"); - public void append(long value) - { sliceOutput.writeLong(value); + currentField++; } public void append(byte[] value) { - sliceOutput.writeBytes(value); + flushTupleIfNecessary(); + + checkState(tupleInfo.getTypes().get(currentField) == VARIABLE_BINARY, "Expected VARIABLE_BINARY"); + + variableLengthFields.add(Slices.wrappedBuffer(value)); + currentField++; } public void append(Slice value) { - sliceOutput.writeBytes(value); + flushTupleIfNecessary(); + + checkState(tupleInfo.getTypes().get(currentField) == VARIABLE_BINARY, "Expected VARIABLE_BINARY"); + + variableLengthFields.add(value); + currentField++; } public void append(Tuple tuple) { - tuple.writeTo(sliceOutput); + // TODO: optimization - single copy of block of fixed length fields + + int index = 0; + for (TupleInfo.Type type : tuple.getTupleInfo().getTypes()) { + switch (type) { + case FIXED_INT_64: + append(tuple.getLong(index)); + break; + case VARIABLE_BINARY: + append(tuple.getSlice(index)); + break; + default: + throw new IllegalStateException("Type not yet supported: " + type); + } + + index++; + } + } + + private void flushTupleIfNecessary() + { + if (currentField < tupleInfo.getTypes().size()) { + return; + } + + // write offsets + boolean isFirst = true; + int offset = fixedPartSize; + for (Slice field : variableLengthFields) { + if (!isFirst) { + sliceOutput.writeShort(offset); + } + offset += field.length(); + isFirst = false; + } + + if (!variableLengthFields.isEmpty()) { + sliceOutput.writeShort(offset); // total tuple length + } + + // write values + for (Slice field : variableLengthFields) { + sliceOutput.writeBytes(field); + } + + count++; + currentField = 0; + variableLengthFields.clear(); } - public UncompressedValueBlock build() + public ValueBlock build() { - return new UncompressedValueBlock(startPosition, tupleInfo, sliceOutput.slice()); + flushTupleIfNecessary(); + + Preconditions.checkState(currentField == 0, "Last tuple is incomplete"); + + if (count == 0) { + return EmptyValueBlock.INSTANCE; + } + + return new UncompressedValueBlock(Ranges.closed(startPosition, startPosition + count - 1), tupleInfo, sliceOutput.slice()); } } diff --git a/src/main/java/com/facebook/presto/CsvFileScanner.java b/src/main/java/com/facebook/presto/CsvFileScanner.java index 33d8fd084d755..1f746a89cc861 100644 --- a/src/main/java/com/facebook/presto/CsvFileScanner.java +++ b/src/main/java/com/facebook/presto/CsvFileScanner.java @@ -12,40 +12,45 @@ import java.io.InputStreamReader; import java.util.Iterator; -public class CsvFileScanner implements Iterable +import static com.google.common.base.Charsets.*; +import static com.google.common.base.Charsets.UTF_8; + +public class CsvFileScanner implements Iterable { private final InputSupplier inputSupplier; private final Splitter columnSplitter; private final int columnIndex; - private final TupleInfo tupleInfo; + private final TupleInfo.Type columnType; - public CsvFileScanner(InputSupplier inputSupplier, int columnIndex, char columnSeparator, TupleInfo tupleInfo) + public CsvFileScanner(InputSupplier inputSupplier, int columnIndex, char columnSeparator, TupleInfo.Type columnType) { + this.columnType = columnType; Preconditions.checkNotNull(inputSupplier, "inputSupplier is null"); this.columnIndex = columnIndex; - this.tupleInfo = tupleInfo; this.inputSupplier = inputSupplier; columnSplitter = Splitter.on(columnSeparator); } @Override - public Iterator iterator() + public Iterator iterator() { - return new ColumnIterator(inputSupplier, columnIndex, columnSplitter, tupleInfo); + return new ColumnIterator(inputSupplier, columnIndex, columnSplitter, columnType); } - private static class ColumnIterator extends AbstractIterator + private static class ColumnIterator extends AbstractIterator { private final LineReader reader; private final TupleInfo tupleInfo; private final int columnIndex; private final Splitter columnSplitter; private long position; + private final TupleInfo.Type columnType; - public ColumnIterator(InputSupplier inputSupplier, int columnIndex, Splitter columnSplitter, TupleInfo tupleInfo) + public ColumnIterator(InputSupplier inputSupplier, int columnIndex, Splitter columnSplitter, TupleInfo.Type columnType) { - this.tupleInfo = tupleInfo; + this.columnType = columnType; + this.tupleInfo = new TupleInfo(columnType); try { this.reader = new LineReader(inputSupplier.getInput()); } @@ -57,7 +62,7 @@ public ColumnIterator(InputSupplier inputSupplier, int column } @Override - protected UncompressedValueBlock computeNext() + protected ValueBlock computeNext() { String line = nextLine(); if (line == null) { @@ -72,7 +77,12 @@ protected UncompressedValueBlock computeNext() // calculate final value for this group // todo add support for other column types - blockBuilder.append(Long.valueOf(value)); + if (columnType == TupleInfo.Type.FIXED_INT_64) { + blockBuilder.append(Long.valueOf(value)); + } + else { + blockBuilder.append(value.getBytes(UTF_8)); + } if (blockBuilder.isFull()) { break; @@ -80,7 +90,7 @@ protected UncompressedValueBlock computeNext() line = nextLine(); } while (line != null); - UncompressedValueBlock block = blockBuilder.build(); + ValueBlock block = blockBuilder.build(); position += block.getCount(); return block; } diff --git a/src/main/java/com/facebook/presto/DynamicSliceOutput.java b/src/main/java/com/facebook/presto/DynamicSliceOutput.java index d832110e1e6b0..ad7bb6232772a 100644 --- a/src/main/java/com/facebook/presto/DynamicSliceOutput.java +++ b/src/main/java/com/facebook/presto/DynamicSliceOutput.java @@ -201,6 +201,41 @@ else if (nBytes < 4) { } } + @Override + public DynamicSliceOutput appendLong(long value) + { + writeLong(value); + return this; + } + + @Override + public DynamicSliceOutput appendShort(int value) + { + writeShort(value); + return this; + } + + @Override + public DynamicSliceOutput appendBytes(byte[] source, int sourceIndex, int length) + { + write(source, sourceIndex, length); + return this; + } + + @Override + public DynamicSliceOutput appendBytes(byte[] source) + { + writeBytes(source); + return this; + } + + @Override + public DynamicSliceOutput appendBytes(Slice slice) + { + writeBytes(slice); + return this; + } + @Override public Slice slice() { diff --git a/src/main/java/com/facebook/presto/EmptyValueBlock.java b/src/main/java/com/facebook/presto/EmptyValueBlock.java index c2361e10f7410..006d9136d3c20 100644 --- a/src/main/java/com/facebook/presto/EmptyValueBlock.java +++ b/src/main/java/com/facebook/presto/EmptyValueBlock.java @@ -12,6 +12,12 @@ public class EmptyValueBlock implements ValueBlock { + public final static EmptyValueBlock INSTANCE = new EmptyValueBlock(); + + private EmptyValueBlock() + { + } + @Override public PositionBlock selectPositions(Predicate predicate) { diff --git a/src/main/java/com/facebook/presto/Merge.java b/src/main/java/com/facebook/presto/Merge.java index d58e93431c39e..a3ce137f7e11a 100644 --- a/src/main/java/com/facebook/presto/Merge.java +++ b/src/main/java/com/facebook/presto/Merge.java @@ -63,7 +63,7 @@ protected ValueBlock computeNext() } } while (Iterables.all(sources, hasNextPredicate())); - UncompressedValueBlock block = blockBuilder.build(); + ValueBlock block = blockBuilder.build(); position += block.getCount(); return block; } diff --git a/src/main/java/com/facebook/presto/PipelinedAggregation.java b/src/main/java/com/facebook/presto/PipelinedAggregation.java index e15af37817319..10bd9a3ce877e 100644 --- a/src/main/java/com/facebook/presto/PipelinedAggregation.java +++ b/src/main/java/com/facebook/presto/PipelinedAggregation.java @@ -56,7 +56,7 @@ protected ValueBlock computeNext() while (!builder.isFull() && groupBySource.hasNext()); // build an output block - UncompressedValueBlock block = builder.build(); + ValueBlock block = builder.build(); position += block.getCount(); return block; } diff --git a/src/main/java/com/facebook/presto/RunLengthEncodedBlock.java b/src/main/java/com/facebook/presto/RunLengthEncodedBlock.java index b7df0b8ff55a7..141ac2a39d7cf 100644 --- a/src/main/java/com/facebook/presto/RunLengthEncodedBlock.java +++ b/src/main/java/com/facebook/presto/RunLengthEncodedBlock.java @@ -6,6 +6,7 @@ import com.google.common.collect.Iterators; import com.google.common.collect.PeekingIterator; import com.google.common.collect.Range; +import com.google.common.collect.Ranges; import java.util.Collections; import java.util.Iterator; @@ -50,17 +51,18 @@ public ValueBlock filter(PositionBlock positions) } } if (matches == 0) { - return new EmptyValueBlock(); + return EmptyValueBlock.INSTANCE; } - Slice newSlice = Slices.allocate(matches * value.getTupleInfo().size()); + + Slice newSlice = Slices.allocate(matches * value.size()); SliceOutput sliceOutput = newSlice.output(); for (int i = 0; i < matches; i++) { value.writeTo(sliceOutput); } // todo what is the start position - return new UncompressedValueBlock(0, value.getTupleInfo(), newSlice); + return new UncompressedValueBlock(Ranges.closed(0L, (long) matches), value.getTupleInfo(), newSlice); } @Override diff --git a/src/main/java/com/facebook/presto/SliceOutput.java b/src/main/java/com/facebook/presto/SliceOutput.java index 0950e9e637b35..34308affd0263 100644 --- a/src/main/java/com/facebook/presto/SliceOutput.java +++ b/src/main/java/com/facebook/presto/SliceOutput.java @@ -264,6 +264,17 @@ public final void write(byte[] source, int sourceIndex, int length) */ public abstract String toString(Charset charset); + + public abstract SliceOutput appendLong(long value); + + public abstract SliceOutput appendShort(int value); + + public abstract SliceOutput appendBytes(byte[] source, int sourceIndex, int length); + + public abstract SliceOutput appendBytes(byte[] source); + + public abstract SliceOutput appendBytes(Slice slice); + // // Unsupported operations // diff --git a/src/main/java/com/facebook/presto/SumAggregation.java b/src/main/java/com/facebook/presto/SumAggregation.java index e642f9bdb7161..16f37533488d1 100644 --- a/src/main/java/com/facebook/presto/SumAggregation.java +++ b/src/main/java/com/facebook/presto/SumAggregation.java @@ -10,13 +10,15 @@ public class SumAggregation @Override public TupleInfo getTupleInfo() { - return new TupleInfo(SIZE_OF_LONG); + return new TupleInfo(TupleInfo.Type.FIXED_INT_64); } @Override public void add(ValueBlock values, PositionBlock relevantPositions) { - for (Tuple value : values.filter(relevantPositions)) { + ValueBlock filtered = values.filter(relevantPositions); + + for (Tuple value : filtered) { sum += value.getLong(0); } } diff --git a/src/main/java/com/facebook/presto/Tuple.java b/src/main/java/com/facebook/presto/Tuple.java index 714e57ac6c47b..3662df28850b5 100644 --- a/src/main/java/com/facebook/presto/Tuple.java +++ b/src/main/java/com/facebook/presto/Tuple.java @@ -1,7 +1,5 @@ package com.facebook.presto; -import com.google.common.base.Preconditions; - public class Tuple { private final Slice slice; @@ -18,40 +16,24 @@ public TupleInfo getTupleInfo() return tupleInfo; } - public byte getByteValue(int index) - { - checkIndexSize(index, SizeOf.SIZE_OF_BYTE); - return slice.getByte(tupleInfo.getOffset(index)); - } - - public int getInt(int index) - { - checkIndexSize(index, SizeOf.SIZE_OF_BYTE); - - return slice.getInt(tupleInfo.getOffset(index)); - } - public long getLong(int index) { - checkIndexSize(index, SizeOf.SIZE_OF_LONG); - return slice.getLong(tupleInfo.getOffset(index)); + return tupleInfo.getLong(slice, index); } public Slice getSlice(int index) { - Preconditions.checkArgument(index < tupleInfo.size()); - return slice.slice(tupleInfo.getOffset(index), tupleInfo.getLength(index)); + return tupleInfo.getSlice(slice, index); } - public void writeTo(SliceOutput out) + public int size() { - out.writeBytes(slice); + return tupleInfo.size(slice); } - private void checkIndexSize(int index, int size) + public void writeTo(SliceOutput out) { - Preconditions.checkArgument(index < tupleInfo.size()); - Preconditions.checkArgument(tupleInfo.getLength(index) == size, "Value %s must be %s bytes wide, but is %s bytes", index, size, tupleInfo.getLength(index)); + out.writeBytes(slice); } @Override diff --git a/src/main/java/com/facebook/presto/TupleInfo.java b/src/main/java/com/facebook/presto/TupleInfo.java index 19bc21d476f1e..976bdcc2b7dcb 100644 --- a/src/main/java/com/facebook/presto/TupleInfo.java +++ b/src/main/java/com/facebook/presto/TupleInfo.java @@ -6,64 +6,165 @@ import java.util.List; +import static com.facebook.presto.SizeOf.SIZE_OF_SHORT; +import static com.facebook.presto.TupleInfo.Type.FIXED_INT_64; +import static com.facebook.presto.TupleInfo.Type.VARIABLE_BINARY; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static java.util.Arrays.asList; + public class TupleInfo { + public enum Type + { + FIXED_INT_64(8), + VARIABLE_BINARY(-1); + + private final int size; + + private Type(int size) + { + this.size = size; + } + + int getSize() + { + Preconditions.checkState(isFixedSize(), "Can't get size of variable length field"); + return size; + } + + boolean isFixedSize() + { + return size != -1; + } + } + private final int size; - private final List lengths; + private final List types; private final List offsets; + private final int firstVariableLengthField; + private final int secondVariableLengthField; + private final int fixedSizePart; - public TupleInfo(int... lengths) + public TupleInfo(Type... types) { - this(Ints.asList(lengths)); + this(asList(types)); } - public TupleInfo(List lengths) + public TupleInfo(List types) { - Preconditions.checkNotNull(lengths, "lengths is null"); + Preconditions.checkNotNull(types, "types is null"); + Preconditions.checkArgument(!types.isEmpty(), "types is empty"); + + this.types = ImmutableList.copyOf(types); + + int[] offsets = new int[types.size() + 1]; + + int currentOffset = 0; + + // process fixed-length fields first + for (int i = 0; i < types.size(); i++) { + Type type = types.get(i); + + if (type.isFixedSize()) { + offsets[i] = currentOffset; + currentOffset += type.getSize(); + } + } + + boolean hasVariableLengthFields = false; - this.lengths = ImmutableList.copyOf(lengths); - int rowLength = 0; - for (int i = 0; i < lengths.size(); i++) { - int length = lengths.get(i); - Preconditions.checkArgument(length >= 1, "length %s must be at least 1", i); - rowLength += length; + int firstVariableLengthField = -1; + int secondVariableLengthField = -1; + // process variable length fields + for (int i = 0; i < types.size(); i++) { + Type type = types.get(i); + + if (!type.isFixedSize()) { + offsets[i] = currentOffset; + if (hasVariableLengthFields) { + currentOffset += SIZE_OF_SHORT; // we use a short to encode the offset of a var length field + + if (secondVariableLengthField == -1) { + secondVariableLengthField = i; + } + } + else { + firstVariableLengthField = i; + } + + hasVariableLengthFields = true; + } } - ImmutableList.Builder offsets = ImmutableList.builder(); - int current = 0; - for (int length : lengths) { - offsets.add(current); - current += length; + if (secondVariableLengthField == -1) { + secondVariableLengthField = types.size(); // use the length field } - this.offsets = offsets.build(); - this.size = rowLength; + offsets[offsets.length - 1] = currentOffset; + + if (hasVariableLengthFields) { + size = -1; + } + else { + size = currentOffset; + } + + fixedSizePart = currentOffset + SIZE_OF_SHORT; // 2 bytes for the length field + + this.firstVariableLengthField = firstVariableLengthField; + this.secondVariableLengthField = secondVariableLengthField; + + this.offsets = ImmutableList.copyOf(Ints.asList(offsets)); } - public List getLengths() + public List getTypes() { - return lengths; + return types; } - public int getLength(int index) + public int size(Slice slice) { - return lengths.get(index); + if (size != -1) { + return size; + } + + // length of the tuple is located in the "last" fixed-width slot + // this makes variable length column size easy to calculate + return slice.getShort(getOffset(types.size())); } - public List getOffsets() + public long getLong(Slice slice, int index) { - return offsets; + checkState(types.get(index) == FIXED_INT_64, "Expected FIXED_INT_64"); + + return slice.getLong(getOffset(index)); } - public int getOffset(int index) + public Slice getSlice(Slice slice, int index) { - return offsets.get(index); + checkState(types.get(index) == VARIABLE_BINARY, "Expected VARIABLE_BINARY"); + + int start; + int end; + if (index == firstVariableLengthField) { + start = fixedSizePart; + end = slice.getShort(getOffset(secondVariableLengthField)); + } + else { + start = slice.getShort(getOffset(index)); + end = slice.getShort(getOffset(index) + SIZE_OF_SHORT); + } + + // this works because positions of variable length fields are laid out in the same order as the actual data + return slice.slice(start, end - start); } - public int size() + private int getOffset(int index) { - return size; + checkArgument(index != firstVariableLengthField, "Cannot get offset for first variable length field"); + return offsets.get(index); } @Override @@ -78,7 +179,7 @@ public boolean equals(Object o) TupleInfo tupleInfo = (TupleInfo) o; - if (!lengths.equals(tupleInfo.lengths)) { + if (!types.equals(tupleInfo.types)) { return false; } @@ -88,16 +189,19 @@ public boolean equals(Object o) @Override public int hashCode() { - return lengths.hashCode(); + return types.hashCode(); } @Override public String toString() { - final StringBuilder sb = new StringBuilder(); - sb.append("TupleInfo"); - sb.append("{lengths=").append(lengths); - sb.append('}'); - return sb.toString(); + return "TupleInfo{" + + "size=" + size + + ", types=" + types + + ", offsets=" + offsets + + ", firstVariableLengthField=" + firstVariableLengthField + + ", secondVariableLengthField=" + secondVariableLengthField + + ", fixedSizePart=" + fixedSizePart + + '}'; } } diff --git a/src/main/java/com/facebook/presto/UncompressedValueBlock.java b/src/main/java/com/facebook/presto/UncompressedValueBlock.java index fbe247174a023..7103def74ab6f 100644 --- a/src/main/java/com/facebook/presto/UncompressedValueBlock.java +++ b/src/main/java/com/facebook/presto/UncompressedValueBlock.java @@ -9,9 +9,9 @@ import com.google.common.collect.Range; import com.google.common.collect.Ranges; -import java.util.ArrayList; +import java.util.HashSet; import java.util.Iterator; -import java.util.List; +import java.util.Set; public class UncompressedValueBlock implements ValueBlock @@ -20,19 +20,16 @@ public class UncompressedValueBlock private final TupleInfo tupleInfo; private final Slice slice; - public UncompressedValueBlock(long startPosition, TupleInfo tupleInfo, Slice slice) + public UncompressedValueBlock(Range range, TupleInfo tupleInfo, Slice slice) { - Preconditions.checkArgument(startPosition >= 0, "startPosition is negative"); + Preconditions.checkNotNull(range, "range is null"); + Preconditions.checkArgument(range.lowerEndpoint() >= 0, "range start position is negative"); Preconditions.checkNotNull(tupleInfo, "tupleInfo is null"); Preconditions.checkNotNull(slice, "data is null"); this.tupleInfo = tupleInfo; this.slice = slice; - - Preconditions.checkArgument(slice.length() % tupleInfo.size() == 0, "data must be a multiple of tuple length"); - - int rows = slice.length() / tupleInfo.size(); - range = Ranges.closed(startPosition, startPosition + rows - 1); + this.range = range; } @Override @@ -53,24 +50,38 @@ public ValueBlock selectPairs(Predicate predicate) @Override public ValueBlock filter(PositionBlock positions) { - List indexes = new ArrayList<>(); + // find selected positions + Set indexes = new HashSet<>(); for (long position : positions.getPositions()) { if (range.contains(position)) { - indexes.add(position - range.lowerEndpoint()); + indexes.add((int) (position - range.lowerEndpoint())); } } + + // if no positions are selected, we are done if (indexes.isEmpty()) { - return new EmptyValueBlock(); + return EmptyValueBlock.INSTANCE; } - Slice newSlice = Slices.allocate(indexes.size() * tupleInfo.size()); - SliceOutput sliceOutput = newSlice.output(); - for (long index : indexes) { - sliceOutput.writeBytes(slice, (int) (index * tupleInfo.size()), tupleInfo.size()); + + // build a buffer containing only the tuples from the selected positions + DynamicSliceOutput sliceOutput = new DynamicSliceOutput(1024); + + int currentOffset = 0; + for (int index = 0; index < getCount(); ++index) { + Slice currentPositionToEnd = slice.slice(currentOffset, slice.length() - currentOffset); + int size = tupleInfo.size(currentPositionToEnd); + + // only write selected tuples + if (indexes.contains(index)) { + sliceOutput.writeBytes(slice, currentOffset, size); + } + + currentOffset += size; } // todo what is the start position - return new UncompressedValueBlock(0, tupleInfo, newSlice); + return new UncompressedValueBlock(Ranges.closed(0L, (long) indexes.size() - 1), tupleInfo, sliceOutput.slice()); } @Override @@ -78,6 +89,7 @@ public Iterator iterator() { return new AbstractIterator() { + private int currentOffset = 0; private long index = 0; @Override @@ -87,8 +99,14 @@ protected Tuple computeNext() endOfData(); return null; } - Slice row = slice.slice((int) (index * tupleInfo.size()), tupleInfo.size()); + + Slice currentPositionToEnd = slice.slice(currentOffset, slice.length() - currentOffset); + + int size = tupleInfo.size(currentPositionToEnd); index++; + currentOffset += size; + + Slice row = currentPositionToEnd.slice(0, size); return new Tuple(row, tupleInfo); } }; @@ -99,6 +117,7 @@ public PeekingIterator pairIterator() { return Iterators.peekingIterator(new AbstractIterator() { + private int currentOffset = 0; private long index = 0; @Override @@ -108,7 +127,14 @@ protected Pair computeNext() endOfData(); return null; } - Slice row = slice.slice((int) (index * tupleInfo.size()), tupleInfo.size()); + + Slice currentPositionToEnd = slice.slice(currentOffset, slice.length() - currentOffset); + + int size = tupleInfo.size(currentPositionToEnd); + currentOffset += size; + + Slice row = currentPositionToEnd.slice(0, size); + long position = index + range.lowerEndpoint(); index++; return new Pair(position, new Tuple(row, tupleInfo)); diff --git a/src/test/java/com/facebook/presto/TestCsvFileScanner.java b/src/test/java/com/facebook/presto/TestCsvFileScanner.java index da2871241a737..7fb39c5fa52f0 100644 --- a/src/test/java/com/facebook/presto/TestCsvFileScanner.java +++ b/src/test/java/com/facebook/presto/TestCsvFileScanner.java @@ -9,18 +9,23 @@ import java.io.InputStreamReader; import static com.facebook.presto.SizeOf.SIZE_OF_LONG; +import static com.facebook.presto.SizeOf.SIZE_OF_SHORT; +import static com.facebook.presto.TupleInfo.Type.FIXED_INT_64; +import static com.facebook.presto.TupleInfo.Type.VARIABLE_BINARY; +import static com.google.common.base.Charsets.*; +import static com.google.common.base.Charsets.UTF_8; import static com.google.common.io.Resources.getResource; import static com.google.common.io.Resources.newReaderSupplier; public class TestCsvFileScanner { - private final InputSupplier inputSupplier = newReaderSupplier(getResource("data.csv"), Charsets.UTF_8); + private final InputSupplier inputSupplier = newReaderSupplier(getResource("data.csv"), UTF_8); @Test public void testIterator() throws Exception { - CsvFileScanner firstColumn = new CsvFileScanner(inputSupplier, 0, ',', new TupleInfo(SIZE_OF_LONG)); + CsvFileScanner firstColumn = new CsvFileScanner(inputSupplier, 0, ',', FIXED_INT_64); ImmutableList actual = ImmutableList.copyOf(new PairsIterator(firstColumn.iterator())); Assert.assertEquals(actual, @@ -30,28 +35,40 @@ public void testIterator() new Pair(2, createTuple(2)), new Pair(3, createTuple(3)))); - // todo add support for variable length columns -// CsvFileScanner secondColumn = new CsvFileScanner(inputSupplier, 1, ','); -// Assert.assertEquals(ImmutableList.copyOf(new PairsIterator(secondColumn.iterator())), -// ImmutableList.of( -// new Pair(0, new Tuple("apple")), -// new Pair(1, new Tuple("banana")), -// new Pair(2, new Tuple("cherry")), -// new Pair(3, new Tuple("date")))); -// -// CsvFileScanner thirdColumn = new CsvFileScanner(inputSupplier, 2, ','); -// Assert.assertEquals(ImmutableList.copyOf(new PairsIterator(thirdColumn.iterator())), -// ImmutableList.of( -// new Pair(0, new Tuple("alice")), -// new Pair(1, new Tuple("bob")), -// new Pair(2, new Tuple("charlie")), -// new Pair(3, new Tuple("dave")))); + CsvFileScanner secondColumn = new CsvFileScanner(inputSupplier, 1, ',', VARIABLE_BINARY); + Assert.assertEquals(ImmutableList.copyOf(new PairsIterator(secondColumn.iterator())), + ImmutableList.of( + new Pair(0, createTuple("apple")), + new Pair(1, createTuple("banana")), + new Pair(2, createTuple("cherry")), + new Pair(3, createTuple("date")))); + + CsvFileScanner thirdColumn = new CsvFileScanner(inputSupplier, 2, ',', VARIABLE_BINARY); + Assert.assertEquals(ImmutableList.copyOf(new PairsIterator(thirdColumn.iterator())), + ImmutableList.of( + new Pair(0, createTuple("alice")), + new Pair(1, createTuple("bob")), + new Pair(2, createTuple("charlie")), + new Pair(3, createTuple("dave")))); + } + + private Tuple createTuple(String value) + { + byte[] bytes = value.getBytes(UTF_8); + Slice slice = Slices.allocate(bytes.length + SIZE_OF_SHORT); + slice.output() + .appendShort(bytes.length + 2) + .appendBytes(bytes); + + return new Tuple(slice, new TupleInfo(VARIABLE_BINARY)); } private Tuple createTuple(long value) { Slice slice = Slices.allocate(SIZE_OF_LONG); slice.setLong(0, value); - return new Tuple(slice, new TupleInfo(SIZE_OF_LONG)); + return new Tuple(slice, new TupleInfo(FIXED_INT_64)); } + + } diff --git a/src/test/java/com/facebook/presto/TestExample.java b/src/test/java/com/facebook/presto/TestExample.java index e8f746c0ffd9c..0d4ecfe003d71 100644 --- a/src/test/java/com/facebook/presto/TestExample.java +++ b/src/test/java/com/facebook/presto/TestExample.java @@ -1,9 +1,11 @@ package com.facebook.presto; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Ranges; import java.util.Iterator; +import static com.facebook.presto.TupleInfo.Type.FIXED_INT_64; import static java.util.Arrays.asList; public class TestExample @@ -13,7 +15,7 @@ public static void main(String[] args) DataScan3 scan = newScan(); DataScan3 scan2 = newScan(); - Merge merge = new Merge(ImmutableList.of(scan, scan2), new TupleInfo(1, 1)); + Merge merge = new Merge(ImmutableList.of(scan, scan2), new TupleInfo(FIXED_INT_64, FIXED_INT_64)); while (merge.hasNext()) { ValueBlock block = merge.next(); @@ -27,9 +29,9 @@ public static void main(String[] args) private static DataScan3 newScan() { Iterator values = ImmutableList.builder() - .add(new UncompressedValueBlock(0, new TupleInfo(1), Slices.wrappedBuffer(new byte[]{'a', 'b', 'c', 'd', 'e', 'f'}))) - .add(new UncompressedValueBlock(20, new TupleInfo(1), Slices.wrappedBuffer(new byte[]{'h', 'i', 'j', 'k', 'l', 'm'}))) - .add(new UncompressedValueBlock(30, new TupleInfo(1), Slices.wrappedBuffer(new byte[]{'n', 'o', 'p', 'q', 'r', 's'}))) + .add(new UncompressedValueBlock(Ranges.closed(0L, 5L), new TupleInfo(FIXED_INT_64), Slices.wrappedBuffer(new byte[]{'a', 'b', 'c', 'd', 'e', 'f'}))) + .add(new UncompressedValueBlock(Ranges.closed(20L, 25L), new TupleInfo(FIXED_INT_64), Slices.wrappedBuffer(new byte[]{'h', 'i', 'j', 'k', 'l', 'm'}))) + .add(new UncompressedValueBlock(Ranges.closed(30L, 35L), new TupleInfo(FIXED_INT_64), Slices.wrappedBuffer(new byte[]{'n', 'o', 'p', 'q', 'r', 's'}))) .build() .iterator(); diff --git a/src/test/java/com/facebook/presto/TestSumAggregation.java b/src/test/java/com/facebook/presto/TestSumAggregation.java index ca8bfa517458c..461791073190f 100644 --- a/src/test/java/com/facebook/presto/TestSumAggregation.java +++ b/src/test/java/com/facebook/presto/TestSumAggregation.java @@ -14,8 +14,11 @@ import java.util.List; import java.util.Map; -import static com.facebook.presto.SizeOf.SIZE_OF_BYTE; import static com.facebook.presto.SizeOf.SIZE_OF_LONG; +import static com.facebook.presto.SizeOf.SIZE_OF_SHORT; +import static com.facebook.presto.TupleInfo.Type.FIXED_INT_64; +import static com.facebook.presto.TupleInfo.Type.VARIABLE_BINARY; +import static com.google.common.base.Charsets.UTF_8; public class TestSumAggregation { @@ -23,7 +26,7 @@ public class TestSumAggregation public void testPipelinedAggregation() { GroupBy groupBy = new GroupBy(newGroupColumn()); - PipelinedAggregation aggregation = new PipelinedAggregation(new TupleInfo(SIZE_OF_BYTE, SIZE_OF_LONG), + PipelinedAggregation aggregation = new PipelinedAggregation(new TupleInfo(VARIABLE_BINARY, FIXED_INT_64), groupBy, new ForwardingSeekableIterator<>(newAggregateColumn()), new Provider() @@ -36,10 +39,10 @@ public AggregationFunction get() }); List expected = ImmutableList.of( - new Pair(0, createTuple("a", 10L)), - new Pair(1, createTuple("b", 17L)), - new Pair(2, createTuple("c", 15L)), - new Pair(3, createTuple("d", 6L)) + new Pair(0, createTuple("apple", 10L)), + new Pair(1, createTuple("banana", 17L)), + new Pair(2, createTuple("cherry", 15L)), + new Pair(3, createTuple("date", 6L)) ); List actual = new ArrayList<>(); @@ -55,19 +58,24 @@ public AggregationFunction get() Assert.assertEquals(actual, expected); } - private Tuple createTuple(String character, long count) + private Tuple createTuple(String key, long count) { - Slice slice = Slices.allocate(SIZE_OF_BYTE + SIZE_OF_LONG); - slice.setByte(0, character.charAt(0)); - slice.setLong(1, count); - return new Tuple(slice, new TupleInfo(SIZE_OF_BYTE, SIZE_OF_LONG)); + byte[] bytes = key.getBytes(Charsets.UTF_8); + Slice slice = Slices.allocate(SIZE_OF_LONG + SIZE_OF_SHORT + bytes.length); + + slice.output() + .appendLong(count) + .appendShort(10 + bytes.length) + .appendBytes(bytes); + + return new Tuple(slice, new TupleInfo(VARIABLE_BINARY, FIXED_INT_64)); } @Test public void testHashAggregation() { GroupBy groupBy = new GroupBy(newGroupColumn()); - HashAggregation aggregation = new HashAggregation(new TupleInfo(SIZE_OF_BYTE, SIZE_OF_LONG), + HashAggregation aggregation = new HashAggregation(new TupleInfo(VARIABLE_BINARY, FIXED_INT_64), groupBy, new ForwardingSeekableIterator<>(newAggregateColumn()), new Provider() @@ -80,10 +88,10 @@ public AggregationFunction get() }); Map expected = ImmutableMap.of( - "a", createTuple("a", 10L), - "b", createTuple("b", 17L), - "c", createTuple("c", 15L), - "d", createTuple("d", 6L) + "apple", createTuple("apple", 10L), + "banana", createTuple("banana", 17L), + "cherry", createTuple("cherry", 15L), + "date", createTuple("date", 6L) ); Map actual = new HashMap<>(); @@ -93,7 +101,7 @@ public AggregationFunction get() while (pairs.hasNext()) { Pair pair = pairs.next(); Tuple tuple = pair.getValue(); - actual.put(tuple.getSlice(0).toString(Charsets.UTF_8), tuple); + actual.put(tuple.getSlice(0).toString(UTF_8), tuple); } } @@ -103,11 +111,11 @@ public AggregationFunction get() public Iterator newGroupColumn() { Iterator values = ImmutableList.builder() - .add(new UncompressedValueBlock(0, new TupleInfo(1), Slices.wrappedBuffer(new byte[]{'a', 'a', 'a', 'a', 'b', 'b'}))) - .add(new UncompressedValueBlock(20, new TupleInfo(1), Slices.wrappedBuffer(new byte[]{'b', 'b', 'b', 'c', 'c', 'c'}))) - .add(new UncompressedValueBlock(30, new TupleInfo(1), Slices.wrappedBuffer(new byte[]{'d'}))) - .add(new UncompressedValueBlock(31, new TupleInfo(1), Slices.wrappedBuffer(new byte[]{'d'}))) - .add(new UncompressedValueBlock(32, new TupleInfo(1), Slices.wrappedBuffer(new byte[]{'d'}))) + .add(createBlock(0, "apple", "apple", "apple", "apple", "banana", "banana")) + .add(createBlock(20, "banana", "banana", "banana", "cherry", "cherry", "cherry")) + .add(createBlock(30, "date")) + .add(createBlock(31, "date")) + .add(createBlock(32, "date")) .build() .iterator(); @@ -128,13 +136,25 @@ public Iterator newAggregateColumn() return values; } - private UncompressedValueBlock createBlock(long position, long... values) + private ValueBlock createBlock(int position, String... values) + { + BlockBuilder builder = new BlockBuilder(position, new TupleInfo(VARIABLE_BINARY)); + + for (String value : values) { + builder.append(value.getBytes(UTF_8)); + } + + return builder.build(); + } + + private ValueBlock createBlock(long position, long... values) { - Slice slice = Slices.allocate(values.length * SIZE_OF_LONG); - SliceOutput output = slice.output(); + BlockBuilder builder = new BlockBuilder(position, new TupleInfo(FIXED_INT_64)); + for (long value : values) { - output.writeLong(value); + builder.append(value); } - return new UncompressedValueBlock(position, new TupleInfo(SIZE_OF_LONG), slice); + + return builder.build(); } } diff --git a/src/test/java/com/facebook/presto/TestTupleInfo.java b/src/test/java/com/facebook/presto/TestTupleInfo.java new file mode 100644 index 0000000000000..cbf4dade7cd8d --- /dev/null +++ b/src/test/java/com/facebook/presto/TestTupleInfo.java @@ -0,0 +1,36 @@ +package com.facebook.presto; + +import org.testng.annotations.Test; + +import static com.facebook.presto.SizeOf.SIZE_OF_LONG; +import static com.facebook.presto.SizeOf.SIZE_OF_SHORT; +import static com.facebook.presto.TupleInfo.Type.FIXED_INT_64; +import static com.facebook.presto.TupleInfo.Type.VARIABLE_BINARY; +import static org.testng.Assert.assertEquals; + +public class TestTupleInfo +{ + @Test + public void testBasic() + { + Slice slice = new Slice(SIZE_OF_LONG + SIZE_OF_LONG + SIZE_OF_SHORT + SIZE_OF_SHORT + 10 + 15); + + Slice binary1 = Slices.wrappedBuffer(new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }); + Slice binary2 = Slices.wrappedBuffer(new byte[] { 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24 }); + + slice.output() + .appendLong(42) // value of first long field + .appendLong(67) // value of second long field + .appendShort(30) // offset of second binary field + .appendShort(45) // tuple size + .appendBytes(binary1) + .appendBytes(binary2); + + + TupleInfo info = new TupleInfo(FIXED_INT_64, VARIABLE_BINARY, FIXED_INT_64, VARIABLE_BINARY); + assertEquals(info.getLong(slice, 0), 42L); + assertEquals(info.getSlice(slice, 1), binary1); + assertEquals(info.getLong(slice, 2), 67L); + assertEquals(info.getSlice(slice, 3), binary2); + } +}