diff --git a/src/main/java/com/facebook/presto/AbstractColumnProcessor.java b/src/main/java/com/facebook/presto/AbstractColumnProcessor.java new file mode 100644 index 0000000000000..9d3e8505f3062 --- /dev/null +++ b/src/main/java/com/facebook/presto/AbstractColumnProcessor.java @@ -0,0 +1,43 @@ +package com.facebook.presto; + +import java.io.IOException; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkElementIndex; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +public abstract class AbstractColumnProcessor + implements ColumnProcessor +{ + protected final TupleInfo.Type type; + protected final int index; + protected final Cursor cursor; + + private boolean finished = false; + + protected AbstractColumnProcessor(TupleInfo.Type type, int index, Cursor cursor) + { + checkNotNull(type, "type is null"); + checkNotNull(cursor, "cursor is null"); + checkElementIndex(index, cursor.getTupleInfo().getFieldCount()); + TupleInfo.Type cursorType = cursor.getTupleInfo().getTypes().get(index); + checkArgument(type == cursorType, "type (%s) does not match cursor type (%s) at index (%s)", type, cursorType, index); + + this.type = type; + this.index = index; + this.cursor = cursor; + } + + @Override + public final void finish() + throws IOException + { + checkState(!finished, "finish called twice"); + finished = true; + finished(); + } + + protected abstract void finished() + throws IOException; +} diff --git a/src/main/java/com/facebook/presto/BlockBuilder.java b/src/main/java/com/facebook/presto/BlockBuilder.java index 9427230bf0e34..12968262f59e6 100644 --- a/src/main/java/com/facebook/presto/BlockBuilder.java +++ b/src/main/java/com/facebook/presto/BlockBuilder.java @@ -3,12 +3,12 @@ import com.facebook.presto.slice.DynamicSliceOutput; import com.facebook.presto.slice.Slice; import com.facebook.presto.slice.Slices; -import com.google.common.base.Preconditions; import io.airlift.units.DataSize; import io.airlift.units.DataSize.Unit; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; public class BlockBuilder { @@ -40,6 +40,11 @@ public BlockBuilder(long startPosition, TupleInfo tupleInfo, DataSize blockSize) tupleBuilder = tupleInfo.builder(sliceOutput); } + public boolean isEmpty() + { + return count == 0; + } + public boolean isFull() { return sliceOutput.size() > maxBlockSize; @@ -93,7 +98,8 @@ public UncompressedValueBlock build() { flushTupleIfNecessary(); - Preconditions.checkState(count > 0, "Cannot build an empty block"); + checkState(!tupleBuilder.isPartial(), "Tuple is not complete"); + checkState(!isEmpty(), "Cannot build an empty block"); return new UncompressedValueBlock(Range.create(startPosition, startPosition + count - 1), tupleInfo, sliceOutput.slice()); } diff --git a/src/main/java/com/facebook/presto/Column.java b/src/main/java/com/facebook/presto/Column.java deleted file mode 100644 index 091a53a39c1aa..0000000000000 --- a/src/main/java/com/facebook/presto/Column.java +++ /dev/null @@ -1,13 +0,0 @@ -package com.facebook.presto; - -import java.util.Iterator; - -public class Column - implements Iterable -{ - @Override - public Iterator iterator() - { - return null; - } -} diff --git a/src/main/java/com/facebook/presto/ColumnProcessor.java b/src/main/java/com/facebook/presto/ColumnProcessor.java index a4790d1eb5c83..8d43f630a5559 100644 --- a/src/main/java/com/facebook/presto/ColumnProcessor.java +++ b/src/main/java/com/facebook/presto/ColumnProcessor.java @@ -3,13 +3,13 @@ */ package com.facebook.presto; -import com.facebook.presto.TupleInfo.Type; +import java.io.IOException; public interface ColumnProcessor { - Type getColumnType(); + boolean processPositions(long end) + throws IOException; - void processBlock(ValueBlock block); - - void finish(); + void finish() + throws IOException; } diff --git a/src/main/java/com/facebook/presto/ColumnProcessors.java b/src/main/java/com/facebook/presto/ColumnProcessors.java new file mode 100644 index 0000000000000..e4a37beb0dd45 --- /dev/null +++ b/src/main/java/com/facebook/presto/ColumnProcessors.java @@ -0,0 +1,36 @@ +package com.facebook.presto; + +import java.io.IOException; +import java.util.List; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +public class ColumnProcessors +{ + private static final int DEFAULT_BATCH_SIZE = 10_000; + + public static void process(List processors) + throws IOException + { + process(processors, DEFAULT_BATCH_SIZE); + } + + public static void process(List processors, int batchSize) + throws IOException + { + checkNotNull(processors, "processors is null"); + long end = 0; + while (true) { + end = Math.min(end + batchSize, Integer.MAX_VALUE); + boolean moreData = false; + for (ColumnProcessor processor : processors) { + moreData |= processor.processPositions(end); + } + if (!moreData) { + return; + } + checkState(end < Integer.MAX_VALUE, "processor should be complete"); + } + } +} diff --git a/src/main/java/com/facebook/presto/Csv.java b/src/main/java/com/facebook/presto/Csv.java deleted file mode 100644 index 9de561118a402..0000000000000 --- a/src/main/java/com/facebook/presto/Csv.java +++ /dev/null @@ -1,220 +0,0 @@ -package com.facebook.presto; - -import com.google.common.base.Preconditions; -import com.google.common.base.Splitter; -import com.google.common.base.Throwables; -import com.google.common.collect.AbstractIterator; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.io.CharStreams; -import com.google.common.io.InputSupplier; -import com.google.common.io.LineProcessor; -import com.google.common.io.LineReader; - -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.Iterator; -import java.util.List; - -import static com.facebook.presto.TupleInfo.Type; -import static com.facebook.presto.UncompressedBlockSerde.FloatMillisUncompressedColumnWriter; -import static com.google.common.base.Charsets.UTF_8; - -public class Csv -{ - public static void processCsv(InputSupplier inputSupplier, char columnSeparator, ColumnProcessor... processors) - throws IOException - { - processCsv(inputSupplier, columnSeparator, ImmutableList.copyOf(processors)); - } - - public static void processCsv(InputSupplier inputSupplier, char columnSeparator, List processors) - throws IOException - { - CharStreams.readLines(inputSupplier, new CsvLineProcessor(columnSeparator, processors)); - } - - private static class CsvLineProcessor implements LineProcessor - { - private final Splitter columnSplitter; - private final List handlers; - - public CsvLineProcessor(char columnSeparator, List processors) - { - ImmutableList.Builder builder = ImmutableList.builder(); - for (ColumnProcessor processor : processors) { - builder.add(new ColumnHandler(processor)); - } - final List handlers = builder.build(); - - - this.handlers = handlers; - columnSplitter = Splitter.on(columnSeparator); - } - - @Override - public boolean processLine(String line) - throws IOException - { - Iterator iterator = handlers.iterator(); - for (String value : columnSplitter.split(line)) { - iterator.next().handleValue(value); - } - return true; - } - - @Override - public Void getResult() - { - for (ColumnHandler handler : handlers) { - handler.finish(); - } - return null; - } - } - - private static class ColumnHandler - { - private final ColumnProcessor processor; - private final TupleInfo tupleInfo; - - private BlockBuilder blockBuilder; - private int position; - - private ColumnHandler(ColumnProcessor processor) - { - this.processor = processor; - tupleInfo = new TupleInfo(processor.getColumnType()); - blockBuilder = new BlockBuilder(position, tupleInfo); - } - - public void handleValue(String value) - { - if (blockBuilder.isFull()) { - flushBlock(); - } - - // TODO: fix this - if (processor instanceof FloatMillisUncompressedColumnWriter) { - blockBuilder.append((long) Double.parseDouble(value)); - } - else if (processor.getColumnType() == Type.FIXED_INT_64) { - blockBuilder.append(Long.valueOf(value)); - } - else { - blockBuilder.append(value.getBytes(UTF_8)); - } - } - - public void finish() - { - flushBlock(); - processor.finish(); - } - - private void flushBlock() - { - ValueBlock block = blockBuilder.build(); - position += block.getCount(); - processor.processBlock(block); - blockBuilder = new BlockBuilder(position, tupleInfo); - } - } - - public static Iterable readCsvColumn(InputSupplier inputSupplier, int columnIndex, char columnSeparator, Type columnType) - { - return new CsvColumnReader(inputSupplier, columnIndex, columnSeparator, columnType); - } - - private static class CsvColumnReader implements Iterable - { - private final InputSupplier inputSupplier; - private final Splitter columnSplitter; - private final int columnIndex; - private final Type columnType; - - public CsvColumnReader(InputSupplier inputSupplier, int columnIndex, char columnSeparator, Type columnType) - { - this.columnType = columnType; - Preconditions.checkNotNull(inputSupplier, "inputSupplier is null"); - - this.columnIndex = columnIndex; - this.inputSupplier = inputSupplier; - columnSplitter = Splitter.on(columnSeparator); - } - - @Override - public Iterator iterator() - { - return new ColumnIterator(inputSupplier, columnIndex, columnSplitter, columnType); - } - - private static class ColumnIterator extends AbstractIterator - { - private final LineReader reader; - private final TupleInfo tupleInfo; - private final int columnIndex; - private final Splitter columnSplitter; - private long position; - private final Type columnType; - - public ColumnIterator(InputSupplier inputSupplier, int columnIndex, Splitter columnSplitter, Type columnType) - { - this.columnType = columnType; - this.tupleInfo = new TupleInfo(columnType); - try { - this.reader = new LineReader(inputSupplier.getInput()); - } - catch (IOException e) { - throw Throwables.propagate(e); - } - this.columnIndex = columnIndex; - this.columnSplitter = columnSplitter; - } - - @Override - protected ValueBlock computeNext() - { - String line = nextLine(); - if (line == null) { - endOfData(); - return null; - } - - BlockBuilder blockBuilder = new BlockBuilder(position, tupleInfo); - do { - Iterable split = columnSplitter.split(line); - String value = Iterables.get(split, columnIndex); - - // calculate final value for this group - // todo add support for other column types - if (columnType == Type.FIXED_INT_64) { - blockBuilder.append(Long.valueOf(value)); - } - else { - blockBuilder.append(value.getBytes(UTF_8)); - } - - if (blockBuilder.isFull()) { - break; - } - line = nextLine(); - } while (line != null); - - ValueBlock block = blockBuilder.build(); - position += block.getCount(); - return block; - } - - private String nextLine() - { - try { - return reader.readLine(); - } - catch (IOException e) { - throw Throwables.propagate(e); - } - } - } - } -} diff --git a/src/main/java/com/facebook/presto/CsvReader.java b/src/main/java/com/facebook/presto/CsvReader.java new file mode 100644 index 0000000000000..5516c37300e2a --- /dev/null +++ b/src/main/java/com/facebook/presto/CsvReader.java @@ -0,0 +1,125 @@ +package com.facebook.presto; + +import com.google.common.base.Charsets; +import com.google.common.base.Splitter; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.io.InputSupplier; +import com.google.common.io.LineReader; + +import java.io.IOException; +import java.io.Reader; +import java.util.List; + +import static com.facebook.presto.RowSourceBuilder.RowBuilder; +import static com.facebook.presto.RowSourceBuilder.RowGenerator; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +public class CsvReader + implements InputSupplier +{ + public static interface CsvColumnProcessor + { + void process(String value, RowBuilder rowBuilder); + } + + private final TupleInfo tupleInfo; + private final InputSupplier inputSupplier; + private final Splitter columnSplitter; + private final List processors; + + public CsvReader(TupleInfo tupleInfo, InputSupplier inputSupplier, + char columnDelimiter, List processors) + { + this.tupleInfo = checkNotNull(tupleInfo, "tupleInfo is null"); + this.inputSupplier = checkNotNull(inputSupplier, "inputSupplier is null"); + this.columnSplitter = Splitter.on(columnDelimiter); + this.processors = ImmutableList.copyOf(checkNotNull(processors, "processors is null")); + } + + @Override + public RowSource getInput() + throws IOException + { + return new RowSourceBuilder(tupleInfo, new CsvRowGenerator()); + } + + private class CsvRowGenerator + implements RowGenerator + { + private final Reader reader; + private final LineReader lineReader; + + private CsvRowGenerator() + throws IOException + { + this.reader = inputSupplier.getInput(); + this.lineReader = new LineReader(reader); + } + + @Override + public boolean generate(RowBuilder rowBuilder) + { + String line = nextLine(); + if (line == null) { + return false; + } + + Iterable split = columnSplitter.split(line); + List values = ImmutableList.copyOf(split); + + checkState(values.size() == processors.size(), + "line column count (%d) does not match processor count (%d)", values.size(), processors.size()); + + for (int i = 0; i < values.size(); i++) { + String value = values.get(i); + CsvColumnProcessor processor = processors.get(i); + processor.process(value, rowBuilder); + } + + return true; + } + + private String nextLine() + { + try { + return lineReader.readLine(); + } + catch (IOException e) { + throw Throwables.propagate(e); + } + } + + @Override + public void close() + throws IOException + { + reader.close(); + } + } + + public static CsvColumnProcessor csvNumericColumn() + { + return new CsvColumnProcessor() + { + @Override + public void process(String value, RowBuilder rowBuilder) + { + rowBuilder.append(Long.valueOf(value)); + } + }; + } + + public static CsvColumnProcessor csvStringColumn() + { + return new CsvColumnProcessor() + { + @Override + public void process(String value, RowBuilder rowBuilder) + { + rowBuilder.append(value.getBytes(Charsets.UTF_8)); + } + }; + } +} diff --git a/src/main/java/com/facebook/presto/Main.java b/src/main/java/com/facebook/presto/Main.java index 5480d401069f5..192a95678d42a 100644 --- a/src/main/java/com/facebook/presto/Main.java +++ b/src/main/java/com/facebook/presto/Main.java @@ -3,7 +3,6 @@ */ package com.facebook.presto; -import com.facebook.presto.UncompressedBlockSerde.UncompressedColumnWriter; import com.google.common.base.Charsets; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -21,12 +20,15 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStreamReader; +import java.io.OutputStream; import java.util.List; import java.util.concurrent.Callable; +import static com.facebook.presto.CsvReader.CsvColumnProcessor; +import static com.facebook.presto.CsvReader.csvNumericColumn; +import static com.facebook.presto.CsvReader.csvStringColumn; import static com.facebook.presto.TupleInfo.Type.FIXED_INT_64; import static com.facebook.presto.TupleInfo.Type.VARIABLE_BINARY; -import static com.facebook.presto.UncompressedBlockSerde.FloatMillisUncompressedColumnWriter; public class Main { @@ -47,7 +49,8 @@ public static void main(String[] args) cli.parse(args).call(); } - public static class BaseCommand implements Callable + public static class BaseCommand + implements Callable { @Override public Void call() @@ -65,7 +68,8 @@ public void run() } @Command(name = "csv", description = "Convert CSV to columns") - public static class ConvertCsv extends BaseCommand + public static class ConvertCsv + extends BaseCommand { @Option(name = {"-d", "--column-delimiter"}, description = "Column delimiter character") public String columnSeparator = ","; @@ -87,39 +91,71 @@ public void run() File dir = new File(outputDir); - ImmutableList.Builder processors = ImmutableList.builder(); - int index = 0; + InputSupplier inputSupplier; + if (csvFile != null) { + inputSupplier = Files.newReaderSupplier(new File(csvFile), Charsets.UTF_8); + } + else { + inputSupplier = new InputSupplier() + { + public InputStreamReader getInput() + { + return new InputStreamReader(System.in, Charsets.UTF_8); + } + }; + } + + ImmutableList.Builder typeBuilder = ImmutableList.builder(); + ImmutableList.Builder csvColumns = ImmutableList.builder(); for (String type : types) { - File file = new File(dir, "column" + index++ + ".data"); switch (type) { case "long": - processors.add(new UncompressedColumnWriter(newOutputStreamSupplier(file), FIXED_INT_64)); + typeBuilder.add(FIXED_INT_64); + csvColumns.add(csvNumericColumn()); break; case "string": - processors.add(new UncompressedColumnWriter(newOutputStreamSupplier(file), VARIABLE_BINARY)); + typeBuilder.add(VARIABLE_BINARY); + csvColumns.add(csvStringColumn()); break; case "fmillis": - processors.add(new FloatMillisUncompressedColumnWriter(newOutputStreamSupplier(file), FIXED_INT_64)); + typeBuilder.add(FIXED_INT_64); + csvColumns.add(csvFloatMillisColumn()); break; default: throw new IllegalArgumentException("Unsupported type " + type); } } - InputSupplier inputSupplier; - if (csvFile != null) { - inputSupplier = Files.newReaderSupplier(new File(csvFile), Charsets.UTF_8); + ImmutableList columnTypes = typeBuilder.build(); + TupleInfo tupleInfo = new TupleInfo(columnTypes); + CsvReader csvReader = new CsvReader(tupleInfo, inputSupplier, toChar(columnSeparator), csvColumns.build()); + + ImmutableList.Builder processorsBuilder = ImmutableList.builder(); + ImmutableList.Builder rowSources = ImmutableList.builder(); + ImmutableList.Builder outputs = ImmutableList.builder(); + for (int index = 0; index < columnTypes.size(); index++) { + TupleInfo.Type type = columnTypes.get(index); + RowSource rowSource = csvReader.getInput(); + File file = new File(dir, "column" + index + ".data"); + OutputStream out = new FileOutputStream(file); + processorsBuilder.add(new UncompressedColumnWriter(type, index, rowSource.cursor(), out)); + rowSources.add(rowSource); + outputs.add(out); } - else { - inputSupplier = new InputSupplier() - { - public InputStreamReader getInput() - { - return new InputStreamReader(System.in, Charsets.UTF_8); - } - }; + List processors = processorsBuilder.build(); + + ColumnProcessors.process(processors); + + for (ColumnProcessor processor : processors) { + processor.finish(); + } + + for (RowSource rowSource : rowSources.build()) { + rowSource.close(); + } + for (OutputStream out : outputs.build()) { + out.close(); } - Csv.processCsv(inputSupplier, toChar(columnSeparator), processors.build()); } private char toChar(String string) @@ -147,5 +183,17 @@ public FileOutputStream getOutput() } }; } + + public static CsvColumnProcessor csvFloatMillisColumn() + { + return new CsvColumnProcessor() + { + @Override + public void process(String value, RowSourceBuilder.RowBuilder rowBuilder) + { + rowBuilder.append((long) Double.parseDouble(value)); + } + }; + } } } diff --git a/src/main/java/com/facebook/presto/RowSource.java b/src/main/java/com/facebook/presto/RowSource.java new file mode 100644 index 0000000000000..352089f692c61 --- /dev/null +++ b/src/main/java/com/facebook/presto/RowSource.java @@ -0,0 +1,8 @@ +package com.facebook.presto; + +import java.io.Closeable; + +public interface RowSource + extends BlockStream, Closeable +{ +} diff --git a/src/main/java/com/facebook/presto/RowSourceBuilder.java b/src/main/java/com/facebook/presto/RowSourceBuilder.java new file mode 100644 index 0000000000000..bc61be68824a7 --- /dev/null +++ b/src/main/java/com/facebook/presto/RowSourceBuilder.java @@ -0,0 +1,132 @@ +package com.facebook.presto; + +import com.facebook.presto.slice.Slice; +import com.google.common.collect.AbstractIterator; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Iterator; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +public class RowSourceBuilder + implements RowSource +{ + public static interface RowBuilder + { + RowBuilder append(long value); + + RowBuilder append(byte[] value); + + RowBuilder append(Slice value); + } + + public static interface RowGenerator + extends Closeable + { + boolean generate(RowBuilder rowBuilder); + } + + private final TupleInfo tupleInfo; + private final RowGenerator rowGenerator; + private boolean cursorCreated = false; + + public RowSourceBuilder(TupleInfo tupleInfo, RowGenerator rowGenerator) + { + this.tupleInfo = checkNotNull(tupleInfo, "tupleInfo is null"); + this.rowGenerator = checkNotNull(rowGenerator, "rowGenerator is null"); + } + + @Override + public TupleInfo getTupleInfo() + { + return tupleInfo; + } + + @Override + public Cursor cursor() + { + checkState(!cursorCreated, "cursor already created"); + cursorCreated = true; + return new UncompressedCursor(tupleInfo, new RowSourceIterator()); + } + + @Override + public Iterator iterator() + { + throw new UnsupportedOperationException(); + } + + @Override + public void close() + throws IOException + { + rowGenerator.close(); + } + + private class RowSourceIterator + extends AbstractIterator + { + private boolean done = false; + private int position = 0; + + @Override + protected UncompressedValueBlock computeNext() + { + if (done) { + return endOfData(); + } + + BlockBuilder blockBuilder = new BlockBuilder(position, tupleInfo); + BasicRowBuilder rowBuilder = new BasicRowBuilder(blockBuilder); + + do { + if (!rowGenerator.generate(rowBuilder)) { + done = true; + if (blockBuilder.isEmpty()) { + return endOfData(); + } + break; + } + } + while (!blockBuilder.isFull()); + + UncompressedValueBlock block = blockBuilder.build(); + position += block.getCount(); + return block; + } + } + + private static class BasicRowBuilder + implements RowBuilder + { + private final BlockBuilder blockBuilder; + + private BasicRowBuilder(BlockBuilder blockBuilder) + { + this.blockBuilder = blockBuilder; + } + + @Override + public RowBuilder append(long value) + { + blockBuilder.append(value); + return this; + } + + @Override + public RowBuilder append(byte[] value) + { + blockBuilder.append(value); + return this; + } + + @Override + public RowBuilder append(Slice value) + { + blockBuilder.append(value); + return this; + } + } +} diff --git a/src/main/java/com/facebook/presto/TupleInfo.java b/src/main/java/com/facebook/presto/TupleInfo.java index 8823350a142f2..7118c8da801aa 100644 --- a/src/main/java/com/facebook/presto/TupleInfo.java +++ b/src/main/java/com/facebook/presto/TupleInfo.java @@ -364,9 +364,14 @@ public boolean isComplete() return currentField == types.size(); } + public boolean isPartial() + { + return (currentField > 0) && (!isComplete()); + } + public void finish() { - Preconditions.checkState(currentField == types.size(), "Tuple is incomplete"); + checkState(isComplete(), "Tuple is incomplete"); // write offsets boolean isFirst = true; diff --git a/src/main/java/com/facebook/presto/UncompressedBlockSerde.java b/src/main/java/com/facebook/presto/UncompressedBlockSerde.java index 3bbb1aa83b200..81baa11e5800f 100644 --- a/src/main/java/com/facebook/presto/UncompressedBlockSerde.java +++ b/src/main/java/com/facebook/presto/UncompressedBlockSerde.java @@ -3,15 +3,15 @@ */ package com.facebook.presto; -import com.facebook.presto.TupleInfo.Type; -import com.facebook.presto.slice.*; +import com.facebook.presto.slice.ByteArraySlice; +import com.facebook.presto.slice.DynamicSliceOutput; +import com.facebook.presto.slice.OutputStreamSliceOutput; +import com.facebook.presto.slice.Slice; +import com.facebook.presto.slice.SliceInput; +import com.facebook.presto.slice.SliceOutput; +import com.facebook.presto.slice.Slices; import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; import com.google.common.collect.AbstractIterator; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableList.Builder; -import com.google.common.io.Closeables; -import com.google.common.io.OutputSupplier; import io.airlift.units.DataSize; import java.io.File; @@ -155,57 +155,4 @@ protected UncompressedValueBlock computeNext() return new UncompressedValueBlock(range, tupleInfo, block); } } - - public static class UncompressedColumnWriter implements ColumnProcessor - { - private final OutputSupplier outputSupplier; - private final Type type; - private OutputStream out; - - public UncompressedColumnWriter(OutputSupplier outputSupplier, Type type) - { - this.outputSupplier = outputSupplier; - this.type = type; - } - - @Override - public Type getColumnType() - { - return type; - } - - @Override - public void processBlock(ValueBlock block) - { - try { - if (out == null) { - out = outputSupplier.getOutput(); - UncompressedTupleInfoSerde.serialize(new TupleInfo(type), new OutputStreamSliceOutput(out)); - } - - Slice blockSlice = ((UncompressedValueBlock) block).getSlice(); - write(out, block.getCount(), blockSlice); - } - catch (IOException e) { - throw Throwables.propagate(e); - } - } - - @Override - public void finish() - { - Closeables.closeQuietly(out); - out = null; - } - } - - // TODO: fix this horrible hack - public static class FloatMillisUncompressedColumnWriter - extends UncompressedColumnWriter - { - public FloatMillisUncompressedColumnWriter(OutputSupplier outputSupplier, TupleInfo.Type type) - { - super(outputSupplier, type); - } - } } diff --git a/src/main/java/com/facebook/presto/UncompressedColumnWriter.java b/src/main/java/com/facebook/presto/UncompressedColumnWriter.java new file mode 100644 index 0000000000000..84401b213dcc1 --- /dev/null +++ b/src/main/java/com/facebook/presto/UncompressedColumnWriter.java @@ -0,0 +1,126 @@ +package com.facebook.presto; + +import com.facebook.presto.slice.DynamicSliceOutput; +import com.facebook.presto.slice.Slice; +import com.facebook.presto.slice.Slices; +import io.airlift.units.DataSize; + +import java.io.IOException; +import java.io.OutputStream; + +import static com.facebook.presto.SizeOf.SIZE_OF_INT; +import static com.facebook.presto.SizeOf.SIZE_OF_SHORT; +import static com.google.common.base.Preconditions.checkNotNull; +import static io.airlift.units.DataSize.Unit.KILOBYTE; + +public class UncompressedColumnWriter + extends AbstractColumnProcessor +{ + private static final int MAX_BLOCK_SIZE = (int) new DataSize(64, KILOBYTE).toBytes(); + + private final OutputStream out; + + private int tupleCount = 0; + private DynamicSliceOutput buffer = new DynamicSliceOutput(MAX_BLOCK_SIZE * 2); + + public UncompressedColumnWriter(TupleInfo.Type type, int index, Cursor cursor, OutputStream out) + throws IOException + { + super(type, index, cursor); + this.out = checkNotNull(out, "out is null"); + + writeTupleInfo(out, new TupleInfo(type)); + } + + @Override + public boolean processPositions(long end) + throws IOException + { + switch (type) { + case FIXED_INT_64: + return writeFixedInt64(end); + case VARIABLE_BINARY: + return writeVariableBinary(end); + default: + throw new AssertionError("unhandled type: " + type); + } + } + + @Override + protected void finished() + throws IOException + { + flush(); + } + + private boolean writeFixedInt64(long end) + throws IOException + { + while (cursor.hasNextPosition()) { + cursor.advanceNextPosition(); + buffer.appendLong(cursor.getLong(index)); + tupleCount++; + flushIfNecessary(); + if (cursor.getPosition() >= end) { + return true; + } + } + return false; + } + + private boolean writeVariableBinary(long end) + throws IOException + { + while (cursor.hasNextPosition()) { + cursor.advanceNextPosition(); + Slice slice = cursor.getSlice(index); + buffer.appendShort(slice.length() + SIZE_OF_SHORT); + buffer.appendBytes(slice); + tupleCount++; + flushIfNecessary(); + if (cursor.getPosition() >= end) { + return true; + } + } + return false; + } + + private void flushIfNecessary() + throws IOException + { + if (buffer.size() >= MAX_BLOCK_SIZE) { + flush(); + } + } + + private void flush() + throws IOException + { + if (tupleCount > 0) { + write(out, tupleCount, buffer.slice()); + tupleCount = 0; + buffer.reset(); + } + } + + private static void writeTupleInfo(OutputStream out, TupleInfo tupleInfo) + throws IOException + { + out.write(tupleInfo.getFieldCount()); + for (TupleInfo.Type type : tupleInfo.getTypes()) { + out.write(type.ordinal()); + } + } + + private static void write(OutputStream out, int tupleCount, Slice slice) + throws IOException + { + Slice header = Slices.allocate(SIZE_OF_INT + SIZE_OF_INT); + header.output() + .appendInt(slice.length()) + .appendInt(tupleCount); + header.getBytes(0, out, header.length()); + + slice.getBytes(0, out, slice.length()); + } +} diff --git a/src/test/java/com/facebook/presto/CollectingColumnProcessor.java b/src/test/java/com/facebook/presto/CollectingColumnProcessor.java new file mode 100644 index 0000000000000..7392b4db4ba36 --- /dev/null +++ b/src/test/java/com/facebook/presto/CollectingColumnProcessor.java @@ -0,0 +1,56 @@ +package com.facebook.presto; + +import com.google.common.collect.ImmutableList; + +import java.io.IOException; + +import static com.google.common.base.Preconditions.checkState; + +class CollectingColumnProcessor + extends AbstractColumnProcessor +{ + private final BlockBuilder builder; + private BlockStream blockStream; + + CollectingColumnProcessor(TupleInfo.Type type, int index, Cursor cursor) + { + super(type, index, cursor); + this.builder = new BlockBuilder(0, new TupleInfo(type)); + } + + public BlockStream getBlockStream() + { + checkState(blockStream != null, "close not called"); + return blockStream; + } + + @Override + public boolean processPositions(long end) + throws IOException + { + while (cursor.hasNextPosition()) { + cursor.advanceNextPosition(); + switch (type) { + case FIXED_INT_64: + builder.append(cursor.getLong(index)); + break; + case VARIABLE_BINARY: + builder.append(cursor.getSlice(index)); + break; + default: + throw new AssertionError("unhandled type: " + type); + } + if (cursor.getPosition() >= end) { + return true; + } + } + return false; + } + + @Override + protected void finished() + throws IOException + { + blockStream = new UncompressedBlockStream(new TupleInfo(type), ImmutableList.of(builder.build())); + } +} diff --git a/src/test/java/com/facebook/presto/TestCsvFileScanner.java b/src/test/java/com/facebook/presto/TestCsvFileScanner.java index b1fcd823532ce..485799a64f976 100644 --- a/src/test/java/com/facebook/presto/TestCsvFileScanner.java +++ b/src/test/java/com/facebook/presto/TestCsvFileScanner.java @@ -1,20 +1,22 @@ package com.facebook.presto; -import com.facebook.presto.TupleInfo.Type; import com.google.common.collect.ImmutableList; import com.google.common.io.InputSupplier; -import org.testng.Assert; import org.testng.annotations.Test; import java.io.InputStreamReader; import java.util.List; +import static com.facebook.presto.CsvReader.csvNumericColumn; +import static com.facebook.presto.CsvReader.csvStringColumn; import static com.facebook.presto.TupleInfo.Type.FIXED_INT_64; import static com.facebook.presto.TupleInfo.Type.VARIABLE_BINARY; import static com.facebook.presto.Tuples.createTuple; import static com.google.common.base.Charsets.UTF_8; +import static com.google.common.collect.Iterables.getOnlyElement; import static com.google.common.io.Resources.getResource; import static com.google.common.io.Resources.newReaderSupplier; +import static org.testng.Assert.assertEquals; public class TestCsvFileScanner { @@ -24,93 +26,44 @@ public class TestCsvFileScanner public void testProcessCsv() throws Exception { - CollectingColumnProcessor column1 = new CollectingColumnProcessor(FIXED_INT_64); - CollectingColumnProcessor column2 = new CollectingColumnProcessor(VARIABLE_BINARY); - CollectingColumnProcessor column3 = new CollectingColumnProcessor(VARIABLE_BINARY); - Csv.processCsv(inputSupplier, ',', column1, column2, column3); - - Assert.assertEquals(ImmutableList.copyOf(column1.getBlocks().iterator().next().pairIterator()), - ImmutableList.of( - new Pair(0, createTuple(0)), - new Pair(1, createTuple(1)), - new Pair(2, createTuple(2)), - new Pair(3, createTuple(3)))); - - Assert.assertEquals(ImmutableList.copyOf(column2.getBlocks().iterator().next().pairIterator()), - ImmutableList.of( - new Pair(0, createTuple("apple")), - new Pair(1, createTuple("banana")), - new Pair(2, createTuple("cherry")), - new Pair(3, createTuple("date")))); - - Assert.assertEquals(ImmutableList.copyOf(column3.getBlocks().iterator().next().pairIterator()), - ImmutableList.of( - new Pair(0, createTuple("alice")), - new Pair(1, createTuple("bob")), - new Pair(2, createTuple("charlie")), - new Pair(3, createTuple("dave")))); - - } - @Test - public void testReadCsvColumn() - throws Exception - { - Iterable firstColumn = Csv.readCsvColumn(inputSupplier, 0, ',', FIXED_INT_64); - - ImmutableList actual = ImmutableList.copyOf(new PairsIterator(firstColumn.iterator())); - Assert.assertEquals(actual, - ImmutableList.of( - new Pair(0, createTuple(0)), - new Pair(1, createTuple(1)), - new Pair(2, createTuple(2)), - new Pair(3, createTuple(3)))); - - Iterable secondColumn = Csv.readCsvColumn(inputSupplier, 1, ',', VARIABLE_BINARY); - Assert.assertEquals(ImmutableList.copyOf(new PairsIterator(secondColumn.iterator())), - ImmutableList.of( - new Pair(0, createTuple("apple")), - new Pair(1, createTuple("banana")), - new Pair(2, createTuple("cherry")), - new Pair(3, createTuple("date")))); - - Iterable thirdColumn = Csv.readCsvColumn(inputSupplier, 2, ',', VARIABLE_BINARY); - Assert.assertEquals(ImmutableList.copyOf(new PairsIterator(thirdColumn.iterator())), - ImmutableList.of( - new Pair(0, createTuple("alice")), - new Pair(1, createTuple("bob")), - new Pair(2, createTuple("charlie")), - new Pair(3, createTuple("dave")))); - } - - private static class CollectingColumnProcessor implements ColumnProcessor { - private final Type type; - private final ImmutableList.Builder blocks = ImmutableList.builder(); - - private CollectingColumnProcessor(Type type) - { - this.type = type; - } - - public List getBlocks() - { - return blocks.build(); - } - - @Override - public Type getColumnType() - { - return type; - } - - @Override - public void processBlock(ValueBlock block) - { - blocks.add(block); - } - - @Override - public void finish() - { + TupleInfo tupleInfo = new TupleInfo(FIXED_INT_64, VARIABLE_BINARY, VARIABLE_BINARY); + CsvReader csvReader = new CsvReader(tupleInfo, inputSupplier, ',', + ImmutableList.of(csvNumericColumn(), csvStringColumn(), csvStringColumn())); + + try (RowSource rowSource0 = csvReader.getInput(); + RowSource rowSource1 = csvReader.getInput(); + RowSource rowSource2 = csvReader.getInput()) { + List processors = ImmutableList.of( + new CollectingColumnProcessor(FIXED_INT_64, 0, rowSource0.cursor()), + new CollectingColumnProcessor(VARIABLE_BINARY, 1, rowSource1.cursor()), + new CollectingColumnProcessor(VARIABLE_BINARY, 2, rowSource2.cursor())); + + ColumnProcessors.process(processors, 2); + + for (CollectingColumnProcessor processor : processors) { + processor.finish(); + } + + assertEquals(ImmutableList.copyOf(getOnlyElement(processors.get(0).getBlockStream()).pairIterator()), + ImmutableList.of( + new Pair(0, createTuple(0)), + new Pair(1, createTuple(1)), + new Pair(2, createTuple(2)), + new Pair(3, createTuple(3)))); + + assertEquals(ImmutableList.copyOf(getOnlyElement(processors.get(1).getBlockStream()).pairIterator()), + ImmutableList.of( + new Pair(0, createTuple("apple")), + new Pair(1, createTuple("banana")), + new Pair(2, createTuple("cherry")), + new Pair(3, createTuple("date")))); + + assertEquals(ImmutableList.copyOf(getOnlyElement(processors.get(2).getBlockStream()).pairIterator()), + ImmutableList.of( + new Pair(0, createTuple("alice")), + new Pair(1, createTuple("bob")), + new Pair(2, createTuple("charlie")), + new Pair(3, createTuple("dave")))); } } } diff --git a/src/test/java/com/facebook/presto/TestUncompressedBlockSerde.java b/src/test/java/com/facebook/presto/TestUncompressedBlockReader.java similarity index 70% rename from src/test/java/com/facebook/presto/TestUncompressedBlockSerde.java rename to src/test/java/com/facebook/presto/TestUncompressedBlockReader.java index 32a8b36e51173..abb2302f35f81 100644 --- a/src/test/java/com/facebook/presto/TestUncompressedBlockSerde.java +++ b/src/test/java/com/facebook/presto/TestUncompressedBlockReader.java @@ -15,21 +15,25 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; -public class TestUncompressedBlockSerde +public class TestUncompressedBlockReader { @Test public void testRoundTrip() throws Exception { - BlockBuilder builder = new BlockBuilder(0, new TupleInfo(Type.VARIABLE_BINARY)); - ValueBlock block = builder.append("alice".getBytes(UTF_8)) + TupleInfo tupleInfo = new TupleInfo(Type.VARIABLE_BINARY); + UncompressedValueBlock block = new BlockBuilder(0, tupleInfo) + .append("alice".getBytes(UTF_8)) .append("bob".getBytes(UTF_8)) .append("charlie".getBytes(UTF_8)) .append("dave".getBytes(UTF_8)) .build(); + UncompressedBlockStream blockStream = new UncompressedBlockStream(tupleInfo, ImmutableList.of(block)); ByteArrayOutputStream out = new ByteArrayOutputStream(); - UncompressedBlockSerde.write(ImmutableList.of(block).iterator(), out); + ColumnProcessor processor = new UncompressedColumnWriter(Type.VARIABLE_BINARY, 0, blockStream.cursor(), out); + processor.processPositions(Integer.MAX_VALUE); + processor.finish(); ImmutableList copiedBlocks = ImmutableList.copyOf(UncompressedBlockSerde.read(Slices.wrappedBuffer(out.toByteArray())));