diff --git a/common/src/main/java/org/apache/comet/parquet/BatchReader.java b/common/src/main/java/org/apache/comet/parquet/BatchReader.java
index 663406d0a9..538de4a66d 100644
--- a/common/src/main/java/org/apache/comet/parquet/BatchReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/BatchReader.java
@@ -99,20 +99,20 @@ public class BatchReader extends RecordReader implements Cl
   private StructType partitionSchema;
   private InternalRow partitionValues;
   private PartitionedFile file;
-  private final Map metrics;
+  protected Map metrics;
 
   private long rowsRead;
-  private StructType sparkSchema;
+  protected StructType sparkSchema;
   private MessageType requestedSchema;
-  private CometVector[] vectors;
-  private AbstractColumnReader[] columnReaders;
+  protected CometVector[] vectors;
+  protected AbstractColumnReader[] columnReaders;
   private CometSchemaImporter importer;
-  private ColumnarBatch currentBatch;
+  protected ColumnarBatch currentBatch;
   private Future> prefetchTask;
   private LinkedBlockingQueue> prefetchQueue;
   private FileReader fileReader;
   private boolean[] missingColumns;
-  private boolean isInitialized;
+  protected boolean isInitialized;
   private ParquetMetadata footer;
 
   /** The total number of rows across all row groups of the input split. */
@@ -143,7 +143,9 @@ public class BatchReader extends RecordReader implements Cl
   private boolean useLegacyDateTimestamp;
 
   /** The TaskContext object for executing this task. */
-  private final TaskContext taskContext;
+  private TaskContext taskContext;
+
+  public BatchReader() {}
 
   // Only for testing
   public BatchReader(String file, int capacity) {
@@ -183,6 +185,9 @@ public BatchReader(
     this.taskContext = TaskContext$.MODULE$.get();
   }
 
+  /**
+   * @deprecated since 0.9.1, will be removed in 0.10.0.
+   */
   public BatchReader(AbstractColumnReader[] columnReaders) {
     // Todo: set useDecimal128 and useLazyMaterialization
     int numColumns = columnReaders.length;
@@ -377,10 +382,16 @@ public void init() throws URISyntaxException, IOException {
     }
   }
 
+  /**
+   * @deprecated since 0.9.1, will be removed in 0.10.0.
+   */
   public void setSparkSchema(StructType schema) {
     this.sparkSchema = schema;
   }
 
+  /**
+   * @deprecated since 0.9.1, will be removed in 0.10.0.
+   */
   public AbstractColumnReader[] getColumnReaders() {
     return columnReaders;
   }
diff --git a/common/src/main/java/org/apache/comet/parquet/IcebergCometBatchReader.java b/common/src/main/java/org/apache/comet/parquet/IcebergCometBatchReader.java
new file mode 100644
index 0000000000..8953c9d935
--- /dev/null
+++ b/common/src/main/java/org/apache/comet/parquet/IcebergCometBatchReader.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.parquet;
+
+import java.util.HashMap;
+
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+import org.apache.comet.vector.CometVector;
+
+public class IcebergCometBatchReader extends BatchReader {
+  public IcebergCometBatchReader(int numColumns, StructType schema) {
+    this.columnReaders = new AbstractColumnReader[numColumns];
+    this.vectors = new CometVector[numColumns];
+    this.currentBatch = new ColumnarBatch(vectors);
+    this.metrics = new HashMap<>();
+    this.sparkSchema = schema;
+  }
+
+  public void init(AbstractColumnReader[] columnReaders) {
+    this.columnReaders = columnReaders;
+    this.isInitialized = true;
+  }
+}
diff --git a/common/src/main/java/org/apache/comet/parquet/Utils.java b/common/src/main/java/org/apache/comet/parquet/Utils.java
index fc7d1ab871..3e2e093a85 100644
--- a/common/src/main/java/org/apache/comet/parquet/Utils.java
+++ b/common/src/main/java/org/apache/comet/parquet/Utils.java
@@ -37,13 +37,26 @@ public static ColumnReader getColumnReader(
       CometSchemaImporter importer,
       int batchSize,
       boolean useDecimal128,
-      boolean useLazyMaterialization) {
+      boolean useLazyMaterialization,
+      boolean useLegacyTimestamp) {
     ColumnDescriptor descriptor = buildColumnDescriptor(columnSpec);
     return getColumnReader(
-        type, descriptor, importer, batchSize, useDecimal128, useLazyMaterialization, true);
+        type,
+        descriptor,
+        importer,
+        batchSize,
+        useDecimal128,
+        useLazyMaterialization,
+        useLegacyTimestamp);
   }
 
+  /**
+   * This method is called from Apache Iceberg.
+   *
+   * @deprecated since 0.9.1, will be removed in 0.10.0; use getColumnReader with ParquetColumnSpec
+   *     instead.
+   */
   public static ColumnReader getColumnReader(
       DataType type,
       ColumnDescriptor descriptor,