diff --git a/presto-parquet/pom.xml b/presto-parquet/pom.xml
index 09c9a45cfc478..7f2caa8e0b12a 100644
--- a/presto-parquet/pom.xml
+++ b/presto-parquet/pom.xml
@@ -227,49 +227,6 @@
-
- com.googlecode.fmpp-maven-plugin
- fmpp-maven-plugin
- 1.0
-
-
- net.sourceforge.fmpp
- fmpp
- 0.9.15
-
-
-
- ${project.basedir}/src/main/resources/freemarker/config.fmpp
- ${project.build.directory}/generated-sources/java/
- ${project.basedir}/src/main/resources/freemarker/templates
-
-
-
- generate-sources
-
- generate
-
-
-
-
-
- org.codehaus.mojo
- build-helper-maven-plugin
-
-
- add-source
- generate-sources
-
- add-source
-
-
-
- ${project.build.directory}/generated-sources/java/
-
-
-
-
-
diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/BooleanFlatBatchReader.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/BooleanFlatBatchReader.java
new file mode 100644
index 0000000000000..a16af1f2cc1e6
--- /dev/null
+++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/BooleanFlatBatchReader.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.parquet.batchreader;
+
+import com.facebook.presto.common.block.Block;
+import com.facebook.presto.common.block.ByteArrayBlock;
+import com.facebook.presto.common.block.RunLengthEncodedBlock;
+import com.facebook.presto.parquet.ColumnReader;
+import com.facebook.presto.parquet.DataPage;
+import com.facebook.presto.parquet.DictionaryPage;
+import com.facebook.presto.parquet.Field;
+import com.facebook.presto.parquet.RichColumnDescriptor;
+import com.facebook.presto.parquet.batchreader.decoders.Decoders.FlatDecoders;
+import com.facebook.presto.parquet.batchreader.decoders.FlatDefinitionLevelDecoder;
+import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.BooleanValuesDecoder;
+import com.facebook.presto.parquet.batchreader.dictionary.Dictionaries;
+import com.facebook.presto.parquet.dictionary.Dictionary;
+import com.facebook.presto.parquet.reader.ColumnChunk;
+import com.facebook.presto.parquet.reader.PageReader;
+import com.facebook.presto.spi.PrestoException;
+import org.apache.parquet.internal.filter2.columnindex.RowRanges;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.openjdk.jol.info.ClassLayout;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static com.facebook.presto.parquet.ParquetErrorCode.PARQUET_IO_READ_ERROR;
+import static com.facebook.presto.parquet.batchreader.decoders.Decoders.readFlatPage;
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+
+public class BooleanFlatBatchReader
+ implements ColumnReader
+{
+ private static final int INSTANCE_SIZE = ClassLayout.parseClass(BooleanFlatBatchReader.class).instanceSize();
+
+ private final RichColumnDescriptor columnDescriptor;
+
+ protected Field field;
+ protected int nextBatchSize;
+ protected FlatDefinitionLevelDecoder definitionLevelDecoder;
+ protected BooleanValuesDecoder valuesDecoder;
+ protected int remainingCountInPage;
+
+ private Dictionary dictionary;
+ private int readOffset;
+ private PageReader pageReader;
+
+ public BooleanFlatBatchReader(RichColumnDescriptor columnDescriptor)
+ {
+ this.columnDescriptor = requireNonNull(columnDescriptor, "columnDescriptor is null");
+ }
+
+ // A reader is usable only once init() has supplied both the page source and the field.
+ @Override
+ public boolean isInitialized()
+ {
+ return pageReader != null && field != null;
+ }
+
+ // Binds this reader to a column chunk. The dictionary page, if present, is
+ // decoded eagerly so subsequent data pages can be read against it.
+ // NOTE(review): the rowRanges parameter is currently ignored by this reader.
+ @Override
+ public void init(PageReader pageReader, Field field, RowRanges rowRanges)
+ {
+ checkArgument(!isInitialized(), "Parquet batch reader already initialized");
+ this.pageReader = requireNonNull(pageReader, "pageReader is null");
+ checkArgument(pageReader.getValueCountInColumnChunk() > 0, "page is empty");
+ this.field = requireNonNull(field, "field is null");
+
+ DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
+ if (dictionaryPage != null) {
+ dictionary = Dictionaries.createDictionary(columnDescriptor, dictionaryPage);
+ }
+ }
+
+ // Does not skip immediately: any previously prepared-but-unread values are
+ // accumulated into readOffset and skipped lazily by seek() on the next read.
+ @Override
+ public void prepareNextRead(int batchSize)
+ {
+ readOffset = readOffset + nextBatchSize;
+ nextBatchSize = batchSize;
+ }
+
+ // Reads the next nextBatchSize values: first applies any deferred skip (see
+ // prepareNextRead), then dispatches on whether the column permits nulls, and
+ // finally resets the batch bookkeeping.
+ @Override
+ public ColumnChunk readNext()
+ {
+ ColumnChunk columnChunk = null;
+ try {
+ seek();
+ if (field.isRequired()) {
+ columnChunk = readWithoutNull();
+ }
+ else {
+ columnChunk = readWithNull();
+ }
+ }
+ catch (IOException exception) {
+ throw new PrestoException(PARQUET_IO_READ_ERROR, "Error reading Parquet column " + columnDescriptor, exception);
+ }
+
+ readOffset = 0;
+ nextBatchSize = 0;
+ return columnChunk;
+ }
+
+ // Approximate retained heap size: this instance plus whichever decoders,
+ // dictionary, and page reader it currently holds.
+ @Override
+ public long getRetainedSizeInBytes()
+ {
+ return INSTANCE_SIZE +
+ (definitionLevelDecoder == null ? 0 : definitionLevelDecoder.getRetainedSizeInBytes()) +
+ (valuesDecoder == null ? 0 : valuesDecoder.getRetainedSizeInBytes()) +
+ (dictionary == null ? 0 : dictionary.getRetainedSizeInBytes()) +
+ (pageReader == null ? 0 : pageReader.getRetainedSizeInBytes());
+ }
+
+ // Advances to the next data page, replacing the per-page decoders.
+ // Returns false when the column chunk has no more pages.
+ protected boolean readNextPage()
+ {
+ // Drop the previous page's decoders up front so the reader is left in a
+ // consistent (empty) state if no further page exists.
+ definitionLevelDecoder = null;
+ valuesDecoder = null;
+ remainingCountInPage = 0;
+
+ DataPage page = pageReader.readPage();
+ if (page == null) {
+ return false;
+ }
+
+ FlatDecoders flatDecoders = readFlatPage(page, columnDescriptor, dictionary);
+ definitionLevelDecoder = flatDecoders.getDefinitionLevelDecoder();
+ valuesDecoder = (BooleanValuesDecoder) flatDecoders.getValuesDecoder();
+
+ remainingCountInPage = page.getValueCount();
+ return true;
+ }
+
+ // Reads nextBatchSize values for a nullable column, spreading the densely
+ // decoded non-null values into their final row positions.
+ private ColumnChunk readWithNull()
+ throws IOException
+ {
+ byte[] values = new byte[nextBatchSize];
+ boolean[] isNull = new boolean[nextBatchSize];
+
+ int totalNonNullCount = 0;
+ int remainingInBatch = nextBatchSize;
+ int startOffset = 0;
+ while (remainingInBatch > 0) {
+ if (remainingCountInPage == 0) {
+ if (!readNextPage()) {
+ break;
+ }
+ }
+
+ int chunkSize = Math.min(remainingCountInPage, remainingInBatch);
+ // Definition levels mark which of the next chunkSize positions are null.
+ int nonNullCount = definitionLevelDecoder.readNext(isNull, startOffset, chunkSize);
+ totalNonNullCount += nonNullCount;
+
+ if (nonNullCount > 0) {
+ // Values are decoded densely at the front of the chunk; walk backwards
+ // moving each into its final position, skipping null slots.
+ valuesDecoder.readNext(values, startOffset, nonNullCount);
+
+ int valueDestinationIndex = startOffset + chunkSize - 1;
+ int valueSourceIndex = startOffset + nonNullCount - 1;
+
+ while (valueDestinationIndex >= startOffset) {
+ if (!isNull[valueDestinationIndex]) {
+ values[valueDestinationIndex] = values[valueSourceIndex];
+ valueSourceIndex--;
+ }
+ valueDestinationIndex--;
+ }
+ }
+
+ startOffset += chunkSize;
+ remainingInBatch -= chunkSize;
+ remainingCountInPage -= chunkSize;
+ }
+
+ if (remainingInBatch != 0) {
+ // Same diagnostic as readWithoutNull(), including the shortfall count.
+ throw new ParquetDecodingException(format("Corrupted Parquet file: extra %d values to be consumed when scanning current batch", remainingInBatch));
+ }
+
+ if (totalNonNullCount == 0) {
+ // Entire batch is null: return a cheap run-length-encoded null block.
+ Block block = RunLengthEncodedBlock.create(field.getType(), null, nextBatchSize);
+ return new ColumnChunk(block, new int[0], new int[0]);
+ }
+
+ boolean hasNoNull = totalNonNullCount == nextBatchSize;
+ Block block = new ByteArrayBlock(nextBatchSize, hasNoNull ? Optional.empty() : Optional.of(isNull), values);
+ return new ColumnChunk(block, new int[0], new int[0]);
+ }
+
+ // Reads nextBatchSize values for a required (never-null) column.
+ private ColumnChunk readWithoutNull()
+ throws IOException
+ {
+ byte[] values = new byte[nextBatchSize];
+ int remainingInBatch = nextBatchSize;
+ int outputOffset = 0;
+ while (remainingInBatch > 0) {
+ // Refill from the next page when the current one is exhausted; stop
+ // early if the column chunk has no more pages.
+ if (remainingCountInPage == 0 && !readNextPage()) {
+ break;
+ }
+
+ int readSize = Math.min(remainingCountInPage, remainingInBatch);
+ valuesDecoder.readNext(values, outputOffset, readSize);
+ outputOffset += readSize;
+ remainingInBatch -= readSize;
+ remainingCountInPage -= readSize;
+ }
+
+ if (remainingInBatch != 0) {
+ throw new ParquetDecodingException(format("Corrupted Parquet file: extra %d values to be consumed when scanning current batch", remainingInBatch));
+ }
+
+ return new ColumnChunk(new ByteArrayBlock(nextBatchSize, Optional.empty(), values), new int[0], new int[0]);
+ }
+
+ // Skips readOffset values that were prepared (via prepareNextRead) but never
+ // read, advancing the definition-level and value decoders in lockstep.
+ private void seek()
+ throws IOException
+ {
+ if (readOffset == 0) {
+ return;
+ }
+
+ int remainingInBatch = readOffset;
+ while (remainingInBatch > 0) {
+ if (remainingCountInPage == 0) {
+ if (!readNextPage()) {
+ break;
+ }
+ }
+
+ int chunkSize = Math.min(remainingCountInPage, remainingInBatch);
+ int skipSize = chunkSize;
+ if (!columnDescriptor.isRequired()) {
+ // Nullable columns only store values for non-null positions, so the
+ // definition levels must be consumed to learn how many stored values
+ // to skip. The null flags themselves are discarded, so a scratch
+ // buffer of chunkSize (not readOffset) suffices.
+ boolean[] isNull = new boolean[chunkSize];
+ skipSize = definitionLevelDecoder.readNext(isNull, 0, chunkSize);
+ }
+ valuesDecoder.skip(skipSize);
+ remainingInBatch -= chunkSize;
+ remainingCountInPage -= chunkSize;
+ }
+ }
+}
diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/BooleanNestedBatchReader.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/BooleanNestedBatchReader.java
new file mode 100644
index 0000000000000..f89ea02b21894
--- /dev/null
+++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/BooleanNestedBatchReader.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.parquet.batchreader;
+
+import com.facebook.presto.common.block.Block;
+import com.facebook.presto.common.block.ByteArrayBlock;
+import com.facebook.presto.common.block.RunLengthEncodedBlock;
+import com.facebook.presto.parquet.RichColumnDescriptor;
+import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.BooleanValuesDecoder;
+import com.facebook.presto.parquet.reader.ColumnChunk;
+
+import java.io.IOException;
+import java.util.Optional;
+
+public class BooleanNestedBatchReader
+ extends AbstractNestedBatchReader
+{
+ public BooleanNestedBatchReader(RichColumnDescriptor columnDescriptor)
+ {
+ super(columnDescriptor);
+ }
+
+ // Reads a batch for a nested, nullable column. A definition level equal to
+ // maxDefinitionLevel marks a non-null value; maxDefinitionLevel - 1 marks a
+ // null slot; lower levels contribute no slot at this nesting depth.
+ @Override
+ protected ColumnChunk readNestedWithNull()
+ throws IOException
+ {
+ int maxDefinitionLevel = columnDescriptor.getMaxDefinitionLevel();
+ RepetitionLevelDecodingContext repetitionLevelDecodingContext = readRepetitionLevels(nextBatchSize);
+ DefinitionLevelDecodingContext definitionLevelDecodingContext = readDefinitionLevels(repetitionLevelDecodingContext.getDLValuesDecoderContexts(), repetitionLevelDecodingContext.getRepetitionLevels().length);
+
+ // First pass: per decoder context, count output slots (valueCount) and
+ // actually-stored values (nonNullCount).
+ int[] definitionLevels = definitionLevelDecodingContext.getDefinitionLevels();
+ int newBatchSize = 0;
+ int batchNonNullCount = 0;
+ for (ValuesDecoderContext valuesDecoderContext : definitionLevelDecodingContext.getValuesDecoderContexts()) {
+ int nonNullCount = 0;
+ int valueCount = 0;
+ for (int i = valuesDecoderContext.getStart(); i < valuesDecoderContext.getEnd(); i++) {
+ nonNullCount += (definitionLevels[i] == maxDefinitionLevel ? 1 : 0);
+ valueCount += (definitionLevels[i] >= maxDefinitionLevel - 1 ? 1 : 0);
+ }
+ batchNonNullCount += nonNullCount;
+ newBatchSize += valueCount;
+ valuesDecoderContext.setNonNullCount(nonNullCount);
+ valuesDecoderContext.setValueCount(valueCount);
+ }
+
+ if (batchNonNullCount == 0) {
+ // All slots are null: return a cheap run-length-encoded null block.
+ Block block = RunLengthEncodedBlock.create(field.getType(), null, newBatchSize);
+ return new ColumnChunk(block, definitionLevels, repetitionLevelDecodingContext.getRepetitionLevels());
+ }
+
+ byte[] values = new byte[newBatchSize];
+ boolean[] isNull = new boolean[newBatchSize];
+ int offset = 0;
+ for (ValuesDecoderContext valuesDecoderContext : definitionLevelDecodingContext.getValuesDecoderContexts()) {
+ ((BooleanValuesDecoder) valuesDecoderContext.getValuesDecoder()).readNext(values, offset, valuesDecoderContext.getNonNullCount());
+
+ // Values were decoded densely at the front of this context's range; walk
+ // backwards spreading them into their final slots and marking null slots.
+ int valueDestinationIndex = offset + valuesDecoderContext.getValueCount() - 1;
+ int valueSourceIndex = offset + valuesDecoderContext.getNonNullCount() - 1;
+ int definitionLevelIndex = valuesDecoderContext.getEnd() - 1;
+
+ while (valueDestinationIndex >= offset) {
+ if (definitionLevels[definitionLevelIndex] == maxDefinitionLevel) {
+ values[valueDestinationIndex--] = values[valueSourceIndex--];
+ }
+ else if (definitionLevels[definitionLevelIndex] == maxDefinitionLevel - 1) {
+ values[valueDestinationIndex] = 0;
+ isNull[valueDestinationIndex] = true;
+ valueDestinationIndex--;
+ }
+ definitionLevelIndex--;
+ }
+ offset += valuesDecoderContext.getValueCount();
+ }
+
+ boolean hasNoNull = batchNonNullCount == newBatchSize;
+ Block block = new ByteArrayBlock(newBatchSize, hasNoNull ? Optional.empty() : Optional.of(isNull), values);
+ return new ColumnChunk(block, definitionLevels, repetitionLevelDecodingContext.getRepetitionLevels());
+ }
+
+ // Reads a batch for a nested column with no nulls at this level: every slot
+ // at maxDefinitionLevel carries a stored value, so no compaction is needed.
+ @Override
+ protected ColumnChunk readNestedNoNull()
+ throws IOException
+ {
+ int maxDefinitionLevel = columnDescriptor.getMaxDefinitionLevel();
+ RepetitionLevelDecodingContext repetitionLevelDecodingContext = readRepetitionLevels(nextBatchSize);
+ DefinitionLevelDecodingContext definitionLevelDecodingContext = readDefinitionLevels(repetitionLevelDecodingContext.getDLValuesDecoderContexts(), repetitionLevelDecodingContext.getRepetitionLevels().length);
+
+ int[] definitionLevels = definitionLevelDecodingContext.getDefinitionLevels();
+ int newBatchSize = 0;
+ for (ValuesDecoderContext valuesDecoderContext : definitionLevelDecodingContext.getValuesDecoderContexts()) {
+ int valueCount = 0;
+ for (int i = valuesDecoderContext.getStart(); i < valuesDecoderContext.getEnd(); i++) {
+ valueCount += (definitionLevels[i] == maxDefinitionLevel ? 1 : 0);
+ }
+ newBatchSize += valueCount;
+ valuesDecoderContext.setNonNullCount(valueCount);
+ valuesDecoderContext.setValueCount(valueCount);
+ }
+
+ byte[] values = new byte[newBatchSize];
+ int offset = 0;
+ for (ValuesDecoderContext valuesDecoderContext : definitionLevelDecodingContext.getValuesDecoderContexts()) {
+ ((BooleanValuesDecoder) valuesDecoderContext.getValuesDecoder()).readNext(values, offset, valuesDecoderContext.getNonNullCount());
+ offset += valuesDecoderContext.getValueCount();
+ }
+
+ Block block = new ByteArrayBlock(newBatchSize, Optional.empty(), values);
+ return new ColumnChunk(block, definitionLevels, repetitionLevelDecodingContext.getRepetitionLevels());
+ }
+
+ // Skips readOffset prepared-but-unread values. Repetition and definition
+ // levels are decoded and discarded; only positions at maxDefinitionLevel have
+ // stored values that must be skipped in the values decoder.
+ @Override
+ protected void seek()
+ throws IOException
+ {
+ if (readOffset == 0) {
+ return;
+ }
+ int maxDefinitionLevel = columnDescriptor.getMaxDefinitionLevel();
+ RepetitionLevelDecodingContext repetitionLevelDecodingContext = readRepetitionLevels(readOffset);
+ DefinitionLevelDecodingContext definitionLevelDecodingContext = readDefinitionLevels(repetitionLevelDecodingContext.getDLValuesDecoderContexts(), repetitionLevelDecodingContext.getRepetitionLevels().length);
+
+ int[] definitionLevels = definitionLevelDecodingContext.getDefinitionLevels();
+ for (ValuesDecoderContext valuesDecoderContext : definitionLevelDecodingContext.getValuesDecoderContexts()) {
+ int valueCount = 0;
+ for (int i = valuesDecoderContext.getStart(); i < valuesDecoderContext.getEnd(); i++) {
+ valueCount += (definitionLevels[i] == maxDefinitionLevel ? 1 : 0);
+ }
+ BooleanValuesDecoder booleanValuesDecoder = (BooleanValuesDecoder) valuesDecoderContext.getValuesDecoder();
+ booleanValuesDecoder.skip(valueCount);
+ }
+ }
+}
diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/Int64FlatBatchReader.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/Int64FlatBatchReader.java
new file mode 100644
index 0000000000000..2c97056ddda94
--- /dev/null
+++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/Int64FlatBatchReader.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.parquet.batchreader;
+
+import com.facebook.presto.common.block.Block;
+import com.facebook.presto.common.block.LongArrayBlock;
+import com.facebook.presto.common.block.RunLengthEncodedBlock;
+import com.facebook.presto.parquet.ColumnReader;
+import com.facebook.presto.parquet.DataPage;
+import com.facebook.presto.parquet.DictionaryPage;
+import com.facebook.presto.parquet.Field;
+import com.facebook.presto.parquet.RichColumnDescriptor;
+import com.facebook.presto.parquet.batchreader.decoders.Decoders.FlatDecoders;
+import com.facebook.presto.parquet.batchreader.decoders.FlatDefinitionLevelDecoder;
+import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.Int64ValuesDecoder;
+import com.facebook.presto.parquet.batchreader.dictionary.Dictionaries;
+import com.facebook.presto.parquet.dictionary.Dictionary;
+import com.facebook.presto.parquet.reader.ColumnChunk;
+import com.facebook.presto.parquet.reader.PageReader;
+import com.facebook.presto.spi.PrestoException;
+import org.apache.parquet.internal.filter2.columnindex.RowRanges;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.openjdk.jol.info.ClassLayout;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static com.facebook.presto.parquet.ParquetErrorCode.PARQUET_IO_READ_ERROR;
+import static com.facebook.presto.parquet.batchreader.decoders.Decoders.readFlatPage;
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+
+public class Int64FlatBatchReader
+ implements ColumnReader
+{
+ private static final int INSTANCE_SIZE = ClassLayout.parseClass(Int64FlatBatchReader.class).instanceSize();
+
+ private final RichColumnDescriptor columnDescriptor;
+
+ protected Field field;
+ protected int nextBatchSize;
+ protected FlatDefinitionLevelDecoder definitionLevelDecoder;
+ protected Int64ValuesDecoder valuesDecoder;
+ protected int remainingCountInPage;
+
+ private Dictionary dictionary;
+ private int readOffset;
+ private PageReader pageReader;
+
+ public Int64FlatBatchReader(RichColumnDescriptor columnDescriptor)
+ {
+ this.columnDescriptor = requireNonNull(columnDescriptor, "columnDescriptor is null");
+ }
+
+ // A reader is usable only once init() has supplied both the page source and the field.
+ @Override
+ public boolean isInitialized()
+ {
+ return pageReader != null && field != null;
+ }
+
+ // Binds this reader to a column chunk. The dictionary page, if present, is
+ // decoded eagerly so subsequent data pages can be read against it.
+ // NOTE(review): the rowRanges parameter is currently ignored by this reader.
+ @Override
+ public void init(PageReader pageReader, Field field, RowRanges rowRanges)
+ {
+ checkArgument(!isInitialized(), "Parquet batch reader already initialized");
+ this.pageReader = requireNonNull(pageReader, "pageReader is null");
+ checkArgument(pageReader.getValueCountInColumnChunk() > 0, "page is empty");
+ this.field = requireNonNull(field, "field is null");
+
+ DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
+ if (dictionaryPage != null) {
+ dictionary = Dictionaries.createDictionary(columnDescriptor, dictionaryPage);
+ }
+ }
+
+ // Does not skip immediately: any previously prepared-but-unread values are
+ // accumulated into readOffset and skipped lazily by seek() on the next read.
+ @Override
+ public void prepareNextRead(int batchSize)
+ {
+ readOffset = readOffset + nextBatchSize;
+ nextBatchSize = batchSize;
+ }
+
+ // Reads the next nextBatchSize values: first applies any deferred skip (see
+ // prepareNextRead), then dispatches on whether the column permits nulls, and
+ // finally resets the batch bookkeeping.
+ @Override
+ public ColumnChunk readNext()
+ {
+ ColumnChunk columnChunk = null;
+ try {
+ seek();
+ if (field.isRequired()) {
+ columnChunk = readWithoutNull();
+ }
+ else {
+ columnChunk = readWithNull();
+ }
+ }
+ catch (IOException exception) {
+ throw new PrestoException(PARQUET_IO_READ_ERROR, "Error reading Parquet column " + columnDescriptor, exception);
+ }
+
+ readOffset = 0;
+ nextBatchSize = 0;
+ return columnChunk;
+ }
+
+ // Approximate retained heap size: this instance plus whichever decoders,
+ // dictionary, and page reader it currently holds.
+ @Override
+ public long getRetainedSizeInBytes()
+ {
+ return INSTANCE_SIZE +
+ (definitionLevelDecoder == null ? 0 : definitionLevelDecoder.getRetainedSizeInBytes()) +
+ (valuesDecoder == null ? 0 : valuesDecoder.getRetainedSizeInBytes()) +
+ (dictionary == null ? 0 : dictionary.getRetainedSizeInBytes()) +
+ (pageReader == null ? 0 : pageReader.getRetainedSizeInBytes());
+ }
+
+ // Advances to the next data page, replacing the per-page decoders.
+ // Returns false when the column chunk has no more pages.
+ protected boolean readNextPage()
+ {
+ // Drop the previous page's decoders up front so the reader is left in a
+ // consistent (empty) state if no further page exists.
+ definitionLevelDecoder = null;
+ valuesDecoder = null;
+ remainingCountInPage = 0;
+
+ DataPage page = pageReader.readPage();
+ if (page == null) {
+ return false;
+ }
+
+ FlatDecoders flatDecoders = readFlatPage(page, columnDescriptor, dictionary);
+ definitionLevelDecoder = flatDecoders.getDefinitionLevelDecoder();
+ valuesDecoder = (Int64ValuesDecoder) flatDecoders.getValuesDecoder();
+
+ remainingCountInPage = page.getValueCount();
+ return true;
+ }
+
+ // Reads nextBatchSize values for a nullable column, spreading the densely
+ // decoded non-null values into their final row positions.
+ private ColumnChunk readWithNull()
+ throws IOException
+ {
+ long[] values = new long[nextBatchSize];
+ boolean[] isNull = new boolean[nextBatchSize];
+
+ int totalNonNullCount = 0;
+ int remainingInBatch = nextBatchSize;
+ int startOffset = 0;
+ while (remainingInBatch > 0) {
+ if (remainingCountInPage == 0) {
+ if (!readNextPage()) {
+ break;
+ }
+ }
+
+ int chunkSize = Math.min(remainingCountInPage, remainingInBatch);
+ // Definition levels mark which of the next chunkSize positions are null.
+ int nonNullCount = definitionLevelDecoder.readNext(isNull, startOffset, chunkSize);
+ totalNonNullCount += nonNullCount;
+
+ if (nonNullCount > 0) {
+ // Values are decoded densely at the front of the chunk; walk backwards
+ // moving each into its final position, skipping null slots.
+ valuesDecoder.readNext(values, startOffset, nonNullCount);
+
+ int valueDestinationIndex = startOffset + chunkSize - 1;
+ int valueSourceIndex = startOffset + nonNullCount - 1;
+
+ while (valueDestinationIndex >= startOffset) {
+ if (!isNull[valueDestinationIndex]) {
+ values[valueDestinationIndex] = values[valueSourceIndex];
+ valueSourceIndex--;
+ }
+ valueDestinationIndex--;
+ }
+ }
+
+ startOffset += chunkSize;
+ remainingInBatch -= chunkSize;
+ remainingCountInPage -= chunkSize;
+ }
+
+ if (remainingInBatch != 0) {
+ // Same diagnostic as readWithoutNull(), including the shortfall count.
+ throw new ParquetDecodingException(format("Corrupted Parquet file: extra %d values to be consumed when scanning current batch", remainingInBatch));
+ }
+
+ if (totalNonNullCount == 0) {
+ // Entire batch is null: return a cheap run-length-encoded null block.
+ Block block = RunLengthEncodedBlock.create(field.getType(), null, nextBatchSize);
+ return new ColumnChunk(block, new int[0], new int[0]);
+ }
+
+ boolean hasNoNull = totalNonNullCount == nextBatchSize;
+ Block block = new LongArrayBlock(nextBatchSize, hasNoNull ? Optional.empty() : Optional.of(isNull), values);
+ return new ColumnChunk(block, new int[0], new int[0]);
+ }
+
+ // Reads nextBatchSize values for a required (never-null) column.
+ private ColumnChunk readWithoutNull()
+ throws IOException
+ {
+ long[] values = new long[nextBatchSize];
+ int remainingInBatch = nextBatchSize;
+ int outputOffset = 0;
+ while (remainingInBatch > 0) {
+ // Refill from the next page when the current one is exhausted; stop
+ // early if the column chunk has no more pages.
+ if (remainingCountInPage == 0 && !readNextPage()) {
+ break;
+ }
+
+ int readSize = Math.min(remainingCountInPage, remainingInBatch);
+ valuesDecoder.readNext(values, outputOffset, readSize);
+ outputOffset += readSize;
+ remainingInBatch -= readSize;
+ remainingCountInPage -= readSize;
+ }
+
+ if (remainingInBatch != 0) {
+ throw new ParquetDecodingException(format("Corrupted Parquet file: extra %d values to be consumed when scanning current batch", remainingInBatch));
+ }
+
+ return new ColumnChunk(new LongArrayBlock(nextBatchSize, Optional.empty(), values), new int[0], new int[0]);
+ }
+
+ // Skips readOffset values that were prepared (via prepareNextRead) but never
+ // read, advancing the definition-level and value decoders in lockstep.
+ private void seek()
+ throws IOException
+ {
+ if (readOffset == 0) {
+ return;
+ }
+
+ int remainingInBatch = readOffset;
+ while (remainingInBatch > 0) {
+ if (remainingCountInPage == 0) {
+ if (!readNextPage()) {
+ break;
+ }
+ }
+
+ int chunkSize = Math.min(remainingCountInPage, remainingInBatch);
+ int skipSize = chunkSize;
+ if (!columnDescriptor.isRequired()) {
+ // Nullable columns only store values for non-null positions, so the
+ // definition levels must be consumed to learn how many stored values
+ // to skip. The null flags themselves are discarded, so a scratch
+ // buffer of chunkSize (not readOffset) suffices.
+ boolean[] isNull = new boolean[chunkSize];
+ skipSize = definitionLevelDecoder.readNext(isNull, 0, chunkSize);
+ }
+ valuesDecoder.skip(skipSize);
+ remainingInBatch -= chunkSize;
+ remainingCountInPage -= chunkSize;
+ }
+ }
+}
diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/Int64NestedBatchReader.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/Int64NestedBatchReader.java
new file mode 100644
index 0000000000000..fea6186ad6126
--- /dev/null
+++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/Int64NestedBatchReader.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.parquet.batchreader;
+
+import com.facebook.presto.common.block.Block;
+import com.facebook.presto.common.block.LongArrayBlock;
+import com.facebook.presto.common.block.RunLengthEncodedBlock;
+import com.facebook.presto.parquet.RichColumnDescriptor;
+import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.Int64ValuesDecoder;
+import com.facebook.presto.parquet.reader.ColumnChunk;
+
+import java.io.IOException;
+import java.util.Optional;
+
+public class Int64NestedBatchReader
+ extends AbstractNestedBatchReader
+{
+ public Int64NestedBatchReader(RichColumnDescriptor columnDescriptor)
+ {
+ super(columnDescriptor);
+ }
+
+ // Reads a batch for a nested, nullable column. A definition level equal to
+ // maxDefinitionLevel marks a non-null value; maxDefinitionLevel - 1 marks a
+ // null slot; lower levels contribute no slot at this nesting depth.
+ @Override
+ protected ColumnChunk readNestedWithNull()
+ throws IOException
+ {
+ int maxDefinitionLevel = columnDescriptor.getMaxDefinitionLevel();
+ RepetitionLevelDecodingContext repetitionLevelDecodingContext = readRepetitionLevels(nextBatchSize);
+ DefinitionLevelDecodingContext definitionLevelDecodingContext = readDefinitionLevels(repetitionLevelDecodingContext.getDLValuesDecoderContexts(), repetitionLevelDecodingContext.getRepetitionLevels().length);
+
+ // First pass: per decoder context, count output slots (valueCount) and
+ // actually-stored values (nonNullCount).
+ int[] definitionLevels = definitionLevelDecodingContext.getDefinitionLevels();
+ int newBatchSize = 0;
+ int batchNonNullCount = 0;
+ for (ValuesDecoderContext valuesDecoderContext : definitionLevelDecodingContext.getValuesDecoderContexts()) {
+ int nonNullCount = 0;
+ int valueCount = 0;
+ for (int i = valuesDecoderContext.getStart(); i < valuesDecoderContext.getEnd(); i++) {
+ nonNullCount += (definitionLevels[i] == maxDefinitionLevel ? 1 : 0);
+ valueCount += (definitionLevels[i] >= maxDefinitionLevel - 1 ? 1 : 0);
+ }
+ batchNonNullCount += nonNullCount;
+ newBatchSize += valueCount;
+ valuesDecoderContext.setNonNullCount(nonNullCount);
+ valuesDecoderContext.setValueCount(valueCount);
+ }
+
+ if (batchNonNullCount == 0) {
+ // All slots are null: return a cheap run-length-encoded null block.
+ Block block = RunLengthEncodedBlock.create(field.getType(), null, newBatchSize);
+ return new ColumnChunk(block, definitionLevels, repetitionLevelDecodingContext.getRepetitionLevels());
+ }
+
+ long[] values = new long[newBatchSize];
+ boolean[] isNull = new boolean[newBatchSize];
+ int offset = 0;
+ for (ValuesDecoderContext valuesDecoderContext : definitionLevelDecodingContext.getValuesDecoderContexts()) {
+ ((Int64ValuesDecoder) valuesDecoderContext.getValuesDecoder()).readNext(values, offset, valuesDecoderContext.getNonNullCount());
+
+ // Values were decoded densely at the front of this context's range; walk
+ // backwards spreading them into their final slots and marking null slots.
+ int valueDestinationIndex = offset + valuesDecoderContext.getValueCount() - 1;
+ int valueSourceIndex = offset + valuesDecoderContext.getNonNullCount() - 1;
+ int definitionLevelIndex = valuesDecoderContext.getEnd() - 1;
+
+ while (valueDestinationIndex >= offset) {
+ if (definitionLevels[definitionLevelIndex] == maxDefinitionLevel) {
+ values[valueDestinationIndex--] = values[valueSourceIndex--];
+ }
+ else if (definitionLevels[definitionLevelIndex] == maxDefinitionLevel - 1) {
+ values[valueDestinationIndex] = 0;
+ isNull[valueDestinationIndex] = true;
+ valueDestinationIndex--;
+ }
+ definitionLevelIndex--;
+ }
+ offset += valuesDecoderContext.getValueCount();
+ }
+
+ boolean hasNoNull = batchNonNullCount == newBatchSize;
+ Block block = new LongArrayBlock(newBatchSize, hasNoNull ? Optional.empty() : Optional.of(isNull), values);
+ return new ColumnChunk(block, definitionLevels, repetitionLevelDecodingContext.getRepetitionLevels());
+ }
+
+ // Reads a batch for a nested column with no nulls at this level: every slot
+ // at maxDefinitionLevel carries a stored value, so no compaction is needed.
+ @Override
+ protected ColumnChunk readNestedNoNull()
+ throws IOException
+ {
+ int maxDefinitionLevel = columnDescriptor.getMaxDefinitionLevel();
+ RepetitionLevelDecodingContext repetitionLevelDecodingContext = readRepetitionLevels(nextBatchSize);
+ DefinitionLevelDecodingContext definitionLevelDecodingContext = readDefinitionLevels(repetitionLevelDecodingContext.getDLValuesDecoderContexts(), repetitionLevelDecodingContext.getRepetitionLevels().length);
+
+ int[] definitionLevels = definitionLevelDecodingContext.getDefinitionLevels();
+ int newBatchSize = 0;
+ for (ValuesDecoderContext valuesDecoderContext : definitionLevelDecodingContext.getValuesDecoderContexts()) {
+ int valueCount = 0;
+ for (int i = valuesDecoderContext.getStart(); i < valuesDecoderContext.getEnd(); i++) {
+ valueCount += (definitionLevels[i] == maxDefinitionLevel ? 1 : 0);
+ }
+ newBatchSize += valueCount;
+ valuesDecoderContext.setNonNullCount(valueCount);
+ valuesDecoderContext.setValueCount(valueCount);
+ }
+
+ long[] values = new long[newBatchSize];
+ int offset = 0;
+ for (ValuesDecoderContext valuesDecoderContext : definitionLevelDecodingContext.getValuesDecoderContexts()) {
+ ((Int64ValuesDecoder) valuesDecoderContext.getValuesDecoder()).readNext(values, offset, valuesDecoderContext.getNonNullCount());
+ offset += valuesDecoderContext.getValueCount();
+ }
+
+ Block block = new LongArrayBlock(newBatchSize, Optional.empty(), values);
+ return new ColumnChunk(block, definitionLevels, repetitionLevelDecodingContext.getRepetitionLevels());
+ }
+
+ // Skips readOffset prepared-but-unread values. Repetition and definition
+ // levels are decoded and discarded; only positions at maxDefinitionLevel have
+ // stored values that must be skipped in the values decoder.
+ @Override
+ protected void seek()
+ throws IOException
+ {
+ if (readOffset == 0) {
+ return;
+ }
+ int maxDefinitionLevel = columnDescriptor.getMaxDefinitionLevel();
+ RepetitionLevelDecodingContext repetitionLevelDecodingContext = readRepetitionLevels(readOffset);
+ DefinitionLevelDecodingContext definitionLevelDecodingContext = readDefinitionLevels(repetitionLevelDecodingContext.getDLValuesDecoderContexts(), repetitionLevelDecodingContext.getRepetitionLevels().length);
+
+ int[] definitionLevels = definitionLevelDecodingContext.getDefinitionLevels();
+ for (ValuesDecoderContext valuesDecoderContext : definitionLevelDecodingContext.getValuesDecoderContexts()) {
+ int valueCount = 0;
+ for (int i = valuesDecoderContext.getStart(); i < valuesDecoderContext.getEnd(); i++) {
+ valueCount += (definitionLevels[i] == maxDefinitionLevel ? 1 : 0);
+ }
+ Int64ValuesDecoder longValuesDecoder = (Int64ValuesDecoder) valuesDecoderContext.getValuesDecoder();
+ longValuesDecoder.skip(valueCount);
+ }
+ }
+}
diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/Int64TimeAndTimestampMicrosFlatBatchReader.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/Int64TimeAndTimestampMicrosFlatBatchReader.java
new file mode 100644
index 0000000000000..758df93fa44f8
--- /dev/null
+++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/Int64TimeAndTimestampMicrosFlatBatchReader.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.parquet.batchreader;
+
+import com.facebook.presto.common.block.Block;
+import com.facebook.presto.common.block.LongArrayBlock;
+import com.facebook.presto.common.block.RunLengthEncodedBlock;
+import com.facebook.presto.parquet.ColumnReader;
+import com.facebook.presto.parquet.DataPage;
+import com.facebook.presto.parquet.DictionaryPage;
+import com.facebook.presto.parquet.Field;
+import com.facebook.presto.parquet.RichColumnDescriptor;
+import com.facebook.presto.parquet.batchreader.decoders.Decoders.FlatDecoders;
+import com.facebook.presto.parquet.batchreader.decoders.FlatDefinitionLevelDecoder;
+import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.Int64TimeAndTimestampMicrosValuesDecoder;
+import com.facebook.presto.parquet.batchreader.dictionary.Dictionaries;
+import com.facebook.presto.parquet.dictionary.Dictionary;
+import com.facebook.presto.parquet.reader.ColumnChunk;
+import com.facebook.presto.parquet.reader.PageReader;
+import com.facebook.presto.spi.PrestoException;
+import org.apache.parquet.internal.filter2.columnindex.RowRanges;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.openjdk.jol.info.ClassLayout;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static com.facebook.presto.parquet.ParquetErrorCode.PARQUET_IO_READ_ERROR;
+import static com.facebook.presto.parquet.batchreader.decoders.Decoders.readFlatPage;
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+
+public class Int64TimeAndTimestampMicrosFlatBatchReader
+ implements ColumnReader
+{
+ private static final int INSTANCE_SIZE = ClassLayout.parseClass(Int64TimeAndTimestampMicrosFlatBatchReader.class).instanceSize();
+
+ private final RichColumnDescriptor columnDescriptor;
+
+ protected Field field;
+ protected int nextBatchSize;
+ protected FlatDefinitionLevelDecoder definitionLevelDecoder;
+ protected Int64TimeAndTimestampMicrosValuesDecoder valuesDecoder;
+ protected int remainingCountInPage;
+
+ private Dictionary dictionary;
+ private int readOffset;
+ private PageReader pageReader;
+
+ public Int64TimeAndTimestampMicrosFlatBatchReader(RichColumnDescriptor columnDescriptor)
+ {
+ this.columnDescriptor = requireNonNull(columnDescriptor, "columnDescriptor is null");
+ }
+
+ @Override
+ public boolean isInitialized()
+ {
+ return pageReader != null && field != null;
+ }
+
+ @Override
+ public void init(PageReader pageReader, Field field, RowRanges rowRanges)
+ {
+ checkArgument(!isInitialized(), "Parquet batch reader already initialized");
+ this.pageReader = requireNonNull(pageReader, "pageReader is null");
+ checkArgument(pageReader.getValueCountInColumnChunk() > 0, "page is empty");
+ this.field = requireNonNull(field, "field is null");
+
+ DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
+ if (dictionaryPage != null) {
+ dictionary = Dictionaries.createDictionary(columnDescriptor, dictionaryPage);
+ }
+ }
+
+ @Override
+ public void prepareNextRead(int batchSize)
+ {
+ readOffset = readOffset + nextBatchSize;
+ nextBatchSize = batchSize;
+ }
+
+ @Override
+ public ColumnChunk readNext()
+ {
+ ColumnChunk columnChunk = null;
+ try {
+ seek();
+ if (field.isRequired()) {
+ columnChunk = readWithoutNull();
+ }
+ else {
+ columnChunk = readWithNull();
+ }
+ }
+ catch (IOException exception) {
+ throw new PrestoException(PARQUET_IO_READ_ERROR, "Error reading Parquet column " + columnDescriptor, exception);
+ }
+
+ readOffset = 0;
+ nextBatchSize = 0;
+ return columnChunk;
+ }
+
+ @Override
+ public long getRetainedSizeInBytes()
+ {
+ return INSTANCE_SIZE +
+ (definitionLevelDecoder == null ? 0 : definitionLevelDecoder.getRetainedSizeInBytes()) +
+ (valuesDecoder == null ? 0 : valuesDecoder.getRetainedSizeInBytes()) +
+ (dictionary == null ? 0 : dictionary.getRetainedSizeInBytes()) +
+ (pageReader == null ? 0 : pageReader.getRetainedSizeInBytes());
+ }
+
+ protected boolean readNextPage()
+ {
+ definitionLevelDecoder = null;
+ valuesDecoder = null;
+ remainingCountInPage = 0;
+
+ DataPage page = pageReader.readPage();
+ if (page == null) {
+ return false;
+ }
+
+ FlatDecoders flatDecoders = readFlatPage(page, columnDescriptor, dictionary);
+ definitionLevelDecoder = flatDecoders.getDefinitionLevelDecoder();
+ valuesDecoder = (Int64TimeAndTimestampMicrosValuesDecoder) flatDecoders.getValuesDecoder();
+
+ remainingCountInPage = page.getValueCount();
+ return true;
+ }
+
+ private ColumnChunk readWithNull()
+ throws IOException
+ {
+ long[] values = new long[nextBatchSize];
+ boolean[] isNull = new boolean[nextBatchSize];
+
+ int totalNonNullCount = 0;
+ int remainingInBatch = nextBatchSize;
+ int startOffset = 0;
+ while (remainingInBatch > 0) {
+ if (remainingCountInPage == 0) {
+ if (!readNextPage()) {
+ break;
+ }
+ }
+
+ int chunkSize = Math.min(remainingCountInPage, remainingInBatch);
+ int nonNullCount = definitionLevelDecoder.readNext(isNull, startOffset, chunkSize);
+ totalNonNullCount += nonNullCount;
+
+ if (nonNullCount > 0) {
+ valuesDecoder.readNext(values, startOffset, nonNullCount);
+
+ int valueDestinationIndex = startOffset + chunkSize - 1;
+ int valueSourceIndex = startOffset + nonNullCount - 1;
+
+ while (valueDestinationIndex >= startOffset) {
+ if (!isNull[valueDestinationIndex]) {
+ values[valueDestinationIndex] = values[valueSourceIndex];
+ valueSourceIndex--;
+ }
+ valueDestinationIndex--;
+ }
+ }
+
+ startOffset += chunkSize;
+ remainingInBatch -= chunkSize;
+ remainingCountInPage -= chunkSize;
+ }
+
+ if (remainingInBatch != 0) {
+ throw new ParquetDecodingException("Still remaining to be read in current batch.");
+ }
+
+ if (totalNonNullCount == 0) {
+ Block block = RunLengthEncodedBlock.create(field.getType(), null, nextBatchSize);
+ return new ColumnChunk(block, new int[0], new int[0]);
+ }
+
+ boolean hasNoNull = totalNonNullCount == nextBatchSize;
+ Block block = new LongArrayBlock(nextBatchSize, hasNoNull ? Optional.empty() : Optional.of(isNull), values);
+ return new ColumnChunk(block, new int[0], new int[0]);
+ }
+
+ private ColumnChunk readWithoutNull()
+ throws IOException
+ {
+ long[] values = new long[nextBatchSize];
+ int remainingInBatch = nextBatchSize;
+ int startOffset = 0;
+ while (remainingInBatch > 0) {
+ if (remainingCountInPage == 0) {
+ if (!readNextPage()) {
+ break;
+ }
+ }
+
+ int chunkSize = Math.min(remainingCountInPage, remainingInBatch);
+
+ valuesDecoder.readNext(values, startOffset, chunkSize);
+ startOffset += chunkSize;
+ remainingInBatch -= chunkSize;
+ remainingCountInPage -= chunkSize;
+ }
+
+ if (remainingInBatch != 0) {
+ throw new ParquetDecodingException(format("Corrupted Parquet file: extra %d values to be consumed when scanning current batch", remainingInBatch));
+ }
+
+ Block block = new LongArrayBlock(nextBatchSize, Optional.empty(), values);
+ return new ColumnChunk(block, new int[0], new int[0]);
+ }
+
+ private void seek()
+ throws IOException
+ {
+ if (readOffset == 0) {
+ return;
+ }
+
+ int remainingInBatch = readOffset;
+ int startOffset = 0;
+ while (remainingInBatch > 0) {
+ if (remainingCountInPage == 0) {
+ if (!readNextPage()) {
+ break;
+ }
+ }
+
+ int chunkSize = Math.min(remainingCountInPage, remainingInBatch);
+ int skipSize = chunkSize;
+ if (!columnDescriptor.isRequired()) {
+ boolean[] isNull = new boolean[readOffset];
+ int nonNullCount = definitionLevelDecoder.readNext(isNull, startOffset, chunkSize);
+ skipSize = nonNullCount;
+ startOffset += chunkSize;
+ }
+ valuesDecoder.skip(skipSize);
+ remainingInBatch -= chunkSize;
+ remainingCountInPage -= chunkSize;
+ }
+ }
+}
diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/Int64TimeAndTimestampMicrosNestedBatchReader.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/Int64TimeAndTimestampMicrosNestedBatchReader.java
new file mode 100644
index 0000000000000..c9a96ef85b7d2
--- /dev/null
+++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/Int64TimeAndTimestampMicrosNestedBatchReader.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.parquet.batchreader;
+
+import com.facebook.presto.common.block.Block;
+import com.facebook.presto.common.block.LongArrayBlock;
+import com.facebook.presto.common.block.RunLengthEncodedBlock;
+import com.facebook.presto.parquet.RichColumnDescriptor;
+import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.Int64TimeAndTimestampMicrosValuesDecoder;
+import com.facebook.presto.parquet.reader.ColumnChunk;
+
+import java.io.IOException;
+import java.util.Optional;
+
/**
 * Batch column reader for a nested (repeated) Parquet INT64 column with a TIME/TIMESTAMP
 * micros logical type. Repetition/definition level decoding and batch bookkeeping are
 * inherited from {@code AbstractNestedBatchReader}; this class only materializes decoded
 * longs into {@link LongArrayBlock}s.
 * <p>
 * NOTE(review): fields such as {@code columnDescriptor}, {@code field}, {@code nextBatchSize}
 * and {@code readOffset} come from the (not shown) base class — confirm their contracts there.
 */
