diff --git a/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java b/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java
index 099c7b9733..ef97abf74c 100644
--- a/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java
@@ -23,6 +23,7 @@
 import org.slf4j.LoggerFactory;
 
 import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.schema.Type;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.TimestampNTZType$;
 
@@ -36,6 +37,9 @@ public abstract class AbstractColumnReader implements AutoCloseable {
   /** The Spark data type. */
   protected final DataType type;
 
+  /** The Parquet field type. */
+  protected final Type fieldType;
+
   /** Parquet column descriptor. */
   protected final ColumnDescriptor descriptor;
 
@@ -61,13 +65,23 @@ public abstract class AbstractColumnReader implements AutoCloseable {
   public AbstractColumnReader(
       DataType type,
+      Type fieldType,
       ColumnDescriptor descriptor,
       boolean useDecimal128,
       boolean useLegacyDateTimestamp) {
     this.type = type;
+    this.fieldType = fieldType;
     this.descriptor = descriptor;
     this.useDecimal128 = useDecimal128;
     this.useLegacyDateTimestamp = useLegacyDateTimestamp;
+  }
+
+  public AbstractColumnReader(
+      DataType type,
+      ColumnDescriptor descriptor,
+      boolean useDecimal128,
+      boolean useLegacyDateTimestamp) {
+    this(type, null, descriptor, useDecimal128, useLegacyDateTimestamp);
     TypeUtil.checkParquetType(descriptor, type);
   }
 
diff --git a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
index 625bde85cc..9d79b707a5 100644
--- a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
@@ -72,6 +72,7 @@
 import org.apache.comet.shims.ShimBatchReader;
 import org.apache.comet.shims.ShimFileFormat;
 import org.apache.comet.vector.CometVector;
+import org.apache.comet.vector.NativeUtil;
 
 /**
  * A vectorized Parquet reader that reads a Parquet file in a batched fashion.
@@ -94,6 +95,7 @@ public class NativeBatchReader extends RecordReader<Void, ColumnarBatch> implements Closeable {
   private static final Logger LOG = LoggerFactory.getLogger(NativeBatchReader.class);
   protected static final BufferAllocator ALLOCATOR = new RootAllocator();
+  private NativeUtil nativeUtil = new NativeUtil();
 
   private Configuration conf;
   private int capacity;
 
@@ -266,7 +268,8 @@ public void init() throws URISyntaxException, IOException {
 
     //// Create Column readers
     List<ColumnDescriptor> columns = requestedSchema.getColumns();
-    int numColumns = columns.size();
+    List<Type> fields = requestedSchema.getFields();
+    int numColumns = fields.size();
     if (partitionSchema != null) numColumns += partitionSchema.size();
     columnReaders = new AbstractColumnReader[numColumns];
 
@@ -454,6 +457,7 @@ public void close() throws IOException {
       importer.close();
       importer = null;
     }
+    nativeUtil.close();
     Native.closeRecordBatchReader(this.handle);
   }
 
@@ -469,19 +473,23 @@ private int loadNextBatch() throws Throwable {
     importer = new CometSchemaImporter(ALLOCATOR);
 
     List<ColumnDescriptor> columns = requestedSchema.getColumns();
-    for (int i = 0; i < columns.size(); i++) {
+    List<Type> fields = requestedSchema.getFields();
+    for (int i = 0; i < fields.size(); i++) {
       // TODO: (ARROW NATIVE) check this. Currently not handling missing columns correctly?
       if (missingColumns[i]) continue;
       if (columnReaders[i] != null) columnReaders[i].close();
       // TODO: (ARROW NATIVE) handle tz, datetime & int96 rebase
       DataType dataType = sparkSchema.fields()[i].dataType();
+      Type field = fields.get(i);
       NativeColumnReader reader =
           new NativeColumnReader(
               this.handle,
               i,
               dataType,
-              columns.get(i),
+              field,
+              null,
               importer,
+              nativeUtil,
               capacity,
               useDecimal128,
               useLegacyDateTimestamp);
diff --git a/common/src/main/java/org/apache/comet/parquet/NativeColumnReader.java b/common/src/main/java/org/apache/comet/parquet/NativeColumnReader.java
index 448ba0fec6..c358999a54 100644
--- a/common/src/main/java/org/apache/comet/parquet/NativeColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/NativeColumnReader.java
@@ -27,12 +27,8 @@
 import org.apache.arrow.c.CometSchemaImporter;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.FieldVector;
-import org.apache.arrow.vector.dictionary.Dictionary;
-import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.page.*;
-import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.Type;
 import org.apache.spark.sql.types.DataType;
 
 import org.apache.comet.vector.*;
 
@@ -65,6 +61,7 @@ public class NativeColumnReader extends AbstractColumnReader {
   boolean hadNull;
 
   private final CometSchemaImporter importer;
+  private final NativeUtil nativeUtil;
 
   private ArrowArray array = null;
   private ArrowSchema schema = null;
 
@@ -76,14 +73,17 @@ public NativeColumnReader(
       long nativeBatchHandle,
       int columnNum,
       DataType type,
+      Type fieldType,
       ColumnDescriptor descriptor,
       CometSchemaImporter importer,
+      NativeUtil nativeUtil,
       int batchSize,
       boolean useDecimal128,
       boolean useLegacyDateTimestamp) {
-    super(type, descriptor, useDecimal128, useLegacyDateTimestamp);
+    super(type, fieldType, descriptor, useDecimal128, useLegacyDateTimestamp);
     assert batchSize > 0 : "Batch size must be positive, found " + batchSize;
     this.batchSize = batchSize;
+    this.nativeUtil = nativeUtil;
     this.importer = importer;
     this.nativeBatchHandle = nativeBatchHandle;
     this.columnNum = columnNum;
@@ -94,13 +94,13 @@ public NativeColumnReader(
   // Override in order to avoid creation of JVM side column readers
   protected void initNative() {
     LOG.debug(
-        "Native column reader " + String.join(".", this.descriptor.getPath()) + " is initialized");
+        "Native column reader {} is initialized", this.type.catalogString());
     nativeHandle = 0;
   }
 
   @Override
   public void readBatch(int total) {
-    LOG.debug("Reading column batch of size = " + total);
+    LOG.debug("Reading column batch of size = {}", total);
     this.currentNumValues = total;
   }
 
@@ -131,10 +131,7 @@ public CometDecodedVector loadVector() {
       currentVector.close();
     }
 
-    LogicalTypeAnnotation logicalTypeAnnotation =
-        descriptor.getPrimitiveType().getLogicalTypeAnnotation();
-    boolean isUuid =
-        logicalTypeAnnotation instanceof LogicalTypeAnnotation.UUIDLogicalTypeAnnotation;
+    // TODO: ARROW NATIVE : Handle Uuid?
     array = ArrowArray.allocateNew(ALLOCATOR);
     schema = ArrowSchema.allocateNew(ALLOCATOR);
 
@@ -144,47 +141,19 @@
 
     Native.currentColumnBatch(nativeBatchHandle, columnNum, arrayAddr, schemaAddr);
 
-    FieldVector vector = importer.importVector(array, schema);
+    ArrowArray[] arrays = {array};
+    ArrowSchema[] schemas = {schema};
 
-    DictionaryEncoding dictionaryEncoding = vector.getField().getDictionary();
-
-    CometPlainVector cometVector = new CometPlainVector(vector, useDecimal128);
+    CometDecodedVector cometVector =
+        (CometDecodedVector)
+            scala.collection.JavaConverters.seqAsJavaList(nativeUtil.importVector(arrays, schemas))
+                .get(0);
 
     // Update whether the current vector contains any null values. This is used in the following
     // batch(s) to determine whether we can skip loading the native vector.
     hadNull = cometVector.hasNull();
 
-    if (dictionaryEncoding == null) {
-      if (dictionary != null) {
-        // This means the column was using dictionary encoding but now has fall-back to plain
-        // encoding, on the native side. Setting 'dictionary' to null here, so we can use it as
-        // a condition to check if we can re-use vector later.
-        dictionary = null;
-      }
-      // Either the column is not dictionary encoded, or it was using dictionary encoding but
-      // a new data page has switched back to use plain encoding. For both cases we should
-      // return plain vector.
-      currentVector = cometVector;
-      return currentVector;
-    }
-
-    // We should already re-initiate `CometDictionary` here because `Data.importVector` API will
-    // release the previous dictionary vector and create a new one.
-    Dictionary arrowDictionary = importer.getProvider().lookup(dictionaryEncoding.getId());
-    CometPlainVector dictionaryVector =
-        new CometPlainVector(arrowDictionary.getVector(), useDecimal128, isUuid);
-    if (dictionary != null) {
-      dictionary.setDictionaryVector(dictionaryVector);
-    } else {
-      dictionary = new CometDictionary(dictionaryVector);
-    }
-
-    currentVector =
-        new CometDictionaryVector(
-            cometVector, dictionary, importer.getProvider(), useDecimal128, false, isUuid);
-
-    currentVector =
-        new CometDictionaryVector(cometVector, dictionary, importer.getProvider(), useDecimal128);
+    currentVector = cometVector;
     return currentVector;
   }
 }
diff --git a/common/src/main/java/org/apache/comet/vector/CometVector.java b/common/src/main/java/org/apache/comet/vector/CometVector.java
index 3b0ca35bf9..6be8b28669 100644
--- a/common/src/main/java/org/apache/comet/vector/CometVector.java
+++ b/common/src/main/java/org/apache/comet/vector/CometVector.java
@@ -232,7 +232,7 @@ public DictionaryProvider getDictionaryProvider() {
    * @param useDecimal128 Whether to use Decimal128 for decimal column
    * @return `CometVector` implementation
    */
-  protected static CometVector getVector(
+  public static CometVector getVector(
       ValueVector vector, boolean useDecimal128, DictionaryProvider dictionaryProvider) {
     if (vector instanceof StructVector) {
       return new CometStructVector(vector, useDecimal128, dictionaryProvider);
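
Note for reviewers, not part of the patch: the rewritten loadVector() above funnels every column through the Arrow C Data Interface and NativeUtil instead of the JVM-side dictionary handling. Below is a minimal sketch of that hand-off. Native.currentColumnBatch, NativeUtil.importVector, and the seqAsJavaList conversion are taken from the hunks above; the ColumnImportSketch class and importColumn method are hypothetical, as is the assumption that arrayAddr/schemaAddr are the structs' memoryAddress() values and that these entry points are visible from the sketch's package.

import java.util.List;

import org.apache.arrow.c.ArrowArray;
import org.apache.arrow.c.ArrowSchema;
import org.apache.arrow.memory.BufferAllocator;

import org.apache.comet.parquet.Native;
import org.apache.comet.vector.CometVector;
import org.apache.comet.vector.NativeUtil;

final class ColumnImportSketch {
  // Mirrors loadVector(): allocate one ArrowArray/ArrowSchema pair, let the
  // native batch reader fill it via the Arrow C Data Interface, then hand
  // both structs to NativeUtil, which wraps the result as a CometVector
  // (plain or dictionary-encoded, decided on the native side).
  static CometVector importColumn(
      long nativeBatchHandle, int columnNum, NativeUtil nativeUtil, BufferAllocator allocator) {
    ArrowArray array = ArrowArray.allocateNew(allocator);
    ArrowSchema schema = ArrowSchema.allocateNew(allocator);

    // Assumed: the native side fills the structs at these addresses in place.
    Native.currentColumnBatch(
        nativeBatchHandle, columnNum, array.memoryAddress(), schema.memoryAddress());

    // importVector returns a Scala Seq with one CometVector per pair.
    List<CometVector> imported =
        scala.collection.JavaConverters.seqAsJavaList(
            nativeUtil.importVector(new ArrowArray[] {array}, new ArrowSchema[] {schema}));
    return imported.get(0);
  }
}

One consequence worth noting in review: because NativeUtil now decides between plain and dictionary vectors, the reader no longer needs to track dictionary fallback itself, which is exactly the block deleted from loadVector().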
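A second sketch, motivated by the visibility change on CometVector.getVector at the end of the patch: once the factory is public, callers outside org.apache.comet.vector can wrap any Arrow ValueVector and receive the matching CometVector subclass. GetVectorSketch is a hypothetical class; hasNull(), getInt(), and close() come from Spark's ColumnVector contract that CometVector extends.

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.dictionary.DictionaryProvider;

import org.apache.comet.vector.CometVector;

final class GetVectorSketch {
  public static void main(String[] args) {
    try (BufferAllocator allocator = new RootAllocator()) {
      IntVector ints = new IntVector("ids", allocator);
      ints.allocateNew(3);
      ints.set(0, 7);
      ints.set(1, 11);
      ints.setNull(2);
      ints.setValueCount(3);

      // Previously protected; public after this patch. Dispatch to the right
      // CometVector subclass (struct, map, list, plain/dictionary) happens here.
      DictionaryProvider provider = new DictionaryProvider.MapDictionaryProvider();
      CometVector vector = CometVector.getVector(ints, /* useDecimal128= */ false, provider);

      System.out.println("hasNull=" + vector.hasNull() + ", first=" + vector.getInt(0));
      vector.close();
    }
  }
}

Passing a fresh MapDictionaryProvider mirrors how the readers supply a provider in case dictionary-encoded data appears; for a plain vector it is simply unused.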