diff --git a/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesWriter.java b/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesWriter.java
index 9488a4c8ce..25718117c0 100644
--- a/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesWriter.java
+++ b/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesWriter.java
@@ -143,7 +143,7 @@ public BytesInput getBytes() {
     if (DEBUG) LOG.debug("max dic id " + maxDicId);
     int bitWidth = BytesUtils.getWidthFromMaxInt(maxDicId);
     // TODO: what is a good initialCapacity?
-    RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(bitWidth, 64 * 1024, maxDictionaryByteSize);
+    RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(bitWidth, 64, maxDictionaryByteSize);
     IntIterator iterator = encodedValues.iterator();
     try {
       while (iterator.hasNext()) {
diff --git a/parquet-column/src/test/java/parquet/column/impl/TestColumnReaderImpl.java b/parquet-column/src/test/java/parquet/column/impl/TestColumnReaderImpl.java
index bcff4761e2..dda8187e66 100644
--- a/parquet-column/src/test/java/parquet/column/impl/TestColumnReaderImpl.java
+++ b/parquet-column/src/test/java/parquet/column/impl/TestColumnReaderImpl.java
@@ -38,7 +38,7 @@ public void test() {
     MessageType schema = MessageTypeParser.parseMessageType("message test { required binary foo; }");
     ColumnDescriptor col = schema.getColumns().get(0);
     MemPageWriter pageWriter = new MemPageWriter();
-    ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(col, pageWriter, 1024, new ParquetProperties(1024, PARQUET_2_0, true), 2048);
+    ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(col, pageWriter, new ParquetProperties(1024, PARQUET_2_0, true), 2048);
     for (int i = 0; i < rows; i++) {
       columnWriterV2.write(Binary.fromString("bar" + i % 10), 0, 0);
       if ((i + 1) % 1000 == 0) {
@@ -73,7 +73,7 @@ public void testOptional() {
     MessageType schema = MessageTypeParser.parseMessageType("message test { optional binary foo; }");
     ColumnDescriptor col = schema.getColumns().get(0);
     MemPageWriter pageWriter = new MemPageWriter();
-    ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(col, pageWriter, 1024, new ParquetProperties(1024, PARQUET_2_0, true), 2048);
+    ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(col, pageWriter, new ParquetProperties(1024, PARQUET_2_0, true), 2048);
     for (int i = 0; i < rows; i++) {
       columnWriterV2.writeNull(0, 0);
       if ((i + 1) % 1000 == 0) {
diff --git a/parquet-column/src/test/java/parquet/column/mem/TestMemColumn.java b/parquet-column/src/test/java/parquet/column/mem/TestMemColumn.java
index a386bbba92..b0abf55fa2 100644
--- a/parquet-column/src/test/java/parquet/column/mem/TestMemColumn.java
+++ b/parquet-column/src/test/java/parquet/column/mem/TestMemColumn.java
@@ -156,6 +156,6 @@ public void testMemColumnSeveralPagesRepeated() throws Exception {
   }
 
   private ColumnWriteStoreV1 newColumnWriteStoreImpl(MemPageStore memPageStore) {
-    return new ColumnWriteStoreV1(memPageStore, 2048, 2048, 2048, false, WriterVersion.PARQUET_1_0);
+    return new ColumnWriteStoreV1(memPageStore, 2048, 2048, false, WriterVersion.PARQUET_1_0);
   }
 }
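The DictionaryValuesWriter change above only shrinks the initial slab handed to the RLE/bit-packing encoder from 64 KB to 64 bytes; the bit width is still derived from the largest dictionary id. A minimal sketch of that relationship, assuming BytesUtils.getWidthFromMaxInt is equivalent to counting the significant bits of the maximum id (the helper below is illustrative, not the Parquet implementation):

```java
public class DictionaryIdWidth {
  // Assumed equivalent of BytesUtils.getWidthFromMaxInt: the number of bits
  // needed to represent any value in [0, maxInt].
  static int widthFromMaxInt(int maxInt) {
    return 32 - Integer.numberOfLeadingZeros(maxInt);
  }

  public static void main(String[] args) {
    int maxDicId = 999;                        // 1000 dictionary entries -> ids 0..999
    int bitWidth = widthFromMaxInt(maxDicId);  // 10 bits
    int initialSlabSize = 64;                  // was 64 * 1024 before this change
    System.out.println("bit width = " + bitWidth + ", initial slab = " + initialSlabSize + " bytes");
  }
}
```

With small dictionaries the encoder now starts from a 64-byte slab and grows toward maxDictionaryByteSize instead of reserving 64 KB up front.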
diff --git a/parquet-column/src/test/java/parquet/io/PerfTest.java b/parquet-column/src/test/java/parquet/io/PerfTest.java
index 9cd31e3097..2642e09d15 100644
--- a/parquet-column/src/test/java/parquet/io/PerfTest.java
+++ b/parquet-column/src/test/java/parquet/io/PerfTest.java
@@ -74,7 +74,7 @@ private static void read(MemPageStore memPageStore, MessageType myschema,
 
   private static void write(MemPageStore memPageStore) {
-    ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 50*1024*1024, 50*1024*1024, 50*1024*1024, false, WriterVersion.PARQUET_1_0);
+    ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 50*1024*1024, 50*1024*1024, false, WriterVersion.PARQUET_1_0);
     MessageColumnIO columnIO = newColumnFactory(schema);
     GroupWriter groupWriter = new GroupWriter(columnIO.getRecordWriter(columns), schema);
diff --git a/parquet-column/src/test/java/parquet/io/TestColumnIO.java b/parquet-column/src/test/java/parquet/io/TestColumnIO.java
index d4442df6a8..bf93c6eb17 100644
--- a/parquet-column/src/test/java/parquet/io/TestColumnIO.java
+++ b/parquet-column/src/test/java/parquet/io/TestColumnIO.java
@@ -514,7 +514,7 @@ public void testPushParser() {
   }
 
   private ColumnWriteStoreV1 newColumnWriteStore(MemPageStore memPageStore) {
-    return new ColumnWriteStoreV1(memPageStore, 800, 800, 800, useDictionary, WriterVersion.PARQUET_1_0);
+    return new ColumnWriteStoreV1(memPageStore, 800, 800, useDictionary, WriterVersion.PARQUET_1_0);
   }
 
   @Test
diff --git a/parquet-column/src/test/java/parquet/io/TestFiltered.java b/parquet-column/src/test/java/parquet/io/TestFiltered.java
index 7acf6f1e69..2ba9c19e84 100644
--- a/parquet-column/src/test/java/parquet/io/TestFiltered.java
+++ b/parquet-column/src/test/java/parquet/io/TestFiltered.java
@@ -254,7 +254,7 @@ public void testFilteredNotPaged() {
 
   private MemPageStore writeTestRecords(MessageColumnIO columnIO, int number) {
     MemPageStore memPageStore = new MemPageStore(number * 2);
-    ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 800, 800, 800, false, WriterVersion.PARQUET_1_0);
+    ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 800, 800, false, WriterVersion.PARQUET_1_0);
     GroupWriter groupWriter = new GroupWriter(columnIO.getRecordWriter(columns), schema);
     for ( int i = 0; i < number; i++ ) {
diff --git a/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java b/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java
index cabfe259f2..1fe4204565 100644
--- a/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java
+++ b/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java
@@ -78,7 +78,7 @@ public CapacityByteArrayOutputStream(int initialSlabSize) {
   public CapacityByteArrayOutputStream(int initialSlabSize, int maxCapacityHint) {
     checkArgument(initialSlabSize > 0, "initialSlabSize must be > 0");
     checkArgument(maxCapacityHint > 0, "maxCapacityHint must be > 0");
-    checkArgument(initialSlabSize <= maxCapacityHint, "maxCapacityHint can't be less than initialSlabSize");
+    checkArgument(maxCapacityHint >= initialSlabSize, String.format("maxCapacityHint can't be less than initialSlabSize %d %d", initialSlabSize, maxCapacityHint));
     this.initialSlabSize = initialSlabSize;
     this.maxCapacityHint = maxCapacityHint;
     reset();
@@ -154,7 +154,7 @@ public void write(byte b[], int off, int len) {
   public void writeTo(OutputStream out) throws IOException {
     for (int i = 0; i < slabs.size() - 1; i++) {
       final byte[] slab = slabs.get(i);
-      out.write(slab, 0, slab.length);
+      out.write(slab);
     }
     out.write(currentSlab, 0, currentSlabIndex);
   }
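The reworked precondition in CapacityByteArrayOutputStream now reports both sizes when the check fails. A hypothetical JUnit sketch of the expected behaviour, assuming checkArgument throws IllegalArgumentException on a false condition (the test class and method name are illustrative, not part of this patch):

```java
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import org.junit.Test;

import parquet.bytes.CapacityByteArrayOutputStream;

public class TestSlabSizePrecondition {
  @Test
  public void maxCapacityHintMustCoverInitialSlab() {
    try {
      // initialSlabSize (1024) larger than maxCapacityHint (64) should be rejected
      new CapacityByteArrayOutputStream(1024, 64);
      fail("expected IllegalArgumentException");
    } catch (IllegalArgumentException e) {
      // the new message includes both sizes for easier debugging
      assertTrue(e.getMessage().contains("maxCapacityHint can't be less than initialSlabSize"));
    }
  }
}
```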
diff --git a/parquet-encoding/src/main/java/parquet/bytes/ConcatenatingByteArrayCollector.java b/parquet-encoding/src/main/java/parquet/bytes/ConcatenatingByteArrayCollector.java
new file mode 100644
index 0000000000..ec4f4d38d2
--- /dev/null
+++ b/parquet-encoding/src/main/java/parquet/bytes/ConcatenatingByteArrayCollector.java
@@ -0,0 +1,48 @@
+package parquet.bytes;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.lang.String.format;
+
+public class ConcatenatingByteArrayCollector extends BytesInput {
+  private final List<byte[]> slabs = new ArrayList<byte[]>();
+  private long size = 0;
+
+  public void collect(BytesInput bytes) throws IOException {
+    collect(bytes.toByteArray());
+  }
+
+  public void collect(byte[] bytes) {
+    slabs.add(bytes);
+    size += bytes.length;
+  }
+
+  public void reset() {
+    size = 0;
+    slabs.clear();
+  }
+
+  @Override
+  public void writeAllTo(OutputStream out) throws IOException {
+    for (byte[] slab : slabs) {
+      out.write(slab);
+    }
+  }
+
+  @Override
+  public long size() {
+    return size;
+  }
+
+  /**
+   * @param prefix a prefix to be used for every new line in the string
+   * @return a text representation of the memory usage of this structure
+   */
+  public String memUsageString(String prefix) {
+    return format("%s %s %d slabs, %,d bytes", prefix, getClass().getSimpleName(), slabs.size(), size);
+  }
+
+}
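ConcatenatingByteArrayCollector above only keeps references to the byte arrays it is handed and concatenates them lazily when writing out. A small usage sketch based on the methods shown in the new file (the arrays and values are made up for illustration):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import parquet.bytes.ConcatenatingByteArrayCollector;

public class CollectorExample {
  public static void main(String[] args) throws IOException {
    ConcatenatingByteArrayCollector collector = new ConcatenatingByteArrayCollector();

    // each collected array becomes one "slab"; nothing is copied at this point
    collector.collect(new byte[] {1, 2, 3});
    collector.collect(new byte[] {4, 5});

    System.out.println(collector.size());                  // 5
    System.out.println(collector.memUsageString("chunk"));  // e.g. "chunk ConcatenatingByteArrayCollector 2 slabs, 5 bytes"

    // concatenation only happens when writing out
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    collector.writeAllTo(out);
    System.out.println(out.size());                         // 5

    collector.reset();                                      // drops the slabs for reuse
  }
}
```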
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java
index 5d38e4364a..e5f9df0603 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -18,6 +18,7 @@
 import static parquet.Log.INFO;
 import static parquet.column.statistics.Statistics.getStatsBasedOnType;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -28,6 +29,7 @@
 import parquet.Log;
 import parquet.bytes.BytesInput;
 import parquet.bytes.CapacityByteArrayOutputStream;
+import parquet.bytes.ConcatenatingByteArrayCollector;
 import parquet.column.ColumnDescriptor;
 import parquet.column.Encoding;
 import parquet.column.page.DictionaryPage;
@@ -50,7 +52,8 @@ private static final class ColumnChunkPageWriter implements PageWriter {
     private final ColumnDescriptor path;
     private final BytesCompressor compressor;
-    private final CapacityByteArrayOutputStream buf;
+    private final ByteArrayOutputStream tempOutputStream = new ByteArrayOutputStream();
+    private final ConcatenatingByteArrayCollector buf;
     private DictionaryPage dictionaryPage;
     private long uncompressedLength;
@@ -69,7 +72,7 @@ private ColumnChunkPageWriter(ColumnDescriptor path, BytesCompressor compressor,
       // this writer will write many pages, so we make the initial slab size 1 page size.
       // It will then double over time until it reaches COLUMN_CHUNK_WRITER_MAX_SIZE_HINT at
       // which point it will grow linearly.
-      this.buf = new CapacityByteArrayOutputStream(pageSize, COLUMN_CHUNK_WRITER_MAX_SIZE_HINT);
+      this.buf = new ConcatenatingByteArrayCollector();
       this.totalStatistics = getStatsBasedOnType(this.path.getType());
     }
@@ -93,6 +96,7 @@ public void writePage(BytesInput bytes,
             "Cannot write compressed page larger than Integer.MAX_VALUE bytes: " + compressedSize);
       }
+      tempOutputStream.reset();
       parquetMetadataConverter.writeDataPageHeader(
           (int)uncompressedSize,
           (int)compressedSize,
@@ -101,13 +105,15 @@ public void writePage(BytesInput bytes,
           rlEncoding,
           dlEncoding,
           valuesEncoding,
-          buf);
+          tempOutputStream);
+      buf.collect(tempOutputStream.toByteArray());
+      tempOutputStream.reset();
       this.uncompressedLength += uncompressedSize;
       this.compressedLength += compressedSize;
       this.totalValueCount += valueCount;
       this.pageCount += 1;
       this.totalStatistics.mergeStatistics(statistics);
-      compressedBytes.writeAllTo(buf);
+      buf.collect(compressedBytes);
       encodings.add(rlEncoding);
       encodings.add(dlEncoding);
       encodings.add(valuesEncoding);
@@ -129,21 +135,25 @@ public void writePageV2(
       int compressedSize = toIntWithCheck(
           compressedData.size() + repetitionLevels.size() + definitionLevels.size()
       );
+      tempOutputStream.reset();
       parquetMetadataConverter.writeDataPageV2Header(
           uncompressedSize, compressedSize,
           valueCount, nullCount, rowCount,
           statistics,
           dataEncoding,
-          rlByteLength, dlByteLength,
-          buf);
+          rlByteLength,
+          dlByteLength,
+          tempOutputStream);
+      buf.collect(tempOutputStream.toByteArray());
+      tempOutputStream.reset();
       this.uncompressedLength += uncompressedSize;
       this.compressedLength += compressedSize;
       this.totalValueCount += valueCount;
       this.pageCount += 1;
       this.totalStatistics.mergeStatistics(statistics);
-      repetitionLevels.writeAllTo(buf);
-      definitionLevels.writeAllTo(buf);
-      compressedData.writeAllTo(buf);
+      buf.collect(repetitionLevels);
+      buf.collect(definitionLevels);
+      buf.collect(compressedData);
       encodings.add(dataEncoding);
     }
@@ -167,7 +177,7 @@ public void writeToFileWriter(ParquetFileWriter writer) throws IOException {
         writer.writeDictionaryPage(dictionaryPage);
         encodings.add(dictionaryPage.getEncoding());
       }
-      writer.writeDataPages(BytesInput.from(buf), uncompressedLength, compressedLength, totalStatistics, new ArrayList<Encoding>(encodings));
+      writer.writeDataPages(buf, uncompressedLength, compressedLength, totalStatistics, new ArrayList<Encoding>(encodings));
       writer.endColumn();
       if (INFO) {
         LOG.info(
@@ -185,7 +195,7 @@ public void writeToFileWriter(ParquetFileWriter writer) throws IOException {
 
     @Override
     public long allocatedSize() {
-      return buf.getCapacity();
+      return buf.size();
     }
 
     @Override
diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/parquet/hadoop/TestColumnChunkPageWriteStore.java
index e1223b666c..60337cddc8 100644
--- a/parquet-hadoop/src/test/java/parquet/hadoop/TestColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/test/java/parquet/hadoop/TestColumnChunkPageWriteStore.java
@@ -64,7 +64,7 @@ public void test() throws Exception {
     writer.start();
     writer.startBlock(rowCount);
     {
-      ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(f.getCompressor(codec, pageSize ), schema , initialSize, pageSize);
+      ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(f.getCompressor(codec, pageSize ), schema, pageSize);
       PageWriter pageWriter = store.getPageWriter(col);
       pageWriter.writePageV2(
           rowCount, nullCount, valueCount,
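The writePage and writePageV2 rewrites above follow one pattern: serialize the page header into a small reusable ByteArrayOutputStream, collect that byte array, then collect the already materialized level and data bytes instead of copying everything into one large CapacityByteArrayOutputStream. A simplified, self-contained sketch of that flow; writeFakeHeader stands in for the ParquetMetadataConverter call and is not a real API:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

import parquet.bytes.ConcatenatingByteArrayCollector;

public class PageCollectSketch {
  private final ByteArrayOutputStream tempOutputStream = new ByteArrayOutputStream();
  private final ConcatenatingByteArrayCollector buf = new ConcatenatingByteArrayCollector();

  // stand-in for parquetMetadataConverter.writeDataPageHeader(...)
  private void writeFakeHeader(int uncompressedSize, int compressedSize, OutputStream to) throws IOException {
    to.write(uncompressedSize);
    to.write(compressedSize);
  }

  void writePage(byte[] compressedBytes) throws IOException {
    tempOutputStream.reset();
    writeFakeHeader(compressedBytes.length, compressedBytes.length, tempOutputStream);
    buf.collect(tempOutputStream.toByteArray()); // the header becomes its own slab
    tempOutputStream.reset();
    buf.collect(compressedBytes);                // the page payload is referenced, not copied
  }

  long bufferedSize() {
    return buf.size();
  }
}
```

This also explains the allocatedSize() change above: it now reports the collector's logical size rather than the capacity of a pre-allocated buffer.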
diff --git a/parquet-pig/src/test/java/parquet/pig/TupleConsumerPerfTest.java b/parquet-pig/src/test/java/parquet/pig/TupleConsumerPerfTest.java
index 68ad1fed3e..9e590d855a 100644
--- a/parquet-pig/src/test/java/parquet/pig/TupleConsumerPerfTest.java
+++ b/parquet-pig/src/test/java/parquet/pig/TupleConsumerPerfTest.java
@@ -56,7 +56,7 @@ public static void main(String[] args) throws Exception {
     MessageType schema = new PigSchemaConverter().convert(Utils.getSchemaFromString(pigSchema));
 
     MemPageStore memPageStore = new MemPageStore(0);
-    ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 50*1024*1024, 50*1024*1024, 50*1024*1024, false, WriterVersion.PARQUET_1_0);
+    ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 50*1024*1024, 50*1024*1024, false, WriterVersion.PARQUET_1_0);
     write(memPageStore, columns, schema, pigSchema);
     columns.flush();
     read(memPageStore, pigSchema, pigSchemaProjected, pigSchemaNoString);
diff --git a/parquet-thrift/src/test/java/parquet/thrift/TestParquetReadProtocol.java b/parquet-thrift/src/test/java/parquet/thrift/TestParquetReadProtocol.java
index eb2041250d..43e1a884f2 100644
--- a/parquet-thrift/src/test/java/parquet/thrift/TestParquetReadProtocol.java
+++ b/parquet-thrift/src/test/java/parquet/thrift/TestParquetReadProtocol.java
@@ -145,7 +145,7 @@ private <T extends TBase<?,?>> void validate(T expected) throws TException {
     final MessageType schema = schemaConverter.convert(thriftClass);
     LOG.info(schema);
     final MessageColumnIO columnIO = new ColumnIOFactory(true).getColumnIO(schema);
-    final ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 10000, 10000, 10000, false, WriterVersion.PARQUET_1_0);
+    final ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 10000, 10000, false, WriterVersion.PARQUET_1_0);
     final RecordConsumer recordWriter = columnIO.getRecordWriter(columns);
     final StructType thriftType = schemaConverter.toStructType(thriftClass);
     ParquetWriteProtocol parquetWriteProtocol = new ParquetWriteProtocol(recordWriter, columnIO, thriftType);