public class Int64TimeAndTimestampMicrosNestedBatchReader
        extends AbstractNestedBatchReader
{
    public Int64TimeAndTimestampMicrosNestedBatchReader(RichColumnDescriptor columnDescriptor)
    {
        super(columnDescriptor);
    }

    @Override
    protected ColumnChunk readNestedWithNull()
            throws IOException
    {
        int maxDefinitionLevel = columnDescriptor.getMaxDefinitionLevel();
        RepetitionLevelDecodingContext repetitionLevelDecodingContext = readRepetitionLevels(nextBatchSize);
        DefinitionLevelDecodingContext definitionLevelDecodingContext = readDefinitionLevels(repetitionLevelDecodingContext.getDLValuesDecoderContexts(), repetitionLevelDecodingContext.getRepetitionLevels().length);

        // Per decoder context, count:
        //   definitionLevel == maxDefinitionLevel      -> a stored (non-null) value
        //   definitionLevel >= maxDefinitionLevel - 1  -> an output slot (value or explicit null)
        int[] definitionLevels = definitionLevelDecodingContext.getDefinitionLevels();
        int newBatchSize = 0;
        int batchNonNullCount = 0;
        for (ValuesDecoderContext valuesDecoderContext : definitionLevelDecodingContext.getValuesDecoderContexts()) {
            int nonNullCount = 0;
            int valueCount = 0;
            for (int i = valuesDecoderContext.getStart(); i < valuesDecoderContext.getEnd(); i++) {
                nonNullCount += (definitionLevels[i] == maxDefinitionLevel ? 1 : 0);
                valueCount += (definitionLevels[i] >= maxDefinitionLevel - 1 ? 1 : 0);
            }
            batchNonNullCount += nonNullCount;
            newBatchSize += valueCount;
            valuesDecoderContext.setNonNullCount(nonNullCount);
            valuesDecoderContext.setValueCount(valueCount);
        }

        if (batchNonNullCount == 0) {
            // Every output slot is null: emit a compact RLE null block instead of arrays
            Block block = RunLengthEncodedBlock.create(field.getType(), null, newBatchSize);
            return new ColumnChunk(block, definitionLevels, repetitionLevelDecodingContext.getRepetitionLevels());
        }

        long[] values = new long[newBatchSize];
        boolean[] isNull = new boolean[newBatchSize];
        int offset = 0;
        for (ValuesDecoderContext valuesDecoderContext : definitionLevelDecodingContext.getValuesDecoderContexts()) {
            // Values are stored densely (nulls omitted): decode them packed at the front of
            // this context's output range...
            ((Int64TimeAndTimestampMicrosValuesDecoder) valuesDecoderContext.getValuesDecoder()).readNext(values, offset, valuesDecoderContext.getNonNullCount());

            // ...then walk right-to-left, spreading packed values into their final slots and
            // marking explicit nulls. Levels below maxDefinitionLevel - 1 occupy no output slot.
            int valueDestinationIndex = offset + valuesDecoderContext.getValueCount() - 1;
            int valueSourceIndex = offset + valuesDecoderContext.getNonNullCount() - 1;
            int definitionLevelIndex = valuesDecoderContext.getEnd() - 1;

            while (valueDestinationIndex >= offset) {
                if (definitionLevels[definitionLevelIndex] == maxDefinitionLevel) {
                    values[valueDestinationIndex--] = values[valueSourceIndex--];
                }
                else if (definitionLevels[definitionLevelIndex] == maxDefinitionLevel - 1) {
                    values[valueDestinationIndex] = 0;
                    isNull[valueDestinationIndex] = true;
                    valueDestinationIndex--;
                }
                definitionLevelIndex--;
            }
            offset += valuesDecoderContext.getValueCount();
        }

        boolean hasNoNull = batchNonNullCount == newBatchSize;
        Block block = new LongArrayBlock(newBatchSize, hasNoNull ? Optional.empty() : Optional.of(isNull), values);
        return new ColumnChunk(block, definitionLevels, repetitionLevelDecodingContext.getRepetitionLevels());
    }

    @Override
    protected ColumnChunk readNestedNoNull()
            throws IOException
    {
        int maxDefinitionLevel = columnDescriptor.getMaxDefinitionLevel();
        RepetitionLevelDecodingContext repetitionLevelDecodingContext = readRepetitionLevels(nextBatchSize);
        DefinitionLevelDecodingContext definitionLevelDecodingContext = readDefinitionLevels(repetitionLevelDecodingContext.getDLValuesDecoderContexts(), repetitionLevelDecodingContext.getRepetitionLevels().length);

        // No nulls possible: every slot at maxDefinitionLevel is a stored value, so
        // nonNullCount == valueCount and no backfill pass is needed.
        int[] definitionLevels = definitionLevelDecodingContext.getDefinitionLevels();
        int newBatchSize = 0;
        for (ValuesDecoderContext valuesDecoderContext : definitionLevelDecodingContext.getValuesDecoderContexts()) {
            int valueCount = 0;
            for (int i = valuesDecoderContext.getStart(); i < valuesDecoderContext.getEnd(); i++) {
                valueCount += (definitionLevels[i] == maxDefinitionLevel ? 1 : 0);
            }
            newBatchSize += valueCount;
            valuesDecoderContext.setNonNullCount(valueCount);
            valuesDecoderContext.setValueCount(valueCount);
        }

        long[] values = new long[newBatchSize];
        int offset = 0;
        for (ValuesDecoderContext valuesDecoderContext : definitionLevelDecodingContext.getValuesDecoderContexts()) {
            ((Int64TimeAndTimestampMicrosValuesDecoder) valuesDecoderContext.getValuesDecoder()).readNext(values, offset, valuesDecoderContext.getNonNullCount());
            offset += valuesDecoderContext.getValueCount();
        }

        Block block = new LongArrayBlock(newBatchSize, Optional.empty(), values);
        return new ColumnChunk(block, definitionLevels, repetitionLevelDecodingContext.getRepetitionLevels());
    }

    /**
     * Skips {@code readOffset} top-level positions by decoding their levels and discarding
     * the corresponding stored values.
     */
    @Override
    protected void seek()
            throws IOException
    {
        if (readOffset == 0) {
            return;
        }
        int maxDefinitionLevel = columnDescriptor.getMaxDefinitionLevel();
        RepetitionLevelDecodingContext repetitionLevelDecodingContext = readRepetitionLevels(readOffset);
        DefinitionLevelDecodingContext definitionLevelDecodingContext = readDefinitionLevels(repetitionLevelDecodingContext.getDLValuesDecoderContexts(), repetitionLevelDecodingContext.getRepetitionLevels().length);

        // Only values at maxDefinitionLevel are physically stored, so count those per context
        // to know how many encoded values each decoder must skip.
        int[] definitionLevels = definitionLevelDecodingContext.getDefinitionLevels();
        for (ValuesDecoderContext valuesDecoderContext : definitionLevelDecodingContext.getValuesDecoderContexts()) {
            int valueCount = 0;
            for (int i = valuesDecoderContext.getStart(); i < valuesDecoderContext.getEnd(); i++) {
                valueCount += (definitionLevels[i] == maxDefinitionLevel ? 1 : 0);
            }
            Int64TimeAndTimestampMicrosValuesDecoder intValuesDecoder = (Int64TimeAndTimestampMicrosValuesDecoder) valuesDecoderContext.getValuesDecoder();
            intValuesDecoder.skip(valueCount);
        }
    }
}
diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/LongDecimalFlatBatchReader.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/LongDecimalFlatBatchReader.java
new file mode 100644
index 0000000000000..478dee49e0e49
--- /dev/null
+++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/LongDecimalFlatBatchReader.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.parquet.batchreader;
+
+import com.facebook.presto.common.block.Block;
+import com.facebook.presto.common.block.Int128ArrayBlock;
+import com.facebook.presto.common.block.RunLengthEncodedBlock;
+import com.facebook.presto.parquet.ColumnReader;
+import com.facebook.presto.parquet.DataPage;
+import com.facebook.presto.parquet.DictionaryPage;
+import com.facebook.presto.parquet.Field;
+import com.facebook.presto.parquet.RichColumnDescriptor;
+import com.facebook.presto.parquet.batchreader.decoders.Decoders.FlatDecoders;
+import com.facebook.presto.parquet.batchreader.decoders.FlatDefinitionLevelDecoder;
+import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.LongDecimalValuesDecoder;
+import com.facebook.presto.parquet.batchreader.dictionary.Dictionaries;
+import com.facebook.presto.parquet.dictionary.Dictionary;
+import com.facebook.presto.parquet.reader.ColumnChunk;
+import com.facebook.presto.parquet.reader.PageReader;
+import com.facebook.presto.spi.PrestoException;
+import org.apache.parquet.internal.filter2.columnindex.RowRanges;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.openjdk.jol.info.ClassLayout;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static com.facebook.presto.parquet.ParquetErrorCode.PARQUET_IO_READ_ERROR;
+import static com.facebook.presto.parquet.batchreader.decoders.Decoders.readFlatPage;
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+
+public class LongDecimalFlatBatchReader
+ implements ColumnReader
+{
+ private static final int INSTANCE_SIZE = ClassLayout.parseClass(LongDecimalFlatBatchReader.class).instanceSize();
+
+ private final RichColumnDescriptor columnDescriptor;
+
+ protected Field field;
+ protected int nextBatchSize;
+ protected FlatDefinitionLevelDecoder definitionLevelDecoder;
+ protected LongDecimalValuesDecoder valuesDecoder;
+ protected int remainingCountInPage;
+
+ private Dictionary dictionary;
+ private int readOffset;
+ private PageReader pageReader;
+
+ public LongDecimalFlatBatchReader(RichColumnDescriptor columnDescriptor)
+ {
+ this.columnDescriptor = requireNonNull(columnDescriptor, "columnDescriptor is null");
+ }
+
+ @Override
+ public boolean isInitialized()
+ {
+ return pageReader != null && field != null;
+ }
+
+ @Override
+ public void init(PageReader pageReader, Field field, RowRanges rowRanges)
+ {
+ checkArgument(!isInitialized(), "Parquet batch reader already initialized");
+ this.pageReader = requireNonNull(pageReader, "pageReader is null");
+ checkArgument(pageReader.getValueCountInColumnChunk() > 0, "page is empty");
+ this.field = requireNonNull(field, "field is null");
+
+ DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
+ if (dictionaryPage != null) {
+ dictionary = Dictionaries.createDictionary(columnDescriptor, dictionaryPage);
+ }
+ }
+
+ @Override
+ public void prepareNextRead(int batchSize)
+ {
+ readOffset = readOffset + nextBatchSize;
+ nextBatchSize = batchSize;
+ }
+
+ @Override
+ public ColumnChunk readNext()
+ {
+ ColumnChunk columnChunk = null;
+ try {
+ seek();
+ if (field.isRequired()) {
+ columnChunk = readWithoutNull();
+ }
+ else {
+ columnChunk = readWithNull();
+ }
+ }
+ catch (IOException exception) {
+ throw new PrestoException(PARQUET_IO_READ_ERROR, "Error reading Parquet column " + columnDescriptor, exception);
+ }
+
+ readOffset = 0;
+ nextBatchSize = 0;
+ return columnChunk;
+ }
+
+ @Override
+ public long getRetainedSizeInBytes()
+ {
+ return INSTANCE_SIZE +
+ (definitionLevelDecoder == null ? 0 : definitionLevelDecoder.getRetainedSizeInBytes()) +
+ (valuesDecoder == null ? 0 : valuesDecoder.getRetainedSizeInBytes()) +
+ (dictionary == null ? 0 : dictionary.getRetainedSizeInBytes()) +
+ (pageReader == null ? 0 : pageReader.getRetainedSizeInBytes());
+ }
+
+ protected boolean readNextPage()
+ {
+ definitionLevelDecoder = null;
+ valuesDecoder = null;
+ remainingCountInPage = 0;
+
+ DataPage page = pageReader.readPage();
+ if (page == null) {
+ return false;
+ }
+
+ FlatDecoders flatDecoders = readFlatPage(page, columnDescriptor, dictionary);
+ definitionLevelDecoder = flatDecoders.getDefinitionLevelDecoder();
+ valuesDecoder = (LongDecimalValuesDecoder) flatDecoders.getValuesDecoder();
+
+ remainingCountInPage = page.getValueCount();
+ return true;
+ }
+
+ private ColumnChunk readWithNull()
+ throws IOException
+ {
+ long[] values = new long[nextBatchSize * 2];
+ boolean[] isNull = new boolean[nextBatchSize];
+
+ int totalNonNullCount = 0;
+ int remainingInBatch = nextBatchSize;
+ int startOffset = 0;
+ while (remainingInBatch > 0) {
+ if (remainingCountInPage == 0) {
+ if (!readNextPage()) {
+ break;
+ }
+ }
+
+ int chunkSize = Math.min(remainingCountInPage, remainingInBatch);
+ int nonNullCount = definitionLevelDecoder.readNext(isNull, startOffset, chunkSize);
+ totalNonNullCount += nonNullCount;
+
+ if (nonNullCount > 0) {
+ valuesDecoder.readNext(values, startOffset, nonNullCount);
+
+ int valueDestinationIndex = startOffset + chunkSize - 1;
+ int valueSourceIndex = startOffset + nonNullCount - 1;
+
+ while (valueDestinationIndex >= startOffset) {
+ if (!isNull[valueDestinationIndex]) {
+ values[valueDestinationIndex * 2 + 1] = values[valueSourceIndex * 2 + 1];
+ values[valueDestinationIndex * 2] = values[valueSourceIndex * 2];
+ valueSourceIndex--;
+ }
+ valueDestinationIndex--;
+ }
+ }
+
+ startOffset += chunkSize;
+ remainingInBatch -= chunkSize;
+ remainingCountInPage -= chunkSize;
+ }
+
+ if (remainingInBatch != 0) {
+ throw new ParquetDecodingException("Still remaining to be read in current batch.");
+ }
+
+ if (totalNonNullCount == 0) {
+ Block block = RunLengthEncodedBlock.create(field.getType(), null, nextBatchSize);
+ return new ColumnChunk(block, new int[0], new int[0]);
+ }
+
+ boolean hasNoNull = totalNonNullCount == nextBatchSize;
+ Block block = new Int128ArrayBlock(nextBatchSize, hasNoNull ? Optional.empty() : Optional.of(isNull), values);
+ return new ColumnChunk(block, new int[0], new int[0]);
+ }
+
+ private ColumnChunk readWithoutNull()
+ throws IOException
+ {
+ long[] values = new long[nextBatchSize * 2];
+ int remainingInBatch = nextBatchSize;
+ int startOffset = 0;
+ while (remainingInBatch > 0) {
+ if (remainingCountInPage == 0) {
+ if (!readNextPage()) {
+ break;
+ }
+ }
+
+ int chunkSize = Math.min(remainingCountInPage, remainingInBatch);
+
+ valuesDecoder.readNext(values, startOffset, chunkSize);
+ startOffset += chunkSize;
+ remainingInBatch -= chunkSize;
+ remainingCountInPage -= chunkSize;
+ }
+
+ if (remainingInBatch != 0) {
+ throw new ParquetDecodingException(format("Corrupted Parquet file: extra %d values to be consumed when scanning current batch", remainingInBatch));
+ }
+
+ Block block = new Int128ArrayBlock(nextBatchSize, Optional.empty(), values);
+ return new ColumnChunk(block, new int[0], new int[0]);
+ }
+
+ private void seek()
+ throws IOException
+ {
+ if (readOffset == 0) {
+ return;
+ }
+
+ int remainingInBatch = readOffset;
+ int startOffset = 0;
+ while (remainingInBatch > 0) {
+ if (remainingCountInPage == 0) {
+ if (!readNextPage()) {
+ break;
+ }
+ }
+
+ int chunkSize = Math.min(remainingCountInPage, remainingInBatch);
+ int skipSize = chunkSize;
+ if (!columnDescriptor.isRequired()) {
+ boolean[] isNull = new boolean[readOffset];
+ int nonNullCount = definitionLevelDecoder.readNext(isNull, startOffset, chunkSize);
+ skipSize = nonNullCount;
+ startOffset += chunkSize;
+ }
+ valuesDecoder.skip(skipSize);
+ remainingInBatch -= chunkSize;
+ remainingCountInPage -= chunkSize;
+ }
+ }
+}
diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/ShortDecimalFlatBatchReader.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/ShortDecimalFlatBatchReader.java
new file mode 100644
index 0000000000000..db3dff192a240
--- /dev/null
+++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/ShortDecimalFlatBatchReader.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.parquet.batchreader;
+
+import com.facebook.presto.common.block.Block;
+import com.facebook.presto.common.block.LongArrayBlock;
+import com.facebook.presto.common.block.RunLengthEncodedBlock;
+import com.facebook.presto.parquet.ColumnReader;
+import com.facebook.presto.parquet.DataPage;
+import com.facebook.presto.parquet.DictionaryPage;
+import com.facebook.presto.parquet.Field;
+import com.facebook.presto.parquet.RichColumnDescriptor;
+import com.facebook.presto.parquet.batchreader.decoders.Decoders.FlatDecoders;
+import com.facebook.presto.parquet.batchreader.decoders.FlatDefinitionLevelDecoder;
+import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.ShortDecimalValuesDecoder;
+import com.facebook.presto.parquet.batchreader.dictionary.Dictionaries;
+import com.facebook.presto.parquet.dictionary.Dictionary;
+import com.facebook.presto.parquet.reader.ColumnChunk;
+import com.facebook.presto.parquet.reader.PageReader;
+import com.facebook.presto.spi.PrestoException;
+import org.apache.parquet.internal.filter2.columnindex.RowRanges;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.openjdk.jol.info.ClassLayout;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static com.facebook.presto.parquet.ParquetErrorCode.PARQUET_IO_READ_ERROR;
+import static com.facebook.presto.parquet.batchreader.decoders.Decoders.readFlatPage;
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+
+public class ShortDecimalFlatBatchReader
+ implements ColumnReader
+{
+ private static final int INSTANCE_SIZE = ClassLayout.parseClass(ShortDecimalFlatBatchReader.class).instanceSize();
+
+ private final RichColumnDescriptor columnDescriptor;
+
+ protected Field field;
+ protected int nextBatchSize;
+ protected FlatDefinitionLevelDecoder definitionLevelDecoder;
+ protected ShortDecimalValuesDecoder valuesDecoder;
+ protected int remainingCountInPage;
+
+ private Dictionary dictionary;
+ private int readOffset;
+ private PageReader pageReader;
+
+ public ShortDecimalFlatBatchReader(RichColumnDescriptor columnDescriptor)
+ {
+ this.columnDescriptor = requireNonNull(columnDescriptor, "columnDescriptor is null");
+ }
+
+ @Override
+ public boolean isInitialized()
+ {
+ return pageReader != null && field != null;
+ }
+
+ @Override
+ public void init(PageReader pageReader, Field field, RowRanges rowRanges)
+ {
+ checkArgument(!isInitialized(), "Parquet batch reader already initialized");
+ this.pageReader = requireNonNull(pageReader, "pageReader is null");
+ checkArgument(pageReader.getValueCountInColumnChunk() > 0, "page is empty");
+ this.field = requireNonNull(field, "field is null");
+
+ DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
+ if (dictionaryPage != null) {
+ dictionary = Dictionaries.createDictionary(columnDescriptor, dictionaryPage);
+ }
+ }
+
+ @Override
+ public void prepareNextRead(int batchSize)
+ {
+ readOffset = readOffset + nextBatchSize;
+ nextBatchSize = batchSize;
+ }
+
+ @Override
+ public ColumnChunk readNext()
+ {
+ ColumnChunk columnChunk = null;
+ try {
+ seek();
+ if (field.isRequired()) {
+ columnChunk = readWithoutNull();
+ }
+ else {
+ columnChunk = readWithNull();
+ }
+ }
+ catch (IOException exception) {
+ throw new PrestoException(PARQUET_IO_READ_ERROR, "Error reading Parquet column " + columnDescriptor, exception);
+ }
+
+ readOffset = 0;
+ nextBatchSize = 0;
+ return columnChunk;
+ }
+
+ @Override
+ public long getRetainedSizeInBytes()
+ {
+ return INSTANCE_SIZE +
+ (definitionLevelDecoder == null ? 0 : definitionLevelDecoder.getRetainedSizeInBytes()) +
+ (valuesDecoder == null ? 0 : valuesDecoder.getRetainedSizeInBytes()) +
+ (dictionary == null ? 0 : dictionary.getRetainedSizeInBytes()) +
+ (pageReader == null ? 0 : pageReader.getRetainedSizeInBytes());
+ }
+
+ protected boolean readNextPage()
+ {
+ definitionLevelDecoder = null;
+ valuesDecoder = null;
+ remainingCountInPage = 0;
+
+ DataPage page = pageReader.readPage();
+ if (page == null) {
+ return false;
+ }
+
+ FlatDecoders flatDecoders = readFlatPage(page, columnDescriptor, dictionary);
+ definitionLevelDecoder = flatDecoders.getDefinitionLevelDecoder();
+ valuesDecoder = (ShortDecimalValuesDecoder) flatDecoders.getValuesDecoder();
+
+ remainingCountInPage = page.getValueCount();
+ return true;
+ }
+
+ private ColumnChunk readWithNull()
+ throws IOException
+ {
+ long[] values = new long[nextBatchSize];
+ boolean[] isNull = new boolean[nextBatchSize];
+
+ int totalNonNullCount = 0;
+ int remainingInBatch = nextBatchSize;
+ int startOffset = 0;
+ while (remainingInBatch > 0) {
+ if (remainingCountInPage == 0) {
+ if (!readNextPage()) {
+ break;
+ }
+ }
+
+ int chunkSize = Math.min(remainingCountInPage, remainingInBatch);
+ int nonNullCount = definitionLevelDecoder.readNext(isNull, startOffset, chunkSize);
+ totalNonNullCount += nonNullCount;
+
+ if (nonNullCount > 0) {
+ valuesDecoder.readNext(values, startOffset, nonNullCount);
+
+ int valueDestinationIndex = startOffset + chunkSize - 1;
+ int valueSourceIndex = startOffset + nonNullCount - 1;
+
+ while (valueDestinationIndex >= startOffset) {
+ if (!isNull[valueDestinationIndex]) {
+ values[valueDestinationIndex] = values[valueSourceIndex];
+ valueSourceIndex--;
+ }
+ valueDestinationIndex--;
+ }
+ }
+
+ startOffset += chunkSize;
+ remainingInBatch -= chunkSize;
+ remainingCountInPage -= chunkSize;
+ }
+
+ if (remainingInBatch != 0) {
+ throw new ParquetDecodingException("Still remaining to be read in current batch.");
+ }
+
+ if (totalNonNullCount == 0) {
+ Block block = RunLengthEncodedBlock.create(field.getType(), null, nextBatchSize);
+ return new ColumnChunk(block, new int[0], new int[0]);
+ }
+
+ boolean hasNoNull = totalNonNullCount == nextBatchSize;
+ Block block = new LongArrayBlock(nextBatchSize, hasNoNull ? Optional.empty() : Optional.of(isNull), values);
+ return new ColumnChunk(block, new int[0], new int[0]);
+ }
+
+ private ColumnChunk readWithoutNull()
+ throws IOException
+ {
+ long[] values = new long[nextBatchSize];
+ int remainingInBatch = nextBatchSize;
+ int startOffset = 0;
+ while (remainingInBatch > 0) {
+ if (remainingCountInPage == 0) {
+ if (!readNextPage()) {
+ break;
+ }
+ }
+
+ int chunkSize = Math.min(remainingCountInPage, remainingInBatch);
+
+ valuesDecoder.readNext(values, startOffset, chunkSize);
+ startOffset += chunkSize;
+ remainingInBatch -= chunkSize;
+ remainingCountInPage -= chunkSize;
+ }
+
+ if (remainingInBatch != 0) {
+ throw new ParquetDecodingException(format("Corrupted Parquet file: extra %d values to be consumed when scanning current batch", remainingInBatch));
+ }
+
+ Block block = new LongArrayBlock(nextBatchSize, Optional.empty(), values);
+ return new ColumnChunk(block, new int[0], new int[0]);
+ }
+
+ private void seek()
+ throws IOException
+ {
+ if (readOffset == 0) {
+ return;
+ }
+
+ int remainingInBatch = readOffset;
+ int startOffset = 0;
+ while (remainingInBatch > 0) {
+ if (remainingCountInPage == 0) {
+ if (!readNextPage()) {
+ break;
+ }
+ }
+
+ int chunkSize = Math.min(remainingCountInPage, remainingInBatch);
+ int skipSize = chunkSize;
+ if (!columnDescriptor.isRequired()) {
+ boolean[] isNull = new boolean[readOffset];
+ int nonNullCount = definitionLevelDecoder.readNext(isNull, startOffset, chunkSize);
+ skipSize = nonNullCount;
+ startOffset += chunkSize;
+ }
+ valuesDecoder.skip(skipSize);
+ remainingInBatch -= chunkSize;
+ remainingCountInPage -= chunkSize;
+ }
+ }
+}
diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/TimestampFlatBatchReader.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/TimestampFlatBatchReader.java
new file mode 100644
index 0000000000000..7ac7bef95043d
--- /dev/null
+++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/TimestampFlatBatchReader.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.parquet.batchreader;
+
+import com.facebook.presto.common.block.Block;
+import com.facebook.presto.common.block.LongArrayBlock;
+import com.facebook.presto.common.block.RunLengthEncodedBlock;
+import com.facebook.presto.parquet.ColumnReader;
+import com.facebook.presto.parquet.DataPage;
+import com.facebook.presto.parquet.DictionaryPage;
+import com.facebook.presto.parquet.Field;
+import com.facebook.presto.parquet.RichColumnDescriptor;
+import com.facebook.presto.parquet.batchreader.decoders.Decoders.FlatDecoders;
+import com.facebook.presto.parquet.batchreader.decoders.FlatDefinitionLevelDecoder;
+import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.TimestampValuesDecoder;
+import com.facebook.presto.parquet.batchreader.dictionary.Dictionaries;
+import com.facebook.presto.parquet.dictionary.Dictionary;
+import com.facebook.presto.parquet.reader.ColumnChunk;
+import com.facebook.presto.parquet.reader.PageReader;
+import com.facebook.presto.spi.PrestoException;
+import org.apache.parquet.internal.filter2.columnindex.RowRanges;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.openjdk.jol.info.ClassLayout;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static com.facebook.presto.parquet.ParquetErrorCode.PARQUET_IO_READ_ERROR;
+import static com.facebook.presto.parquet.batchreader.decoders.Decoders.readFlatPage;
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Batch {@link ColumnReader} for flat (non-nested, max repetition level 0)
+ * timestamp columns.  Values are decoded into {@code long}s and returned as
+ * {@link LongArrayBlock}s.
+ * NOTE(review): the timestamp unit/precision is determined inside
+ * {@link TimestampValuesDecoder}, not visible here — confirm against the decoder.
+ */
+public class TimestampFlatBatchReader
+        implements ColumnReader
+{
+    private static final int INSTANCE_SIZE = ClassLayout.parseClass(TimestampFlatBatchReader.class).instanceSize();
+
+    private final RichColumnDescriptor columnDescriptor;
+
+    protected Field field;
+    protected int nextBatchSize;
+    protected FlatDefinitionLevelDecoder definitionLevelDecoder;
+    protected TimestampValuesDecoder valuesDecoder;
+    protected int remainingCountInPage;
+
+    private Dictionary dictionary;
+    private int readOffset;
+    private PageReader pageReader;
+
+    public TimestampFlatBatchReader(RichColumnDescriptor columnDescriptor)
+    {
+        this.columnDescriptor = requireNonNull(columnDescriptor, "columnDescriptor is null");
+    }
+
+    @Override
+    public boolean isInitialized()
+    {
+        return pageReader != null && field != null;
+    }
+
+    @Override
+    public void init(PageReader pageReader, Field field, RowRanges rowRanges)
+    {
+        checkArgument(!isInitialized(), "Parquet batch reader already initialized");
+        this.pageReader = requireNonNull(pageReader, "pageReader is null");
+        checkArgument(pageReader.getValueCountInColumnChunk() > 0, "page is empty");
+        this.field = requireNonNull(field, "field is null");
+
+        // A dictionary page, when present, precedes the data pages in the chunk.
+        DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
+        if (dictionaryPage != null) {
+            dictionary = Dictionaries.createDictionary(columnDescriptor, dictionaryPage);
+        }
+    }
+
+    @Override
+    public void prepareNextRead(int batchSize)
+    {
+        // Positions prepared but never read accumulate in readOffset and are
+        // skipped lazily by seek() on the next readNext().
+        readOffset = readOffset + nextBatchSize;
+        nextBatchSize = batchSize;
+    }
+
+    @Override
+    public ColumnChunk readNext()
+    {
+        ColumnChunk columnChunk = null;
+        try {
+            seek();
+            if (field.isRequired()) {
+                columnChunk = readWithoutNull();
+            }
+            else {
+                columnChunk = readWithNull();
+            }
+        }
+        catch (IOException exception) {
+            throw new PrestoException(PARQUET_IO_READ_ERROR, "Error reading Parquet column " + columnDescriptor, exception);
+        }
+
+        readOffset = 0;
+        nextBatchSize = 0;
+        return columnChunk;
+    }
+
+    @Override
+    public long getRetainedSizeInBytes()
+    {
+        return INSTANCE_SIZE +
+                (definitionLevelDecoder == null ? 0 : definitionLevelDecoder.getRetainedSizeInBytes()) +
+                (valuesDecoder == null ? 0 : valuesDecoder.getRetainedSizeInBytes()) +
+                (dictionary == null ? 0 : dictionary.getRetainedSizeInBytes()) +
+                (pageReader == null ? 0 : pageReader.getRetainedSizeInBytes());
+    }
+
+    /**
+     * Loads the next data page and rebuilds the decoders.
+     *
+     * @return false when the column chunk has no more pages
+     */
+    protected boolean readNextPage()
+    {
+        // Clear state first so a failed/empty read leaves the reader drained
+        // rather than pointing at stale decoders.
+        definitionLevelDecoder = null;
+        valuesDecoder = null;
+        remainingCountInPage = 0;
+
+        DataPage page = pageReader.readPage();
+        if (page == null) {
+            return false;
+        }
+
+        FlatDecoders flatDecoders = readFlatPage(page, columnDescriptor, dictionary);
+        definitionLevelDecoder = flatDecoders.getDefinitionLevelDecoder();
+        valuesDecoder = (TimestampValuesDecoder) flatDecoders.getValuesDecoder();
+
+        remainingCountInPage = page.getValueCount();
+        return true;
+    }
+
+    /**
+     * Reads the next batch for an optional column, producing per-position null flags.
+     */
+    private ColumnChunk readWithNull()
+            throws IOException
+    {
+        long[] values = new long[nextBatchSize];
+        boolean[] isNull = new boolean[nextBatchSize];
+
+        int totalNonNullCount = 0;
+        int remainingInBatch = nextBatchSize;
+        int startOffset = 0;
+        while (remainingInBatch > 0) {
+            if (remainingCountInPage == 0) {
+                if (!readNextPage()) {
+                    break;
+                }
+            }
+
+            int chunkSize = Math.min(remainingCountInPage, remainingInBatch);
+            int nonNullCount = definitionLevelDecoder.readNext(isNull, startOffset, chunkSize);
+            totalNonNullCount += nonNullCount;
+
+            if (nonNullCount > 0) {
+                // Parquet stores only non-null values, packed.  Read them to the
+                // front of the chunk, then scatter backwards to final positions so
+                // sources are never overwritten before they are copied.
+                valuesDecoder.readNext(values, startOffset, nonNullCount);
+
+                int valueDestinationIndex = startOffset + chunkSize - 1;
+                int valueSourceIndex = startOffset + nonNullCount - 1;
+
+                while (valueDestinationIndex >= startOffset) {
+                    if (!isNull[valueDestinationIndex]) {
+                        values[valueDestinationIndex] = values[valueSourceIndex];
+                        valueSourceIndex--;
+                    }
+                    valueDestinationIndex--;
+                }
+            }
+
+            startOffset += chunkSize;
+            remainingInBatch -= chunkSize;
+            remainingCountInPage -= chunkSize;
+        }
+
+        if (remainingInBatch != 0) {
+            // Same message as readWithoutNull() for a consistent error surface.
+            throw new ParquetDecodingException(format("Corrupted Parquet file: extra %d values to be consumed when scanning current batch", remainingInBatch));
+        }
+
+        if (totalNonNullCount == 0) {
+            // Entirely-null batch: a single run-length-encoded null block.
+            Block block = RunLengthEncodedBlock.create(field.getType(), null, nextBatchSize);
+            return new ColumnChunk(block, new int[0], new int[0]);
+        }
+
+        boolean hasNoNull = totalNonNullCount == nextBatchSize;
+        Block block = new LongArrayBlock(nextBatchSize, hasNoNull ? Optional.empty() : Optional.of(isNull), values);
+        return new ColumnChunk(block, new int[0], new int[0]);
+    }
+
+    /**
+     * Reads the next batch for a required (no-null) column.
+     */
+    private ColumnChunk readWithoutNull()
+            throws IOException
+    {
+        long[] values = new long[nextBatchSize];
+        int remainingInBatch = nextBatchSize;
+        int startOffset = 0;
+        while (remainingInBatch > 0) {
+            if (remainingCountInPage == 0) {
+                if (!readNextPage()) {
+                    break;
+                }
+            }
+
+            int chunkSize = Math.min(remainingCountInPage, remainingInBatch);
+
+            valuesDecoder.readNext(values, startOffset, chunkSize);
+            startOffset += chunkSize;
+            remainingInBatch -= chunkSize;
+            remainingCountInPage -= chunkSize;
+        }
+
+        if (remainingInBatch != 0) {
+            throw new ParquetDecodingException(format("Corrupted Parquet file: extra %d values to be consumed when scanning current batch", remainingInBatch));
+        }
+
+        Block block = new LongArrayBlock(nextBatchSize, Optional.empty(), values);
+        return new ColumnChunk(block, new int[0], new int[0]);
+    }
+
+    /**
+     * Skips positions that were prepared but never read.
+     */
+    private void seek()
+            throws IOException
+    {
+        if (readOffset == 0) {
+            return;
+        }
+
+        int remainingInBatch = readOffset;
+        int startOffset = 0;
+        // Allocate the definition-level scratch buffer once, not per iteration.
+        boolean[] isNull = columnDescriptor.isRequired() ? null : new boolean[readOffset];
+        while (remainingInBatch > 0) {
+            if (remainingCountInPage == 0) {
+                if (!readNextPage()) {
+                    break;
+                }
+            }
+
+            int chunkSize = Math.min(remainingCountInPage, remainingInBatch);
+            int skipSize = chunkSize;
+            if (!columnDescriptor.isRequired()) {
+                // Only non-null positions have stored values to skip.
+                int nonNullCount = definitionLevelDecoder.readNext(isNull, startOffset, chunkSize);
+                skipSize = nonNullCount;
+                startOffset += chunkSize;
+            }
+            valuesDecoder.skip(skipSize);
+            remainingInBatch -= chunkSize;
+            remainingCountInPage -= chunkSize;
+        }
+    }
+}
diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/TimestampNestedBatchReader.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/TimestampNestedBatchReader.java
new file mode 100644
index 0000000000000..e5c867a4d9893
--- /dev/null
+++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/TimestampNestedBatchReader.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.parquet.batchreader;
+
+import com.facebook.presto.common.block.Block;
+import com.facebook.presto.common.block.LongArrayBlock;
+import com.facebook.presto.common.block.RunLengthEncodedBlock;
+import com.facebook.presto.parquet.RichColumnDescriptor;
+import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.TimestampValuesDecoder;
+import com.facebook.presto.parquet.reader.ColumnChunk;
+
+import java.io.IOException;
+import java.util.Optional;
+
+public class TimestampNestedBatchReader
+ extends AbstractNestedBatchReader
+{
+ public TimestampNestedBatchReader(RichColumnDescriptor columnDescriptor)
+ {
+ super(columnDescriptor);
+ }
+
+ @Override
+ protected ColumnChunk readNestedWithNull()
+ throws IOException
+ {
+ int maxDefinitionLevel = columnDescriptor.getMaxDefinitionLevel();
+ RepetitionLevelDecodingContext repetitionLevelDecodingContext = readRepetitionLevels(nextBatchSize);
+ DefinitionLevelDecodingContext definitionLevelDecodingContext = readDefinitionLevels(repetitionLevelDecodingContext.getDLValuesDecoderContexts(), repetitionLevelDecodingContext.getRepetitionLevels().length);
+
+ int[] definitionLevels = definitionLevelDecodingContext.getDefinitionLevels();
+ int newBatchSize = 0;
+ int batchNonNullCount = 0;
+ for (ValuesDecoderContext valuesDecoderContext : definitionLevelDecodingContext.getValuesDecoderContexts()) {
+ int nonNullCount = 0;
+ int valueCount = 0;
+ for (int i = valuesDecoderContext.getStart(); i < valuesDecoderContext.getEnd(); i++) {
+ nonNullCount += (definitionLevels[i] == maxDefinitionLevel ? 1 : 0);
+ valueCount += (definitionLevels[i] >= maxDefinitionLevel - 1 ? 1 : 0);
+ }
+ batchNonNullCount += nonNullCount;
+ newBatchSize += valueCount;
+ valuesDecoderContext.setNonNullCount(nonNullCount);
+ valuesDecoderContext.setValueCount(valueCount);
+ }
+
+ if (batchNonNullCount == 0) {
+ Block block = RunLengthEncodedBlock.create(field.getType(), null, newBatchSize);
+ return new ColumnChunk(block, definitionLevels, repetitionLevelDecodingContext.getRepetitionLevels());
+ }
+
+ long[] values = new long[newBatchSize];
+ boolean[] isNull = new boolean[newBatchSize];
+ int offset = 0;
+ for (ValuesDecoderContext valuesDecoderContext : definitionLevelDecodingContext.getValuesDecoderContexts()) {
+ ((TimestampValuesDecoder) valuesDecoderContext.getValuesDecoder()).readNext(values, offset, valuesDecoderContext.getNonNullCount());
+
+ int valueDestinationIndex = offset + valuesDecoderContext.getValueCount() - 1;
+ int valueSourceIndex = offset + valuesDecoderContext.getNonNullCount() - 1;
+ int definitionLevelIndex = valuesDecoderContext.getEnd() - 1;
+
+ while (valueDestinationIndex >= offset) {
+ if (definitionLevels[definitionLevelIndex] == maxDefinitionLevel) {
+ values[valueDestinationIndex--] = values[valueSourceIndex--];
+ }
+ else if (definitionLevels[definitionLevelIndex] == maxDefinitionLevel - 1) {
+ values[valueDestinationIndex] = 0;
+ isNull[valueDestinationIndex] = true;
+ valueDestinationIndex--;
+ }
+ definitionLevelIndex--;
+ }
+ offset += valuesDecoderContext.getValueCount();
+ }
+
+ boolean hasNoNull = batchNonNullCount == newBatchSize;
+ Block block = new LongArrayBlock(newBatchSize, hasNoNull ? Optional.empty() : Optional.of(isNull), values);
+ return new ColumnChunk(block, definitionLevels, repetitionLevelDecodingContext.getRepetitionLevels());
+ }
+
+ @Override
+ protected ColumnChunk readNestedNoNull()
+ throws IOException
+ {
+ int maxDefinitionLevel = columnDescriptor.getMaxDefinitionLevel();
+ RepetitionLevelDecodingContext repetitionLevelDecodingContext = readRepetitionLevels(nextBatchSize);
+ DefinitionLevelDecodingContext definitionLevelDecodingContext = readDefinitionLevels(repetitionLevelDecodingContext.getDLValuesDecoderContexts(), repetitionLevelDecodingContext.getRepetitionLevels().length);
+
+ int[] definitionLevels = definitionLevelDecodingContext.getDefinitionLevels();
+ int newBatchSize = 0;
+ for (ValuesDecoderContext valuesDecoderContext : definitionLevelDecodingContext.getValuesDecoderContexts()) {
+ int valueCount = 0;
+ for (int i = valuesDecoderContext.getStart(); i < valuesDecoderContext.getEnd(); i++) {
+ valueCount += (definitionLevels[i] == maxDefinitionLevel ? 1 : 0);
+ }
+ newBatchSize += valueCount;
+ valuesDecoderContext.setNonNullCount(valueCount);
+ valuesDecoderContext.setValueCount(valueCount);
+ }
+
+ long[] values = new long[newBatchSize];
+ int offset = 0;
+ for (ValuesDecoderContext valuesDecoderContext : definitionLevelDecodingContext.getValuesDecoderContexts()) {
+ ((TimestampValuesDecoder) valuesDecoderContext.getValuesDecoder()).readNext(values, offset, valuesDecoderContext.getNonNullCount());
+ offset += valuesDecoderContext.getValueCount();
+ }
+
+ Block block = new LongArrayBlock(newBatchSize, Optional.empty(), values);
+ return new ColumnChunk(block, definitionLevels, repetitionLevelDecodingContext.getRepetitionLevels());
+ }
+
+ @Override
+ protected void seek()
+ throws IOException
+ {
+ if (readOffset == 0) {
+ return;
+ }
+ int maxDefinitionLevel = columnDescriptor.getMaxDefinitionLevel();
+ RepetitionLevelDecodingContext repetitionLevelDecodingContext = readRepetitionLevels(readOffset);
+ DefinitionLevelDecodingContext definitionLevelDecodingContext = readDefinitionLevels(repetitionLevelDecodingContext.getDLValuesDecoderContexts(), repetitionLevelDecodingContext.getRepetitionLevels().length);
+
+ int[] definitionLevels = definitionLevelDecodingContext.getDefinitionLevels();
+ for (ValuesDecoderContext valuesDecoderContext : definitionLevelDecodingContext.getValuesDecoderContexts()) {
+ int valueCount = 0;
+ for (int i = valuesDecoderContext.getStart(); i < valuesDecoderContext.getEnd(); i++) {
+ valueCount += (definitionLevels[i] == maxDefinitionLevel ? 1 : 0);
+ }
+ TimestampValuesDecoder intValuesDecoder = (TimestampValuesDecoder) valuesDecoderContext.getValuesDecoder();
+ intValuesDecoder.skip(valueCount);
+ }
+ }
+}