diff --git a/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java b/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java
index ef97abf74c..b9f1797cb3 100644
--- a/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java
@@ -63,7 +63,7 @@ public abstract class AbstractColumnReader implements AutoCloseable {
/** A pointer to the native implementation of ColumnReader. */
protected long nativeHandle;
- public AbstractColumnReader(
+ AbstractColumnReader(
DataType type,
Type fieldType,
ColumnDescriptor descriptor,
@@ -76,7 +76,7 @@ public AbstractColumnReader(
this.useLegacyDateTimestamp = useLegacyDateTimestamp;
}
- public AbstractColumnReader(
+ AbstractColumnReader(
DataType type,
ColumnDescriptor descriptor,
boolean useDecimal128,
@@ -85,7 +85,7 @@ public AbstractColumnReader(
TypeUtil.checkParquetType(descriptor, type);
}
- public ColumnDescriptor getDescriptor() {
+ ColumnDescriptor getDescriptor() {
return descriptor;
}
diff --git a/common/src/main/java/org/apache/comet/parquet/BatchReader.java b/common/src/main/java/org/apache/comet/parquet/BatchReader.java
index 538de4a66d..edac28ec1b 100644
--- a/common/src/main/java/org/apache/comet/parquet/BatchReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/BatchReader.java
@@ -186,7 +186,8 @@ public BatchReader(
}
/**
- * @deprecated since 0.9.1, will be removed in 0.10.0.
+ * @deprecated since 0.10.0, will be removed in 0.11.0.
+ * @see <a href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue #2079</a>
*/
public BatchReader(AbstractColumnReader[] columnReaders) {
// Todo: set useDecimal128 and useLazyMaterialization
@@ -383,14 +384,16 @@ public void init() throws URISyntaxException, IOException {
}
/**
- * @deprecated since 0.9.1, will be removed in 0.10.0.
+ * @deprecated since 0.10.0, will be removed in 0.11.0.
+ * @see <a href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue #2079</a>
*/
public void setSparkSchema(StructType schema) {
this.sparkSchema = schema;
}
/**
- * @deprecated since 0.9.1, will be removed in 0.10.0.
+ * @deprecated since 0.10.0, will be removed in 0.11.0.
+ * @see <a href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue #2079</a>
*/
public AbstractColumnReader[] getColumnReaders() {
return columnReaders;
diff --git a/common/src/main/java/org/apache/comet/parquet/ColumnReader.java b/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
index b2fe965e2e..968da1959c 100644
--- a/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
@@ -93,7 +93,7 @@ public class ColumnReader extends AbstractColumnReader {
private ArrowArray array = null;
private ArrowSchema schema = null;
- public ColumnReader(
+ ColumnReader(
DataType type,
ColumnDescriptor descriptor,
CometSchemaImporter importer,
@@ -111,6 +111,8 @@ public ColumnReader(
* Set the page reader for a new column chunk to read. Expects to call `readBatch` after this.
*
* @param pageReader the page reader for the new column chunk
+ * @deprecated since 0.10.0, will be removed in 0.11.0.
+ * @see <a href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue #2079</a>
*/
public void setPageReader(PageReader pageReader) throws IOException {
this.pageReader = pageReader;
diff --git a/common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java b/common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java
index e010f8ab78..b8fc49a175 100644
--- a/common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java
@@ -38,7 +38,7 @@ public class ConstantColumnReader extends MetadataColumnReader {
/** The constant value in the format of Object that are used to initialize this column reader. */
private Object value;
- public ConstantColumnReader(StructField field, int batchSize, boolean useDecimal128) {
+ ConstantColumnReader(StructField field, int batchSize, boolean useDecimal128) {
this(field.dataType(), TypeUtil.convertToParquet(field), batchSize, useDecimal128);
this.value =
ResolveDefaultColumns.getExistenceDefaultValues(new StructType(new StructField[] {field}))[
@@ -46,18 +46,29 @@ public ConstantColumnReader(StructField field, int batchSize, boolean useDecimal
init(value);
}
- public ConstantColumnReader(
+ ConstantColumnReader(
StructField field, int batchSize, InternalRow values, int index, boolean useDecimal128) {
this(field.dataType(), TypeUtil.convertToParquet(field), batchSize, useDecimal128);
init(values, index);
}
+ /**
+ * @deprecated since 0.10.0, will be removed in 0.11.0.
+ * @see <a href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue #2079</a>
+ */
public ConstantColumnReader(
DataType type, ColumnDescriptor descriptor, Object value, boolean useDecimal128) {
super(type, descriptor, useDecimal128, true);
this.value = value;
}
+ // Used by Iceberg
+ public ConstantColumnReader(
+ DataType type, ParquetColumnSpec spec, Object value, boolean useDecimal128) {
+ super(type, spec, useDecimal128, true);
+ this.value = value;
+ }
+
ConstantColumnReader(
DataType type, ColumnDescriptor descriptor, int batchSize, boolean useDecimal128) {
super(type, descriptor, useDecimal128, true);
diff --git a/common/src/main/java/org/apache/comet/parquet/FileReader.java b/common/src/main/java/org/apache/comet/parquet/FileReader.java
index af6c5b3c0b..fa0d81f13e 100644
--- a/common/src/main/java/org/apache/comet/parquet/FileReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/FileReader.java
@@ -43,7 +43,6 @@
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.Preconditions;
@@ -130,15 +129,14 @@ public class FileReader implements Closeable {
private RowGroupReader currentRowGroup = null;
private InternalFileDecryptor fileDecryptor;
- public FileReader(InputFile file, ParquetReadOptions options, ReadOptions cometOptions)
+ FileReader(InputFile file, ParquetReadOptions options, ReadOptions cometOptions)
throws IOException {
this(file, null, options, cometOptions, null);
}
/** This constructor is called from Apache Iceberg. */
public FileReader(
- Path path,
- Configuration conf,
+ WrappedInputFile file,
ReadOptions cometOptions,
Map properties,
Long start,
@@ -147,9 +145,10 @@ public FileReader(
byte[] fileAADPrefix)
throws IOException {
ParquetReadOptions options =
- buildParquetReadOptions(conf, properties, start, length, fileEncryptionKey, fileAADPrefix);
+ buildParquetReadOptions(
+ new Configuration(), properties, start, length, fileEncryptionKey, fileAADPrefix);
this.converter = new ParquetMetadataConverter(options);
- this.file = CometInputFile.fromPath(path, conf);
+ this.file = file;
this.f = file.newStream();
this.options = options;
this.cometOptions = cometOptions;
@@ -177,7 +176,7 @@ public FileReader(
this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
}
- public FileReader(
+ FileReader(
InputFile file,
ParquetReadOptions options,
ReadOptions cometOptions,
@@ -186,7 +185,7 @@ public FileReader(
this(file, null, options, cometOptions, metrics);
}
- public FileReader(
+ FileReader(
InputFile file,
ParquetMetadata footer,
ParquetReadOptions options,
@@ -226,12 +225,12 @@ public FileReader(
}
/** Returns the footer of the Parquet file being read. */
- public ParquetMetadata getFooter() {
+ ParquetMetadata getFooter() {
return this.footer;
}
/** Returns the metadata of the Parquet file being read. */
- public FileMetaData getFileMetaData() {
+ FileMetaData getFileMetaData() {
return this.fileMetaData;
}
diff --git a/common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java b/common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java
index b22278ea78..f2772908b9 100644
--- a/common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java
@@ -43,7 +43,7 @@ public class LazyColumnReader extends ColumnReader {
// The lazy vector being updated.
private final CometLazyVector vector;
- public LazyColumnReader(
+ LazyColumnReader(
DataType sparkReadType,
ColumnDescriptor descriptor,
CometSchemaImporter importer,
diff --git a/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java b/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java
index 2820c42f89..6240c8c8c5 100644
--- a/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java
@@ -42,6 +42,10 @@ public class MetadataColumnReader extends AbstractColumnReader {
private boolean isConstant;
+ /**
+ * @deprecated since 0.10.0, will be made package private in 0.11.0.
+ * @see <a href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue #2079</a>
+ */
public MetadataColumnReader(
DataType type, ColumnDescriptor descriptor, boolean useDecimal128, boolean isConstant) {
// TODO: should we handle legacy dates & timestamps for metadata columns?
@@ -50,6 +54,15 @@ public MetadataColumnReader(
this.isConstant = isConstant;
}
+ // Used by Iceberg
+ public MetadataColumnReader(
+ DataType type, ParquetColumnSpec spec, boolean useDecimal128, boolean isConstant) {
+ // TODO: should we handle legacy dates & timestamps for metadata columns?
+ super(type, Utils.buildColumnDescriptor(spec), useDecimal128, false);
+
+ this.isConstant = isConstant;
+ }
+
@Override
public void setBatchSize(int batchSize) {
close();
diff --git a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
index ec22c8e4db..7595242c34 100644
--- a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
@@ -186,7 +186,7 @@ public NativeBatchReader(
this.taskContext = TaskContext$.MODULE$.get();
}
- public NativeBatchReader(AbstractColumnReader[] columnReaders) {
+ private NativeBatchReader(AbstractColumnReader[] columnReaders) {
// Todo: set useDecimal128 and useLazyMaterialization
int numColumns = columnReaders.length;
this.columnReaders = new AbstractColumnReader[numColumns];
diff --git a/common/src/main/java/org/apache/comet/parquet/NativeColumnReader.java b/common/src/main/java/org/apache/comet/parquet/NativeColumnReader.java
index 50838ef771..88447c1473 100644
--- a/common/src/main/java/org/apache/comet/parquet/NativeColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/NativeColumnReader.java
@@ -69,7 +69,7 @@ public class NativeColumnReader extends AbstractColumnReader {
private long nativeBatchHandle = 0xDEADBEEFL;
private final int columnNum;
- public NativeColumnReader(
+ NativeColumnReader(
long nativeBatchHandle,
int columnNum,
DataType type,
diff --git a/common/src/main/java/org/apache/comet/parquet/ParquetColumnSpec.java b/common/src/main/java/org/apache/comet/parquet/ParquetColumnSpec.java
index 49007f925d..805aaa033d 100644
--- a/common/src/main/java/org/apache/comet/parquet/ParquetColumnSpec.java
+++ b/common/src/main/java/org/apache/comet/parquet/ParquetColumnSpec.java
@@ -21,6 +21,12 @@
import java.util.Map;
+/**
+ * ParquetColumnSpec encapsulates the information within a Parquet ColumnDescriptor. Utility
+ * methods can convert from and to a ColumnDescriptor. The only purpose of this class is to allow
+ * passing of column descriptors between Comet and Iceberg. This is required because Iceberg shades
+ * Parquet, changing the package of Parquet classes and making them incompatible with Comet.
+ */
public class ParquetColumnSpec {
private final int fieldId;
diff --git a/common/src/main/java/org/apache/comet/parquet/TypeUtil.java b/common/src/main/java/org/apache/comet/parquet/TypeUtil.java
index 1e9d5b937c..889e2baf50 100644
--- a/common/src/main/java/org/apache/comet/parquet/TypeUtil.java
+++ b/common/src/main/java/org/apache/comet/parquet/TypeUtil.java
@@ -31,9 +31,16 @@
import org.apache.comet.CometConf;
+import static org.apache.comet.parquet.Utils.descriptorToParquetColumnSpec;
+
public class TypeUtil {
- /** Converts the input Spark 'field' into a Parquet column descriptor. */
+ /**
+ * Converts the input Spark 'field' into a Parquet column descriptor.
+ *
+ * @deprecated since 0.10.0, will be removed in 0.11.0.
+ * @see <a href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue #2079</a>
+ */
public static ColumnDescriptor convertToParquet(StructField field) {
Type.Repetition repetition;
int maxDefinitionLevel;
@@ -105,6 +112,10 @@ public static ColumnDescriptor convertToParquet(StructField field) {
return new ColumnDescriptor(path, builder.named(field.name()), 0, maxDefinitionLevel);
}
+ public static ParquetColumnSpec convertToParquetSpec(StructField field) {
+ return descriptorToParquetColumnSpec(convertToParquet(field));
+ }
+
/**
* Check whether the Parquet 'descriptor' and Spark read type 'sparkType' are compatible. If not,
* throw exception.
diff --git a/common/src/main/java/org/apache/comet/parquet/Utils.java b/common/src/main/java/org/apache/comet/parquet/Utils.java
index 3e2e093a85..7fb2eac5b6 100644
--- a/common/src/main/java/org/apache/comet/parquet/Utils.java
+++ b/common/src/main/java/org/apache/comet/parquet/Utils.java
@@ -19,6 +19,9 @@
package org.apache.comet.parquet;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
@@ -54,8 +57,9 @@ public static ColumnReader getColumnReader(
/**
* This method is called from Apache Iceberg.
*
- * @deprecated since 0.9.1, will be removed in 0.10.0; use getColumnReader with ParquetColumnSpec
+ * @deprecated since 0.10.0, will be removed in 0.11.0; use getColumnReader with ParquetColumnSpec
* instead.
+ * @see <a href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue #2079</a>
*/
public static ColumnReader getColumnReader(
DataType type,
@@ -453,4 +457,66 @@ private static LogicalTypeAnnotation reconstructLogicalType(
throw new IllegalArgumentException("Unknown logical type: " + logicalTypeName);
}
}
+
+ public static ParquetColumnSpec descriptorToParquetColumnSpec(ColumnDescriptor descriptor) {
+
+ String[] path = descriptor.getPath();
+ PrimitiveType primitiveType = descriptor.getPrimitiveType();
+ String physicalType = primitiveType.getPrimitiveTypeName().name();
+
+ int typeLength =
+ primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
+ ? primitiveType.getTypeLength()
+ : 0;
+
+ boolean isRepeated = primitiveType.getRepetition() == Type.Repetition.REPEATED;
+
+ String logicalTypeName = null;
+ Map logicalTypeParams = new HashMap<>();
+ LogicalTypeAnnotation logicalType = primitiveType.getLogicalTypeAnnotation();
+
+ if (logicalType != null) {
+ logicalTypeName = logicalType.getClass().getSimpleName();
+
+ // Handle specific logical types
+ if (logicalType instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
+ LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimal =
+ (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType;
+ logicalTypeParams.put("precision", String.valueOf(decimal.getPrecision()));
+ logicalTypeParams.put("scale", String.valueOf(decimal.getScale()));
+ } else if (logicalType instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) {
+ LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestamp =
+ (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalType;
+ logicalTypeParams.put("isAdjustedToUTC", String.valueOf(timestamp.isAdjustedToUTC()));
+ logicalTypeParams.put("unit", timestamp.getUnit().name());
+ } else if (logicalType instanceof LogicalTypeAnnotation.TimeLogicalTypeAnnotation) {
+ LogicalTypeAnnotation.TimeLogicalTypeAnnotation time =
+ (LogicalTypeAnnotation.TimeLogicalTypeAnnotation) logicalType;
+ logicalTypeParams.put("isAdjustedToUTC", String.valueOf(time.isAdjustedToUTC()));
+ logicalTypeParams.put("unit", time.getUnit().name());
+ } else if (logicalType instanceof LogicalTypeAnnotation.IntLogicalTypeAnnotation) {
+ LogicalTypeAnnotation.IntLogicalTypeAnnotation intType =
+ (LogicalTypeAnnotation.IntLogicalTypeAnnotation) logicalType;
+ logicalTypeParams.put("isSigned", String.valueOf(intType.isSigned()));
+ logicalTypeParams.put("bitWidth", String.valueOf(intType.getBitWidth()));
+ }
+ }
+
+ int id = -1;
+ Type type = descriptor.getPrimitiveType();
+ if (type != null && type.getId() != null) {
+ id = type.getId().intValue();
+ }
+
+ return new ParquetColumnSpec(
+ id,
+ path,
+ physicalType,
+ typeLength,
+ isRepeated,
+ descriptor.getMaxDefinitionLevel(),
+ descriptor.getMaxRepetitionLevel(),
+ logicalTypeName,
+ logicalTypeParams);
+ }
}
diff --git a/common/src/main/java/org/apache/comet/parquet/WrappedInputFile.java b/common/src/main/java/org/apache/comet/parquet/WrappedInputFile.java
new file mode 100644
index 0000000000..666d4c2e7b
--- /dev/null
+++ b/common/src/main/java/org/apache/comet/parquet/WrappedInputFile.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.parquet;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Method;
+
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.SeekableInputStream;
+
+/**
+ * Wraps an Object that possibly implements the methods of a Parquet InputFile (but is not a Parquet
+ * InputFile). Such an object exists, for instance, in Iceberg's InputFile.
+ */
+public class WrappedInputFile implements InputFile {
+ Object wrapped;
+
+ public WrappedInputFile(Object inputFile) {
+ this.wrapped = inputFile;
+ }
+
+ @Override
+ public long getLength() throws IOException {
+ try {
+ Method targetMethod = wrapped.getClass().getDeclaredMethod("getLength");
+ targetMethod.setAccessible(true);
+ return (long) targetMethod.invoke(wrapped);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public SeekableInputStream newStream() throws IOException {
+ try {
+ Method targetMethod = wrapped.getClass().getDeclaredMethod("newStream");
+ targetMethod.setAccessible(true);
+ InputStream stream = (InputStream) targetMethod.invoke(wrapped);
+ return new WrappedSeekableInputStream(stream);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return wrapped.toString();
+ }
+}
diff --git a/common/src/main/java/org/apache/comet/parquet/WrappedSeekableInputStream.java b/common/src/main/java/org/apache/comet/parquet/WrappedSeekableInputStream.java
new file mode 100644
index 0000000000..c463617bd6
--- /dev/null
+++ b/common/src/main/java/org/apache/comet/parquet/WrappedSeekableInputStream.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.parquet;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Method;
+import java.util.Objects;
+
+import org.apache.parquet.io.DelegatingSeekableInputStream;
+
+/**
+ * Wraps an InputStream that possibly implements the methods of a Parquet SeekableInputStream (but
+ * is not a Parquet SeekableInputStream). Such an InputStream exists, for instance, in Iceberg's
+ * SeekableInputStream.
+ */
+public class WrappedSeekableInputStream extends DelegatingSeekableInputStream {
+
+ private final InputStream wrappedInputStream; // The InputStream we are wrapping
+
+ public WrappedSeekableInputStream(InputStream inputStream) {
+ super(inputStream);
+ this.wrappedInputStream = Objects.requireNonNull(inputStream, "InputStream cannot be null");
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ try {
+ Method targetMethod = wrappedInputStream.getClass().getDeclaredMethod("getPos");
+ targetMethod.setAccessible(true);
+ return (long) targetMethod.invoke(wrappedInputStream);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void seek(long newPos) throws IOException {
+ try {
+ Method targetMethod = wrappedInputStream.getClass().getDeclaredMethod("seek", long.class);
+ targetMethod.setAccessible(true);
+ targetMethod.invoke(wrappedInputStream, newPos);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+}
diff --git a/common/src/main/java/org/apache/comet/vector/CometDelegateVector.java b/common/src/main/java/org/apache/comet/vector/CometDelegateVector.java
index 75e62ea0fe..8874d11b70 100644
--- a/common/src/main/java/org/apache/comet/vector/CometDelegateVector.java
+++ b/common/src/main/java/org/apache/comet/vector/CometDelegateVector.java
@@ -48,7 +48,7 @@ public CometDelegateVector(DataType dataType, CometVector delegate, boolean useD
this.delegate = delegate;
}
- public void setDelegate(CometVector delegate) {
+ protected void setDelegate(CometVector delegate) {
this.delegate = delegate;
}