From 151063a6f2eee8440d5343bf0b515a4319526167 Mon Sep 17 00:00:00 2001
From: Parth Chandra
Date: Fri, 1 Aug 2025 15:18:01 -0700
Subject: [PATCH 1/5] fix: [iceberg] more fixes for Iceberg integration APIs.

---
 .../comet/parquet/AbstractColumnReader.java |  6 +-
 .../apache/comet/parquet/ColumnReader.java  |  4 +-
 .../comet/parquet/ConstantColumnReader.java |  9 +--
 .../org/apache/comet/parquet/FileReader.java | 19 +++---
 .../comet/parquet/LazyColumnReader.java     |  2 +-
 .../comet/parquet/MetadataColumnReader.java | 11 ++-
 .../comet/parquet/NativeBatchReader.java    |  2 +-
 .../comet/parquet/NativeColumnReader.java   |  2 +-
 .../comet/parquet/ParquetColumnSpec.java    |  6 ++
 .../org/apache/comet/parquet/TypeUtil.java  |  8 ++-
 .../java/org/apache/comet/parquet/Utils.java | 59 ++++++++++++++++
 .../comet/parquet/WrappedInputFile.java     | 68 +++++++++++++++++++
 .../parquet/WrappedSeekableInputStream.java | 64 +++++++++++++++++
 .../comet/vector/CometDelegateVector.java   |  2 +-
 14 files changed, 237 insertions(+), 25 deletions(-)
 create mode 100644 common/src/main/java/org/apache/comet/parquet/WrappedInputFile.java
 create mode 100644 common/src/main/java/org/apache/comet/parquet/WrappedSeekableInputStream.java

diff --git a/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java b/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java
index ef97abf74c..b9f1797cb3 100644
--- a/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java
@@ -63,7 +63,7 @@ public abstract class AbstractColumnReader implements AutoCloseable {
   /** A pointer to the native implementation of ColumnReader. */
   protected long nativeHandle;
 
-  public AbstractColumnReader(
+  AbstractColumnReader(
       DataType type,
       Type fieldType,
       ColumnDescriptor descriptor,
@@ -76,7 +76,7 @@ public AbstractColumnReader(
     this.useLegacyDateTimestamp = useLegacyDateTimestamp;
   }
 
-  public AbstractColumnReader(
+  AbstractColumnReader(
       DataType type,
       ColumnDescriptor descriptor,
       boolean useDecimal128,
@@ -85,7 +85,7 @@ public AbstractColumnReader(
     TypeUtil.checkParquetType(descriptor, type);
   }
 
-  public ColumnDescriptor getDescriptor() {
+  ColumnDescriptor getDescriptor() {
     return descriptor;
   }
 
diff --git a/common/src/main/java/org/apache/comet/parquet/ColumnReader.java b/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
index b2fe965e2e..06543a91c1 100644
--- a/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
@@ -93,7 +93,7 @@ public class ColumnReader extends AbstractColumnReader {
   private ArrowArray array = null;
   private ArrowSchema schema = null;
 
-  public ColumnReader(
+  ColumnReader(
      DataType type,
      ColumnDescriptor descriptor,
      CometSchemaImporter importer,
@@ -112,7 +112,7 @@ public ColumnReader(
    *
    * @param pageReader the page reader for the new column chunk
    */
-  public void setPageReader(PageReader pageReader) throws IOException {
+  void setPageReader(PageReader pageReader) throws IOException {
     this.pageReader = pageReader;
 
     DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
diff --git a/common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java b/common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java
index e010f8ab78..6a1754ff15 100644
--- a/common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java
@@ -38,7 +38,7 @@
public class ConstantColumnReader extends MetadataColumnReader { /** The constant value in the format of Object that are used to initialize this column reader. */ private Object value; - public ConstantColumnReader(StructField field, int batchSize, boolean useDecimal128) { + ConstantColumnReader(StructField field, int batchSize, boolean useDecimal128) { this(field.dataType(), TypeUtil.convertToParquet(field), batchSize, useDecimal128); this.value = ResolveDefaultColumns.getExistenceDefaultValues(new StructType(new StructField[] {field}))[ @@ -46,15 +46,16 @@ public ConstantColumnReader(StructField field, int batchSize, boolean useDecimal init(value); } - public ConstantColumnReader( + ConstantColumnReader( StructField field, int batchSize, InternalRow values, int index, boolean useDecimal128) { this(field.dataType(), TypeUtil.convertToParquet(field), batchSize, useDecimal128); init(values, index); } + // Used by Iceberg public ConstantColumnReader( - DataType type, ColumnDescriptor descriptor, Object value, boolean useDecimal128) { - super(type, descriptor, useDecimal128, true); + DataType type, ParquetColumnSpec spec, Object value, boolean useDecimal128) { + super(type, spec, useDecimal128, true); this.value = value; } diff --git a/common/src/main/java/org/apache/comet/parquet/FileReader.java b/common/src/main/java/org/apache/comet/parquet/FileReader.java index af6c5b3c0b..fa0d81f13e 100644 --- a/common/src/main/java/org/apache/comet/parquet/FileReader.java +++ b/common/src/main/java/org/apache/comet/parquet/FileReader.java @@ -43,7 +43,6 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.parquet.HadoopReadOptions; import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.Preconditions; @@ -130,15 +129,14 @@ public class FileReader implements Closeable { private RowGroupReader currentRowGroup = null; private InternalFileDecryptor fileDecryptor; - public FileReader(InputFile file, ParquetReadOptions options, ReadOptions cometOptions) + FileReader(InputFile file, ParquetReadOptions options, ReadOptions cometOptions) throws IOException { this(file, null, options, cometOptions, null); } /** This constructor is called from Apache Iceberg. */ public FileReader( - Path path, - Configuration conf, + WrappedInputFile file, ReadOptions cometOptions, Map properties, Long start, @@ -147,9 +145,10 @@ public FileReader( byte[] fileAADPrefix) throws IOException { ParquetReadOptions options = - buildParquetReadOptions(conf, properties, start, length, fileEncryptionKey, fileAADPrefix); + buildParquetReadOptions( + new Configuration(), properties, start, length, fileEncryptionKey, fileAADPrefix); this.converter = new ParquetMetadataConverter(options); - this.file = CometInputFile.fromPath(path, conf); + this.file = file; this.f = file.newStream(); this.options = options; this.cometOptions = cometOptions; @@ -177,7 +176,7 @@ public FileReader( this.crc = options.usePageChecksumVerification() ? new CRC32() : null; } - public FileReader( + FileReader( InputFile file, ParquetReadOptions options, ReadOptions cometOptions, @@ -186,7 +185,7 @@ public FileReader( this(file, null, options, cometOptions, metrics); } - public FileReader( + FileReader( InputFile file, ParquetMetadata footer, ParquetReadOptions options, @@ -226,12 +225,12 @@ public FileReader( } /** Returns the footer of the Parquet file being read. 
*/ - public ParquetMetadata getFooter() { + ParquetMetadata getFooter() { return this.footer; } /** Returns the metadata of the Parquet file being read. */ - public FileMetaData getFileMetaData() { + FileMetaData getFileMetaData() { return this.fileMetaData; } diff --git a/common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java b/common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java index b22278ea78..f2772908b9 100644 --- a/common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java +++ b/common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java @@ -43,7 +43,7 @@ public class LazyColumnReader extends ColumnReader { // The lazy vector being updated. private final CometLazyVector vector; - public LazyColumnReader( + LazyColumnReader( DataType sparkReadType, ColumnDescriptor descriptor, CometSchemaImporter importer, diff --git a/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java b/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java index 2820c42f89..50c8a738e5 100644 --- a/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java +++ b/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java @@ -42,7 +42,7 @@ public class MetadataColumnReader extends AbstractColumnReader { private boolean isConstant; - public MetadataColumnReader( + MetadataColumnReader( DataType type, ColumnDescriptor descriptor, boolean useDecimal128, boolean isConstant) { // TODO: should we handle legacy dates & timestamps for metadata columns? super(type, descriptor, useDecimal128, false); @@ -50,6 +50,15 @@ public MetadataColumnReader( this.isConstant = isConstant; } + // Used by Iceberg + public MetadataColumnReader( + DataType type, ParquetColumnSpec spec, boolean useDecimal128, boolean isConstant) { + // TODO: should we handle legacy dates & timestamps for metadata columns? 
+    super(type, Utils.buildColumnDescriptor(spec), useDecimal128, false);
+
+    this.isConstant = isConstant;
+  }
+
   @Override
   public void setBatchSize(int batchSize) {
     close();
diff --git a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
index ec22c8e4db..7595242c34 100644
--- a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
@@ -186,7 +186,7 @@ public NativeBatchReader(
     this.taskContext = TaskContext$.MODULE$.get();
   }
 
-  public NativeBatchReader(AbstractColumnReader[] columnReaders) {
+  private NativeBatchReader(AbstractColumnReader[] columnReaders) {
     // Todo: set useDecimal128 and useLazyMaterialization
     int numColumns = columnReaders.length;
     this.columnReaders = new AbstractColumnReader[numColumns];
diff --git a/common/src/main/java/org/apache/comet/parquet/NativeColumnReader.java b/common/src/main/java/org/apache/comet/parquet/NativeColumnReader.java
index 50838ef771..88447c1473 100644
--- a/common/src/main/java/org/apache/comet/parquet/NativeColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/NativeColumnReader.java
@@ -69,7 +69,7 @@ public class NativeColumnReader extends AbstractColumnReader {
   private long nativeBatchHandle = 0xDEADBEEFL;
   private final int columnNum;
 
-  public NativeColumnReader(
+  NativeColumnReader(
      long nativeBatchHandle,
      int columnNum,
      DataType type,
diff --git a/common/src/main/java/org/apache/comet/parquet/ParquetColumnSpec.java b/common/src/main/java/org/apache/comet/parquet/ParquetColumnSpec.java
index 49007f925d..805aaa033d 100644
--- a/common/src/main/java/org/apache/comet/parquet/ParquetColumnSpec.java
+++ b/common/src/main/java/org/apache/comet/parquet/ParquetColumnSpec.java
@@ -21,6 +21,12 @@
 
 import java.util.Map;
 
+/**
+ * Parquet ColumnSpec encapsulates the information within a Parquet ColumnDescriptor. Utility
+ * methods can convert from and to a ColumnDescriptor. The only purpose of this class is to allow
+ * passing of column descriptors between Comet and Iceberg. This is required because Iceberg shades
+ * Parquet, changing the package of Parquet classes and making them incompatible with Comet.
+ */
 public class ParquetColumnSpec {
 
   private final int fieldId;
diff --git a/common/src/main/java/org/apache/comet/parquet/TypeUtil.java b/common/src/main/java/org/apache/comet/parquet/TypeUtil.java
index 1e9d5b937c..a3e3d481f2 100644
--- a/common/src/main/java/org/apache/comet/parquet/TypeUtil.java
+++ b/common/src/main/java/org/apache/comet/parquet/TypeUtil.java
@@ -31,10 +31,12 @@
 
 import org.apache.comet.CometConf;
 
+import static org.apache.comet.parquet.Utils.descriptorToParquetColumnSpec;
+
 public class TypeUtil {
 
   /** Converts the input Spark 'field' into a Parquet column descriptor. */
-  public static ColumnDescriptor convertToParquet(StructField field) {
+  static ColumnDescriptor convertToParquet(StructField field) {
     Type.Repetition repetition;
     int maxDefinitionLevel;
     if (field.nullable()) {
@@ -105,6 +107,10 @@ public static ColumnDescriptor convertToParquet(StructField field) {
     return new ColumnDescriptor(path, builder.named(field.name()), 0, maxDefinitionLevel);
   }
 
+  public static ParquetColumnSpec convertToParquetSpec(StructField field) {
+    return descriptorToParquetColumnSpec(convertToParquet(field));
+  }
+
   /**
    * Check whether the Parquet 'descriptor' and Spark read type 'sparkType' are compatible. If not,
    * throw exception.
diff --git a/common/src/main/java/org/apache/comet/parquet/Utils.java b/common/src/main/java/org/apache/comet/parquet/Utils.java index 3e2e093a85..a2aa3b40d0 100644 --- a/common/src/main/java/org/apache/comet/parquet/Utils.java +++ b/common/src/main/java/org/apache/comet/parquet/Utils.java @@ -19,6 +19,9 @@ package org.apache.comet.parquet; +import java.util.HashMap; +import java.util.Map; + import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.PrimitiveType; @@ -453,4 +456,60 @@ private static LogicalTypeAnnotation reconstructLogicalType( throw new IllegalArgumentException("Unknown logical type: " + logicalTypeName); } } + + public static ParquetColumnSpec descriptorToParquetColumnSpec(ColumnDescriptor descriptor) { + + String[] path = descriptor.getPath(); + PrimitiveType primitiveType = descriptor.getPrimitiveType(); + String physicalType = primitiveType.getPrimitiveTypeName().name(); + + int typeLength = + primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY + ? primitiveType.getTypeLength() + : 0; + + boolean isRepeated = primitiveType.getRepetition() == Type.Repetition.REPEATED; + + String logicalTypeName = null; + Map logicalTypeParams = new HashMap<>(); + LogicalTypeAnnotation logicalType = primitiveType.getLogicalTypeAnnotation(); + + if (logicalType != null) { + logicalTypeName = logicalType.getClass().getSimpleName(); + + // Handle specific logical types + if (logicalType instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) { + LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimal = + (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType; + logicalTypeParams.put("precision", String.valueOf(decimal.getPrecision())); + logicalTypeParams.put("scale", String.valueOf(decimal.getScale())); + } else if (logicalType instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) { + LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestamp = + (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalType; + logicalTypeParams.put("isAdjustedToUTC", String.valueOf(timestamp.isAdjustedToUTC())); + logicalTypeParams.put("unit", timestamp.getUnit().name()); + } else if (logicalType instanceof LogicalTypeAnnotation.TimeLogicalTypeAnnotation) { + LogicalTypeAnnotation.TimeLogicalTypeAnnotation time = + (LogicalTypeAnnotation.TimeLogicalTypeAnnotation) logicalType; + logicalTypeParams.put("isAdjustedToUTC", String.valueOf(time.isAdjustedToUTC())); + logicalTypeParams.put("unit", time.getUnit().name()); + } else if (logicalType instanceof LogicalTypeAnnotation.IntLogicalTypeAnnotation) { + LogicalTypeAnnotation.IntLogicalTypeAnnotation intType = + (LogicalTypeAnnotation.IntLogicalTypeAnnotation) logicalType; + logicalTypeParams.put("isSigned", String.valueOf(intType.isSigned())); + logicalTypeParams.put("bitWidth", String.valueOf(intType.getBitWidth())); + } + } + + return new ParquetColumnSpec( + 1, // ToDo: pass in the correct id + path, + physicalType, + typeLength, + isRepeated, + descriptor.getMaxDefinitionLevel(), + descriptor.getMaxRepetitionLevel(), + logicalTypeName, + logicalTypeParams); + } } diff --git a/common/src/main/java/org/apache/comet/parquet/WrappedInputFile.java b/common/src/main/java/org/apache/comet/parquet/WrappedInputFile.java new file mode 100644 index 0000000000..7896377835 --- /dev/null +++ b/common/src/main/java/org/apache/comet/parquet/WrappedInputFile.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.parquet; + +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.Method; + +import org.apache.parquet.io.InputFile; +import org.apache.parquet.io.SeekableInputStream; + +/** + * A Parquet {@link InputFile} implementation that's similar to {@link + * org.apache.parquet.hadoop.util.HadoopInputFile}, but with optimizations introduced in Hadoop 3.x, + * for S3 specifically. + */ +public class WrappedInputFile implements InputFile { + Object wrapped; + + public WrappedInputFile(Object inputFile) { + this.wrapped = inputFile; + } + + @Override + public long getLength() throws IOException { + try { + Method targetMethod = wrapped.getClass().getDeclaredMethod("getLength"); // + targetMethod.setAccessible(true); + return (long) targetMethod.invoke(wrapped); + } catch (Exception e) { + throw new IOException(e); + } + } + + @Override + public SeekableInputStream newStream() throws IOException { + try { + Method targetMethod = wrapped.getClass().getDeclaredMethod("newStream"); // + targetMethod.setAccessible(true); + InputStream stream = (InputStream) targetMethod.invoke(wrapped); + return new WrappedSeekableInputStream(stream); + } catch (Exception e) { + throw new IOException(e); + } + } + + @Override + public String toString() { + return wrapped.toString(); + } +} diff --git a/common/src/main/java/org/apache/comet/parquet/WrappedSeekableInputStream.java b/common/src/main/java/org/apache/comet/parquet/WrappedSeekableInputStream.java new file mode 100644 index 0000000000..fd0151c704 --- /dev/null +++ b/common/src/main/java/org/apache/comet/parquet/WrappedSeekableInputStream.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.comet.parquet; + +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.Method; +import java.util.Objects; + +import org.apache.parquet.io.DelegatingSeekableInputStream; + +/** + * Wraps an InputStream that possibly implements the methods of a Parquet SeekableInputStream (but + * is not a Parquet SeekableInputStream). Such an InputStream exists, fir instance, in Iceberg's + * SeekableInputStream + */ +public class WrappedSeekableInputStream extends DelegatingSeekableInputStream { + + private final InputStream wrappedInputStream; // The InputStream we are wrapping + + public WrappedSeekableInputStream(InputStream inputStream) { + super(inputStream); + this.wrappedInputStream = Objects.requireNonNull(inputStream, "InputStream cannot be null"); + } + + @Override + public long getPos() throws IOException { + try { + Method targetMethod = wrappedInputStream.getClass().getDeclaredMethod("getPos"); // + targetMethod.setAccessible(true); + return (long) targetMethod.invoke(wrappedInputStream); + } catch (Exception e) { + throw new IOException(e); + } + } + + @Override + public void seek(long newPos) throws IOException { + try { + Method targetMethod = wrappedInputStream.getClass().getDeclaredMethod("seek", long.class); + targetMethod.setAccessible(true); + targetMethod.invoke(wrappedInputStream, newPos); + } catch (Exception e) { + throw new IOException(e); + } + } +} diff --git a/common/src/main/java/org/apache/comet/vector/CometDelegateVector.java b/common/src/main/java/org/apache/comet/vector/CometDelegateVector.java index 75e62ea0fe..8874d11b70 100644 --- a/common/src/main/java/org/apache/comet/vector/CometDelegateVector.java +++ b/common/src/main/java/org/apache/comet/vector/CometDelegateVector.java @@ -48,7 +48,7 @@ public CometDelegateVector(DataType dataType, CometVector delegate, boolean useD this.delegate = delegate; } - public void setDelegate(CometVector delegate) { + protected void setDelegate(CometVector delegate) { this.delegate = delegate; } From fd9b696e2dc0265f4879492630a1dbe60ff5dd7e Mon Sep 17 00:00:00 2001 From: Parth Chandra Date: Wed, 6 Aug 2025 15:46:15 -0700 Subject: [PATCH 2/5] fic ci failure --- .../java/org/apache/comet/parquet/BatchReader.java | 9 ++++++--- .../java/org/apache/comet/parquet/ColumnReader.java | 4 +++- .../org/apache/comet/parquet/ConstantColumnReader.java | 10 ++++++++++ .../org/apache/comet/parquet/MetadataColumnReader.java | 6 +++++- .../main/java/org/apache/comet/parquet/TypeUtil.java | 8 ++++++-- .../src/main/java/org/apache/comet/parquet/Utils.java | 4 +++- 6 files changed, 33 insertions(+), 8 deletions(-) diff --git a/common/src/main/java/org/apache/comet/parquet/BatchReader.java b/common/src/main/java/org/apache/comet/parquet/BatchReader.java index 538de4a66d..edac28ec1b 100644 --- a/common/src/main/java/org/apache/comet/parquet/BatchReader.java +++ b/common/src/main/java/org/apache/comet/parquet/BatchReader.java @@ -186,7 +186,8 @@ public BatchReader( } /** - * @deprecated since 0.9.1, will be removed in 0.10.0. + * @deprecated since 0.10.0, will be removed in 0.11.0. + * @see Comet Issue #2079 */ public BatchReader(AbstractColumnReader[] columnReaders) { // Todo: set useDecimal128 and useLazyMaterialization @@ -383,14 +384,16 @@ public void init() throws URISyntaxException, IOException { } /** - * @deprecated since 0.9.1, will be removed in 0.10.0. + * @deprecated since 0.10.0, will be removed in 0.11.0. 
+ * @see Comet Issue #2079 */ public void setSparkSchema(StructType schema) { this.sparkSchema = schema; } /** - * @deprecated since 0.9.1, will be removed in 0.10.0. + * @deprecated since 0.10.0, will be removed in 0.11.0. + * @see Comet Issue #2079 */ public AbstractColumnReader[] getColumnReaders() { return columnReaders; diff --git a/common/src/main/java/org/apache/comet/parquet/ColumnReader.java b/common/src/main/java/org/apache/comet/parquet/ColumnReader.java index 06543a91c1..968da1959c 100644 --- a/common/src/main/java/org/apache/comet/parquet/ColumnReader.java +++ b/common/src/main/java/org/apache/comet/parquet/ColumnReader.java @@ -111,8 +111,10 @@ public class ColumnReader extends AbstractColumnReader { * Set the page reader for a new column chunk to read. Expects to call `readBatch` after this. * * @param pageReader the page reader for the new column chunk + * @deprecated since 0.10.0, will be removed in 0.11.0. + * @see Comet Issue #2079 */ - void setPageReader(PageReader pageReader) throws IOException { + public void setPageReader(PageReader pageReader) throws IOException { this.pageReader = pageReader; DictionaryPage dictionaryPage = pageReader.readDictionaryPage(); diff --git a/common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java b/common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java index 6a1754ff15..20ae38e67c 100644 --- a/common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java +++ b/common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java @@ -52,6 +52,16 @@ public class ConstantColumnReader extends MetadataColumnReader { init(values, index); } + /** + * @deprecated since 0.10.0, will be removed in 0.11.0. + * @see Comet Issue #2079 + */ + public ConstantColumnReader( + DataType type, ColumnDescriptor descriptor, Object value, boolean useDecimal128) { + super(type, descriptor, useDecimal128, true); + this.value = value; + } + // Used by Iceberg public ConstantColumnReader( DataType type, ParquetColumnSpec spec, Object value, boolean useDecimal128) { diff --git a/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java b/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java index 50c8a738e5..6240c8c8c5 100644 --- a/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java +++ b/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java @@ -42,7 +42,11 @@ public class MetadataColumnReader extends AbstractColumnReader { private boolean isConstant; - MetadataColumnReader( + /** + * @deprecated since 0.10.0, will be made package private in 0.11.0. + * @see Comet Issue #2079 + */ + public MetadataColumnReader( DataType type, ColumnDescriptor descriptor, boolean useDecimal128, boolean isConstant) { // TODO: should we handle legacy dates & timestamps for metadata columns? super(type, descriptor, useDecimal128, false); diff --git a/common/src/main/java/org/apache/comet/parquet/TypeUtil.java b/common/src/main/java/org/apache/comet/parquet/TypeUtil.java index a3e3d481f2..3906218d05 100644 --- a/common/src/main/java/org/apache/comet/parquet/TypeUtil.java +++ b/common/src/main/java/org/apache/comet/parquet/TypeUtil.java @@ -35,8 +35,12 @@ public class TypeUtil { - /** Converts the input Spark 'field' into a Parquet column descriptor. */ - static ColumnDescriptor convertToParquet(StructField field) { + /** + * Converts the input Spark 'field' into a Parquet column descriptor. + * @deprecated since 0.10.0, will be removed in 0.11.0. 
+ * @see Comet Issue #2079 + */ +public static ColumnDescriptor convertToParquet(StructField field) { Type.Repetition repetition; int maxDefinitionLevel; if (field.nullable()) { diff --git a/common/src/main/java/org/apache/comet/parquet/Utils.java b/common/src/main/java/org/apache/comet/parquet/Utils.java index a2aa3b40d0..d29ac2447f 100644 --- a/common/src/main/java/org/apache/comet/parquet/Utils.java +++ b/common/src/main/java/org/apache/comet/parquet/Utils.java @@ -57,8 +57,10 @@ public static ColumnReader getColumnReader( /** * This method is called from Apache Iceberg. * - * @deprecated since 0.9.1, will be removed in 0.10.0; use getColumnReader with ParquetColumnSpec + * @deprecated since 0.10.0, will be removed in 0.11.0; use getColumnReader with ParquetColumnSpec * instead. + * @see Comet Issue #2079 + * */ public static ColumnReader getColumnReader( DataType type, From b1de7c8744ec1dead56d98cdf5d2f81303ba1047 Mon Sep 17 00:00:00 2001 From: Parth Chandra Date: Wed, 6 Aug 2025 16:23:59 -0700 Subject: [PATCH 3/5] spotless --- .../java/org/apache/comet/parquet/ConstantColumnReader.java | 2 +- common/src/main/java/org/apache/comet/parquet/TypeUtil.java | 3 ++- common/src/main/java/org/apache/comet/parquet/Utils.java | 3 +-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java b/common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java index 20ae38e67c..b8fc49a175 100644 --- a/common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java +++ b/common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java @@ -57,7 +57,7 @@ public class ConstantColumnReader extends MetadataColumnReader { * @see Comet Issue #2079 */ public ConstantColumnReader( - DataType type, ColumnDescriptor descriptor, Object value, boolean useDecimal128) { + DataType type, ColumnDescriptor descriptor, Object value, boolean useDecimal128) { super(type, descriptor, useDecimal128, true); this.value = value; } diff --git a/common/src/main/java/org/apache/comet/parquet/TypeUtil.java b/common/src/main/java/org/apache/comet/parquet/TypeUtil.java index 3906218d05..889e2baf50 100644 --- a/common/src/main/java/org/apache/comet/parquet/TypeUtil.java +++ b/common/src/main/java/org/apache/comet/parquet/TypeUtil.java @@ -37,10 +37,11 @@ public class TypeUtil { /** * Converts the input Spark 'field' into a Parquet column descriptor. + * * @deprecated since 0.10.0, will be removed in 0.11.0. * @see Comet Issue #2079 */ -public static ColumnDescriptor convertToParquet(StructField field) { + public static ColumnDescriptor convertToParquet(StructField field) { Type.Repetition repetition; int maxDefinitionLevel; if (field.nullable()) { diff --git a/common/src/main/java/org/apache/comet/parquet/Utils.java b/common/src/main/java/org/apache/comet/parquet/Utils.java index d29ac2447f..37f23d59f4 100644 --- a/common/src/main/java/org/apache/comet/parquet/Utils.java +++ b/common/src/main/java/org/apache/comet/parquet/Utils.java @@ -59,8 +59,7 @@ public static ColumnReader getColumnReader( * * @deprecated since 0.10.0, will be removed in 0.11.0; use getColumnReader with ParquetColumnSpec * instead. 
-   * @see Comet Issue #2079
-   *
+   * @see Comet Issue #2079
    */
   public static ColumnReader getColumnReader(
       DataType type,

From 4171a9ea798227b2ad78337e53bab940ca0a5fe3 Mon Sep 17 00:00:00 2001
From: Parth Chandra
Date: Thu, 7 Aug 2025 11:12:13 -0700
Subject: [PATCH 4/5] fix comments

---
 .../main/java/org/apache/comet/parquet/WrappedInputFile.java | 5 ++---
 .../org/apache/comet/parquet/WrappedSeekableInputStream.java | 2 +-
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/common/src/main/java/org/apache/comet/parquet/WrappedInputFile.java b/common/src/main/java/org/apache/comet/parquet/WrappedInputFile.java
index 7896377835..666d4c2e7b 100644
--- a/common/src/main/java/org/apache/comet/parquet/WrappedInputFile.java
+++ b/common/src/main/java/org/apache/comet/parquet/WrappedInputFile.java
@@ -27,9 +27,8 @@
 import org.apache.parquet.io.SeekableInputStream;
 
 /**
- * A Parquet {@link InputFile} implementation that's similar to {@link
- * org.apache.parquet.hadoop.util.HadoopInputFile}, but with optimizations introduced in Hadoop 3.x,
- * for S3 specifically.
+ * Wraps an Object that possibly implements the methods of a Parquet InputFile (but is not a Parquet
+ * InputFile). Such an object exists, for instance, in Iceberg's InputFile
  */
 public class WrappedInputFile implements InputFile {
   Object wrapped;
diff --git a/common/src/main/java/org/apache/comet/parquet/WrappedSeekableInputStream.java b/common/src/main/java/org/apache/comet/parquet/WrappedSeekableInputStream.java
index fd0151c704..c463617bd6 100644
--- a/common/src/main/java/org/apache/comet/parquet/WrappedSeekableInputStream.java
+++ b/common/src/main/java/org/apache/comet/parquet/WrappedSeekableInputStream.java
@@ -28,7 +28,7 @@
 
 /**
  * Wraps an InputStream that possibly implements the methods of a Parquet SeekableInputStream (but
- * is not a Parquet SeekableInputStream). Such an InputStream exists, fir instance, in Iceberg's
+ * is not a Parquet SeekableInputStream). Such an InputStream exists, for instance, in Iceberg's
  * SeekableInputStream
  */
 public class WrappedSeekableInputStream extends DelegatingSeekableInputStream {

From 0ae3c3465400066ad37b328ed6cc957946db6c4a Mon Sep 17 00:00:00 2001
From: Parth Chandra
Date: Thu, 7 Aug 2025 21:29:45 -0700
Subject: [PATCH 5/5] fix for id

---
 common/src/main/java/org/apache/comet/parquet/Utils.java | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/common/src/main/java/org/apache/comet/parquet/Utils.java b/common/src/main/java/org/apache/comet/parquet/Utils.java
index 37f23d59f4..7fb2eac5b6 100644
--- a/common/src/main/java/org/apache/comet/parquet/Utils.java
+++ b/common/src/main/java/org/apache/comet/parquet/Utils.java
@@ -502,8 +502,14 @@ public static ParquetColumnSpec descriptorToParquetColumnSpec(ColumnDescriptor d
       }
     }
 
+    int id = -1;
+    Type type = descriptor.getPrimitiveType();
+    if (type != null && type.getId() != null) {
+      id = type.getId().intValue();
+    }
+
     return new ParquetColumnSpec(
-        1, // ToDo: pass in the correct id
+        id,
         path,
         physicalType,
         typeLength,
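
Editor's note (not part of the patch series above): the following is a compile-level sketch of how
an Iceberg-side caller could exercise the API surface these patches expose: building a
ParquetColumnSpec from a Spark StructField via TypeUtil.convertToParquetSpec, passing it to the
ParquetColumnSpec-based ConstantColumnReader constructor, and handing a shaded (non-Parquet)
InputFile to Comet through WrappedInputFile. The class name IcebergSideSketch, the column name,
the constant value, and the batch size are illustrative assumptions; only the Comet types and
signatures shown in the diffs above are taken from the source, and actually running the reader
still requires the Comet native library to be loadable.

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;

import org.apache.comet.parquet.ConstantColumnReader;
import org.apache.comet.parquet.ParquetColumnSpec;
import org.apache.comet.parquet.TypeUtil;
import org.apache.comet.parquet.WrappedInputFile;

public class IcebergSideSketch {
  public static void main(String[] args) {
    // Describe the column without exposing Parquet's ColumnDescriptor, so the spec can cross
    // the boundary into Iceberg's shaded-Parquet classpath.
    StructField field =
        new StructField("partition_id", DataTypes.IntegerType, false, Metadata.empty());
    ParquetColumnSpec spec = TypeUtil.convertToParquetSpec(field);

    // Constant (e.g. partition/metadata) column reader built from the spec; 42 is a dummy value.
    ConstantColumnReader constantReader =
        new ConstantColumnReader(field.dataType(), spec, 42, /* useDecimal128 */ false);
    constantReader.setBatchSize(1024);

    // An Iceberg InputFile is a foreign type on this side of the shading boundary;
    // WrappedInputFile resolves its getLength()/newStream() methods reflectively so that
    // Comet's FileReader can consume it. The null placeholder stands in for a real
    // org.apache.iceberg.io.InputFile instance.
    Object icebergInputFile = null;
    WrappedInputFile wrappedFile = new WrappedInputFile(icebergInputFile);
  }
}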