diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index 2b2e460ee994..81c6d436baed 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -127,6 +127,7 @@ public class Parquet { private static final Logger LOG = LoggerFactory.getLogger(Parquet.class); + private static final String VECTORIZED_READER_FACTORY = "read.parquet.vectorized-reader.factory"; private Parquet() {} @@ -1378,6 +1379,22 @@ public ReadBuilder setCustomType(int fieldId, Class struct return this; } + /** + * Sets the vectorized reader factory class to use for reading Parquet files. + * + * @param factoryClassName fully qualified class name of the VectorizedParquetReaderFactory + * implementation, or null to use the default reader + * @return this builder for method chaining + */ + public ReadBuilder vectorizedReaderFactory(String factoryClassName) { + if (factoryClassName != null) { + this.properties.put(VECTORIZED_READER_FACTORY, factoryClassName); + } else { + this.properties.remove(VECTORIZED_READER_FACTORY); + } + return this; + } + public ReadBuilder withFileEncryptionKey(ByteBuffer encryptionKey) { this.fileEncryptionKey = encryptionKey; return this; @@ -1389,7 +1406,7 @@ public ReadBuilder withAADPrefix(ByteBuffer aadPrefix) { } @Override - @SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity"}) + @SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity", "MethodLength"}) public CloseableIterable build() { FileDecryptionProperties fileDecryptionProperties = null; if (fileEncryptionKey != null) { @@ -1442,6 +1459,29 @@ public CloseableIterable build() { } if (batchedReaderFunc != null) { + // Try to load custom vectorized reader factory from properties + String factoryName = properties.get(VECTORIZED_READER_FACTORY); + + if (factoryName != null) { + LOG.info("Loading custom vectorized reader factory: {}", factoryName); + VectorizedParquetReaderFactory factory = loadReaderFactory(factoryName); + if (factory != null) { + return factory.createReader( + VectorizedParquetReaderFactory.ReaderParams.builder( + file, schema, options, batchedReaderFunc) + .nameMapping(mapping) + .filter(filter) + .reuseContainers(reuseContainers) + .caseSensitive(caseSensitive) + .maxRecordsPerBatch(maxRecordsPerBatch) + .properties(properties) + .split(start, length) + .encryption(fileEncryptionKey, fileAADPrefix) + .build()); + } + } + + // Fall back to default VectorizedParquetReader return new VectorizedParquetReader<>( file, schema, @@ -1537,6 +1577,21 @@ public CloseableIterable build() { } } + private static VectorizedParquetReaderFactory loadReaderFactory(String className) { + try { + Class factoryClass = Class.forName(className); + if (VectorizedParquetReaderFactory.class.isAssignableFrom(factoryClass)) { + return (VectorizedParquetReaderFactory) factoryClass.getDeclaredConstructor().newInstance(); + } else { + LOG.warn("Class {} does not implement VectorizedParquetReaderFactory interface", className); + return null; + } + } catch (Exception e) { + LOG.warn("Failed to instantiate vectorized reader factory: {}", className, e); + return null; + } + } + private static class ParquetReadBuilder extends ParquetReader.Builder { private Schema schema = null; private ReadSupport readSupport = null; diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java index 
1fb2372ba568..b82472c0639a 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java @@ -43,7 +43,7 @@ * * @param type of value to read */ -class ReadConf { +public class ReadConf { private final ParquetFileReader reader; private final InputFile file; private final ParquetReadOptions options; @@ -60,7 +60,7 @@ class ReadConf { private final List> columnChunkMetaDataForRowGroups; @SuppressWarnings("unchecked") - ReadConf( + public ReadConf( InputFile file, ParquetReadOptions options, Schema expectedSchema, @@ -146,7 +146,7 @@ private ReadConf(ReadConf toCopy) { this.columnChunkMetaDataForRowGroups = toCopy.columnChunkMetaDataForRowGroups; } - ParquetFileReader reader() { + public ParquetFileReader reader() { if (reader != null) { reader.setRequestedSchema(projection); return reader; @@ -157,35 +157,43 @@ ParquetFileReader reader() { return newReader; } - ParquetValueReader model() { + public InputFile file() { + return file; + } + + public MessageType projection() { + return projection; + } + + public ParquetValueReader model() { return model; } - VectorizedReader vectorizedModel() { + public VectorizedReader vectorizedModel() { return vectorizedModel; } - boolean[] shouldSkip() { + public boolean[] shouldSkip() { return shouldSkip; } - long totalValues() { + public long totalValues() { return totalValues; } - boolean reuseContainers() { + public boolean reuseContainers() { return reuseContainers; } - Integer batchSize() { + public Integer batchSize() { return batchSize; } - List> columnChunkMetadataForRowGroups() { + public List> columnChunkMetadataForRowGroups() { return columnChunkMetaDataForRowGroups; } - ReadConf copy() { + public ReadConf copy() { return new ReadConf<>(this); } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReaderFactory.java b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReaderFactory.java new file mode 100644 index 000000000000..6ea4e2b81408 --- /dev/null +++ b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReaderFactory.java @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.parquet; + +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.function.Function; +import org.apache.iceberg.Schema; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.mapping.NameMapping; +import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.schema.MessageType; + +/** + * Interface for creating custom vectorized Parquet readers. + * + *
<p>
Implementations of this interface are loaded at runtime via reflection by specifying the fully + * qualified class name in the {@code read.parquet.vectorized-reader.factory} configuration + * property. + * + *
<p>
This allows for pluggable vectorized reader implementations (e.g., Comet, Arrow, Velox) + * without requiring the core parquet module to depend on specific execution engines. + */ +public interface VectorizedParquetReaderFactory { + + /** + * Returns the unique identifier for this reader factory. + * + *
<p>
This name is used to select the reader factory via configuration. For example, "comet" for + * the Comet vectorized reader. + * + * @return the unique name for this factory + */ + String name(); + + /** + * Creates a vectorized parquet reader with the given configuration. + * + * @param params reader parameters encapsulating all configuration options + * @param the type of records returned by the reader + * @return a closeable iterable of records + */ + CloseableIterable createReader(ReaderParams params); + + /** Parameters for creating a vectorized parquet reader. */ + interface ReaderParams { + InputFile file(); + + Schema schema(); + + ParquetReadOptions options(); + + Function> batchedReaderFunc(); + + NameMapping mapping(); + + Expression filter(); + + boolean reuseContainers(); + + boolean caseSensitive(); + + int maxRecordsPerBatch(); + + Map properties(); + + Long start(); + + Long length(); + + ByteBuffer fileEncryptionKey(); + + ByteBuffer fileAADPrefix(); + + static Builder builder( + InputFile file, + Schema schema, + ParquetReadOptions options, + Function> batchedReaderFunc) { + return new Builder(file, schema, options, batchedReaderFunc); + } + + /** Builder for ReaderParams. */ + class Builder { + private final InputFile file; + private final Schema schema; + private final ParquetReadOptions options; + private final Function> batchedReaderFunc; + private NameMapping mapping = null; + private Expression filter = null; + private boolean reuseContainers = false; + private boolean caseSensitive = true; + private int maxRecordsPerBatch = 10000; + private Map properties = null; + private Long start = null; + private Long length = null; + private ByteBuffer fileEncryptionKey = null; + private ByteBuffer fileAADPrefix = null; + + private Builder( + InputFile file, + Schema schema, + ParquetReadOptions options, + Function> batchedReaderFunc) { + this.file = file; + this.schema = schema; + this.options = options; + this.batchedReaderFunc = batchedReaderFunc; + } + + public Builder nameMapping(NameMapping nameMapping) { + this.mapping = nameMapping; + return this; + } + + public Builder filter(Expression filterExpr) { + this.filter = filterExpr; + return this; + } + + public Builder reuseContainers(boolean reuse) { + this.reuseContainers = reuse; + return this; + } + + public Builder caseSensitive(boolean sensitive) { + this.caseSensitive = sensitive; + return this; + } + + public Builder maxRecordsPerBatch(int maxRecords) { + this.maxRecordsPerBatch = maxRecords; + return this; + } + + public Builder properties(Map props) { + this.properties = props; + return this; + } + + public Builder split(Long splitStart, Long splitLength) { + this.start = splitStart; + this.length = splitLength; + return this; + } + + public Builder encryption(ByteBuffer encryptionKey, ByteBuffer aadPrefix) { + this.fileEncryptionKey = encryptionKey; + this.fileAADPrefix = aadPrefix; + return this; + } + + public ReaderParams build() { + return new ReaderParamsImpl( + file, + schema, + options, + batchedReaderFunc, + mapping, + filter, + reuseContainers, + caseSensitive, + maxRecordsPerBatch, + properties, + start, + length, + fileEncryptionKey, + fileAADPrefix); + } + } + } + + /** Implementation of ReaderParams. 
*/ + class ReaderParamsImpl implements ReaderParams { + private final InputFile file; + private final Schema schema; + private final ParquetReadOptions options; + private final Function> batchedReaderFunc; + private final NameMapping mapping; + private final Expression filter; + private final boolean reuseContainers; + private final boolean caseSensitive; + private final int maxRecordsPerBatch; + private final Map properties; + private final Long start; + private final Long length; + private final ByteBuffer fileEncryptionKey; + private final ByteBuffer fileAADPrefix; + + private ReaderParamsImpl( + InputFile file, + Schema schema, + ParquetReadOptions options, + Function> batchedReaderFunc, + NameMapping mapping, + Expression filter, + boolean reuseContainers, + boolean caseSensitive, + int maxRecordsPerBatch, + Map properties, + Long start, + Long length, + ByteBuffer fileEncryptionKey, + ByteBuffer fileAADPrefix) { + this.file = file; + this.schema = schema; + this.options = options; + this.batchedReaderFunc = batchedReaderFunc; + this.mapping = mapping; + this.filter = filter; + this.reuseContainers = reuseContainers; + this.caseSensitive = caseSensitive; + this.maxRecordsPerBatch = maxRecordsPerBatch; + this.properties = properties; + this.start = start; + this.length = length; + this.fileEncryptionKey = fileEncryptionKey; + this.fileAADPrefix = fileAADPrefix; + } + + @Override + public InputFile file() { + return file; + } + + @Override + public Schema schema() { + return schema; + } + + @Override + public ParquetReadOptions options() { + return options; + } + + @Override + public Function> batchedReaderFunc() { + return batchedReaderFunc; + } + + @Override + public NameMapping mapping() { + return mapping; + } + + @Override + public Expression filter() { + return filter; + } + + @Override + public boolean reuseContainers() { + return reuseContainers; + } + + @Override + public boolean caseSensitive() { + return caseSensitive; + } + + @Override + public int maxRecordsPerBatch() { + return maxRecordsPerBatch; + } + + @Override + public Map properties() { + return properties; + } + + @Override + public Long start() { + return start; + } + + @Override + public Long length() { + return length; + } + + @Override + public ByteBuffer fileEncryptionKey() { + return fileEncryptionKey; + } + + @Override + public ByteBuffer fileAADPrefix() { + return fileAADPrefix; + } + } +} diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java index 58850ec7c9f4..d31e844ae993 100644 --- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java @@ -36,6 +36,7 @@ import java.nio.ByteBuffer; import java.nio.file.Path; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.function.Function; import java.util.stream.Stream; @@ -314,6 +315,177 @@ public void testFooterMetricsWithNameMappingForFileWithoutIds() throws IOExcepti } } + @Test + public void testVectorizedReaderFactoryConfiguration() throws IOException { + Schema schema = new Schema(optional(1, "intCol", IntegerType.get())); + File file = createTempFile(temp); + + // Write test data + List records = Lists.newArrayList(); + org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct()); + GenericData.Record record = new GenericData.Record(avroSchema); + record.put("intCol", 42); + records.add(record); + + write(file, schema, 
Collections.emptyMap(), null, records.toArray(new GenericData.Record[] {})); + + // Reset the flag + TestMockVectorizedReaderFactory.wasCalled = false; + + // Test setting vectorized reader factory + Parquet.ReadBuilder readBuilder = + Parquet.read(Files.localInput(file)) + .project(schema) + .createBatchedReaderFunc(fileSchema -> new MockVectorizedReader()) + .vectorizedReaderFactory(MockVectorizedReaderFactory.class.getName()); + + // We can't easily verify the property directly since it's private, + // but we can verify the build succeeds + readBuilder.build().iterator(); // Should not throw + + // Verify our mock factory was NOT used (because MockVectorizedReaderFactory is not a valid + // factory) + assertThat(TestMockVectorizedReaderFactory.wasCalled) + .as("TestMockVectorizedReaderFactory should not have been called") + .isFalse(); + } + + @Test + public void testVectorizedReaderFactoryRemoveWithNull() throws IOException { + Schema schema = new Schema(optional(1, "intCol", IntegerType.get())); + File file = createTempFile(temp); + + // Write test data + List records = Lists.newArrayList(); + org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct()); + GenericData.Record record = new GenericData.Record(avroSchema); + record.put("intCol", 42); + records.add(record); + + write(file, schema, Collections.emptyMap(), null, records.toArray(new GenericData.Record[] {})); + + // Test removing vectorized reader factory with null + Parquet.ReadBuilder readBuilder = + Parquet.read(Files.localInput(file)) + .project(schema) + .createBatchedReaderFunc(fileSchema -> new MockVectorizedReader()) + .vectorizedReaderFactory(MockVectorizedReaderFactory.class.getName()) + .vectorizedReaderFactory(null); // Remove it + + // Build should succeed and use default reader + readBuilder.build().iterator(); // Should not throw + } + + @Test + public void testVectorizedReaderFactoryMissingClass() throws IOException { + Schema schema = new Schema(optional(1, "intCol", IntegerType.get())); + File file = createTempFile(temp); + + // Write test data + List records = Lists.newArrayList(); + org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct()); + GenericData.Record record = new GenericData.Record(avroSchema); + record.put("intCol", 42); + records.add(record); + + write(file, schema, Collections.emptyMap(), null, records.toArray(new GenericData.Record[] {})); + + // Test with non-existent class - should fall back to default reader + Parquet.ReadBuilder readBuilder = + Parquet.read(Files.localInput(file)) + .project(schema) + .createBatchedReaderFunc(fileSchema -> new MockVectorizedReader()) + .vectorizedReaderFactory("com.example.NonExistentFactory"); + + // Should not throw - falls back to default reader + readBuilder.build().iterator(); + } + + @Test + public void testVectorizedReaderFactoryInvalidClass() throws IOException { + Schema schema = new Schema(optional(1, "intCol", IntegerType.get())); + File file = createTempFile(temp); + + // Write test data + List records = Lists.newArrayList(); + org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct()); + GenericData.Record record = new GenericData.Record(avroSchema); + record.put("intCol", 42); + records.add(record); + + write(file, schema, Collections.emptyMap(), null, records.toArray(new GenericData.Record[] {})); + + // Test with a class that doesn't implement VectorizedParquetReaderFactory + Parquet.ReadBuilder readBuilder = + Parquet.read(Files.localInput(file)) + .project(schema) + 
.createBatchedReaderFunc(fileSchema -> new MockVectorizedReader()) + .vectorizedReaderFactory(InvalidReaderFactory.class.getName()); + + // Should not throw - falls back to default reader + readBuilder.build().iterator(); + } + + @Test + public void testVectorizedReaderFactoryNoDefaultConstructor() throws IOException { + Schema schema = new Schema(optional(1, "intCol", IntegerType.get())); + File file = createTempFile(temp); + + // Write test data + List records = Lists.newArrayList(); + org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct()); + GenericData.Record record = new GenericData.Record(avroSchema); + record.put("intCol", 42); + records.add(record); + + write(file, schema, Collections.emptyMap(), null, records.toArray(new GenericData.Record[] {})); + + // Test with a class that has no default constructor + Parquet.ReadBuilder readBuilder = + Parquet.read(Files.localInput(file)) + .project(schema) + .createBatchedReaderFunc(fileSchema -> new MockVectorizedReader()) + .vectorizedReaderFactory(NoDefaultConstructorFactory.class.getName()); + + // Should not throw - falls back to default reader + readBuilder.build().iterator(); + } + + @Test + public void testVectorizedReaderFactorySuccessfulLoad() throws IOException { + Schema schema = new Schema(optional(1, "intCol", IntegerType.get())); + File file = createTempFile(temp); + + // Write test data + List records = Lists.newArrayList(); + org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct()); + GenericData.Record record = new GenericData.Record(avroSchema); + record.put("intCol", 42); + records.add(record); + + write(file, schema, Collections.emptyMap(), null, records.toArray(new GenericData.Record[] {})); + + // Reset the flag + TestMockVectorizedReaderFactory.wasCalled = false; + + // Test successful factory loading + Parquet.ReadBuilder readBuilder = + Parquet.read(Files.localInput(file)) + .project(schema) + .createBatchedReaderFunc(fileSchema -> new MockVectorizedReader()) + .vectorizedReaderFactory(TestMockVectorizedReaderFactory.class.getName()); + + // Build and consume the reader + Iterator iterator = readBuilder.build().iterator(); + assertThat(iterator.hasNext()).isTrue(); + iterator.next(); + + // Verify our mock factory was actually used + assertThat(TestMockVectorizedReaderFactory.wasCalled) + .as("Mock factory should have been called") + .isTrue(); + } + private Pair generateFile( Function> createWriterFunc, int desiredRecordCount, @@ -354,4 +526,74 @@ private Pair generateFile( records.toArray(new GenericData.Record[] {})); return Pair.of(file, size); } + + // Test helper classes + + /** A mock VectorizedReader for testing. */ + public static class MockVectorizedReader implements VectorizedReader { + @Override + public Object read(Object reuse, int numRows) { + return null; + } + + @Override + public void setBatchSize(int batchSize) { + // No-op + } + + @Override + public void close() { + // No-op + } + } + + /** A mock factory class that implements VectorizedParquetReaderFactory for testing. 
*/ + public static class TestMockVectorizedReaderFactory implements VectorizedParquetReaderFactory { + static boolean wasCalled = false; + + @Override + public String name() { + return "test-mock"; + } + + @Override + @SuppressWarnings("unchecked") + public org.apache.iceberg.io.CloseableIterable createReader(ReaderParams params) { + wasCalled = true; + // Return a simple iterable that provides the mock data + GenericData.Record record = + new GenericData.Record(AvroSchemaUtil.convert(params.schema().asStruct(), "table")); + record.put(0, 42); + return (org.apache.iceberg.io.CloseableIterable) + org.apache.iceberg.io.CloseableIterable.withNoopClose(Collections.singletonList(record)); + } + } + + /** A mock factory class without implementing the interface. */ + public static class InvalidReaderFactory { + public InvalidReaderFactory() {} + + public String name() { + return "invalid"; + } + } + + /** A mock factory class with no default constructor. */ + public static class NoDefaultConstructorFactory implements VectorizedParquetReaderFactory { + @SuppressWarnings("unused") + public NoDefaultConstructorFactory(String unusedParam) {} + + @Override + public String name() { + return "no-default"; + } + + @Override + public org.apache.iceberg.io.CloseableIterable createReader(ReaderParams params) { + return null; + } + } + + /** A simple reference class that can be loaded. */ + public static class MockVectorizedReaderFactory {} } diff --git a/spark/v3.4/build.gradle b/spark/v3.4/build.gradle index 69baf216c3cc..c4e59f1ec12c 100644 --- a/spark/v3.4/build.gradle +++ b/spark/v3.4/build.gradle @@ -265,6 +265,8 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio integrationImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts') integrationImplementation project(path: ":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts') integrationImplementation project(path: ":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts') + integrationImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:${libs.versions.comet.get()}" + // runtime dependencies for running Hive Catalog based integration test integrationRuntimeOnly project(':iceberg-hive-metastore') diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/ParquetBatchReadConf.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/ParquetBatchReadConf.java index 442d728d4d69..dc84485e17f4 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/ParquetBatchReadConf.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/ParquetBatchReadConf.java @@ -19,6 +19,7 @@ package org.apache.iceberg.spark; import java.io.Serializable; +import java.util.Optional; import org.immutables.value.Value; @Value.Immutable @@ -26,4 +27,6 @@ public interface ParquetBatchReadConf extends Serializable { int batchSize(); ParquetReaderType readerType(); + + Optional factoryClassName(); } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java index dd7e2c20c1b9..62465f46f435 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java @@ -379,4 +379,11 @@ public ParquetReaderType parquetReaderType() { 
.defaultValue(SparkSQLProperties.PARQUET_READER_TYPE_DEFAULT) .parse(); } + + public String parquetVectorizedReaderFactory() { + return confParser + .stringConf() + .sessionConf(SparkSQLProperties.PARQUET_VECTORIZED_READER_FACTORY) + .parseOptional(); + } } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java index fa7d4a4b185a..abeb29da3a30 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java @@ -30,6 +30,13 @@ private SparkSQLProperties() {} // Controls which Parquet reader implementation to use public static final String PARQUET_READER_TYPE = "spark.sql.iceberg.parquet.reader-type"; public static final ParquetReaderType PARQUET_READER_TYPE_DEFAULT = ParquetReaderType.ICEBERG; + // Controls the fully qualified class name of the vectorized Parquet reader factory + public static final String PARQUET_VECTORIZED_READER_FACTORY = + "spark.sql.iceberg.parquet.vectorized-reader.factory"; + + // Comet vectorized reader factory class name + public static final String COMET_VECTORIZED_READER_FACTORY_CLASS = + "org.apache.iceberg.spark.parquet.CometVectorizedParquetReaderFactory"; // Controls whether reading/writing timestamps without timezones is allowed @Deprecated diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java index 81b7d83a7077..d6350af7a273 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java @@ -19,18 +19,22 @@ package org.apache.iceberg.spark.data.vectorized; import java.io.IOException; +import org.apache.comet.CometConf; import org.apache.comet.CometSchemaImporter; import org.apache.comet.parquet.AbstractColumnReader; import org.apache.comet.parquet.ColumnReader; +import org.apache.comet.parquet.ParquetColumnSpec; +import org.apache.comet.parquet.RowGroupReader; import org.apache.comet.parquet.TypeUtil; import org.apache.comet.parquet.Utils; import org.apache.comet.shaded.arrow.memory.RootAllocator; import org.apache.iceberg.parquet.VectorizedReader; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.parquet.CometTypeUtils; import org.apache.iceberg.types.Types; import org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.column.page.PageReader; +import org.apache.spark.sql.internal.SQLConf; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.Metadata; import org.apache.spark.sql.types.StructField; @@ -42,23 +46,28 @@ class CometColumnReader implements VectorizedReader { private final ColumnDescriptor descriptor; private final DataType sparkType; + private final int fieldId; // The delegated ColumnReader from Comet side private AbstractColumnReader delegate; private boolean initialized = false; private int batchSize = DEFAULT_BATCH_SIZE; private CometSchemaImporter importer; + private ParquetColumnSpec spec; - CometColumnReader(DataType sparkType, ColumnDescriptor descriptor) { + CometColumnReader(DataType sparkType, ColumnDescriptor descriptor, int fieldId) { this.sparkType = sparkType; 
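For context, here is a minimal sketch of how the session property introduced in SparkSQLProperties above could be used to route Iceberg batch reads through the Comet factory. The local master, the main-class wrapper, and the table name db.table are illustrative assumptions; the property constants and the plumbing come from this change (SparkReadConf#parquetVectorizedReaderFactory feeds ParquetBatchReadConf, and BaseBatchReader passes the class name to Parquet.ReadBuilder#vectorizedReaderFactory).

import org.apache.iceberg.spark.SparkSQLProperties;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CometFactorySessionExample {
  public static void main(String[] args) {
    // Select the Comet reader factory for this session; the value is the fully qualified
    // class name that Parquet.ReadBuilder will load reflectively.
    SparkSession spark =
        SparkSession.builder()
            .master("local[2]") // assumption: a local test session
            .config(
                SparkSQLProperties.PARQUET_VECTORIZED_READER_FACTORY,
                SparkSQLProperties.COMET_VECTORIZED_READER_FACTORY_CLASS)
            .getOrCreate();

    // Hypothetical Iceberg table; vectorized Parquet reads of it should now be served by
    // CometVectorizedParquetReader instead of the default VectorizedParquetReader.
    Dataset<Row> df = spark.read().format("iceberg").load("db.table");
    df.show();
  }
}

If the configured class cannot be loaded or does not implement VectorizedParquetReaderFactory, loadReaderFactory logs a warning and the read falls back to the default vectorized reader, so a misconfigured session still works.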
this.descriptor = descriptor; + this.fieldId = fieldId; } CometColumnReader(Types.NestedField field) { DataType dataType = SparkSchemaUtil.convert(field.type()); StructField structField = new StructField(field.name(), dataType, false, Metadata.empty()); this.sparkType = dataType; - this.descriptor = TypeUtil.convertToParquet(structField); + this.descriptor = + CometTypeUtils.buildColumnDescriptor(TypeUtil.convertToParquetSpec(structField)); + this.fieldId = field.fieldId(); } public AbstractColumnReader delegate() { @@ -92,7 +101,26 @@ public void reset() { } this.importer = new CometSchemaImporter(new RootAllocator()); - this.delegate = Utils.getColumnReader(sparkType, descriptor, importer, batchSize, false, false); + + spec = CometTypeUtils.descriptorToParquetColumnSpec(descriptor); + + boolean useLegacyTime = + Boolean.parseBoolean( + SQLConf.get() + .getConfString( + CometConf.COMET_EXCEPTION_ON_LEGACY_DATE_TIMESTAMP().key(), "false")); + boolean useLazyMaterialization = + Boolean.parseBoolean( + SQLConf.get().getConfString(CometConf.COMET_USE_LAZY_MATERIALIZATION().key(), "false")); + this.delegate = + Utils.getColumnReader( + sparkType, + spec, + importer, + batchSize, + true, // Comet sets this to true for native execution + useLazyMaterialization, + useLegacyTime); this.initialized = true; } @@ -111,9 +139,9 @@ public DataType sparkType() { *
<p>
NOTE: this should be called before reading a new Parquet column chunk, and after {@link * CometColumnReader#reset} is called. */ - public void setPageReader(PageReader pageReader) throws IOException { + public void setPageReader(RowGroupReader pageStore) throws IOException { Preconditions.checkState(initialized, "Invalid state: 'reset' should be called first"); - ((ColumnReader) delegate).setPageReader(pageReader); + ((ColumnReader) delegate).setRowGroupReader(pageStore, spec); } @Override diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java index 3d3e9aca24de..8b11293684e0 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java @@ -23,7 +23,8 @@ import java.util.List; import java.util.Map; import org.apache.comet.parquet.AbstractColumnReader; -import org.apache.comet.parquet.BatchReader; +import org.apache.comet.parquet.IcebergCometBatchReader; +import org.apache.comet.parquet.RowGroupReader; import org.apache.iceberg.Schema; import org.apache.iceberg.parquet.VectorizedReader; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -51,15 +52,14 @@ class CometColumnarBatchReader implements VectorizedReader { // calling BatchReader.nextBatch, the isDeleted value is not yet available, so // DeleteColumnReader.readBatch must be called explicitly later, after the isDeleted value is // available. - private final BatchReader delegate; + private final IcebergCometBatchReader delegate; CometColumnarBatchReader(List> readers, Schema schema) { this.readers = readers.stream().map(CometColumnReader.class::cast).toArray(CometColumnReader[]::new); AbstractColumnReader[] abstractColumnReaders = new AbstractColumnReader[readers.size()]; - this.delegate = new BatchReader(abstractColumnReaders); - delegate.setSparkSchema(SparkSchemaUtil.convert(schema)); + this.delegate = new IcebergCometBatchReader(readers.size(), SparkSchemaUtil.convert(schema)); } @Override @@ -71,16 +71,18 @@ public void setRowGroupInfo( && !(readers[i] instanceof CometPositionColumnReader) && !(readers[i] instanceof CometDeleteColumnReader)) { readers[i].reset(); - readers[i].setPageReader(pageStore.getPageReader(readers[i].descriptor())); + readers[i].setPageReader((RowGroupReader) pageStore); } } catch (IOException e) { throw new UncheckedIOException("Failed to setRowGroupInfo for Comet vectorization", e); } } + AbstractColumnReader[] delegateReaders = new AbstractColumnReader[readers.length]; for (int i = 0; i < readers.length; i++) { - delegate.getColumnReaders()[i] = this.readers[i].delegate(); + delegateReaders[i] = readers[i].delegate(); } + delegate.init(delegateReaders); } @Override diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java index c665002e8f66..76e3c2895558 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java @@ -21,6 +21,7 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; import org.apache.comet.parquet.ConstantColumnReader; 
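To make the descriptor/spec hand-off used by these Comet readers concrete, below is a small sketch of the round trip performed by the CometTypeUtils helper introduced later in this patch. The column name, field id, and logical type are arbitrary illustrative choices; ParquetColumnSpec is Comet's engine-neutral column description.

import org.apache.comet.parquet.ParquetColumnSpec;
import org.apache.iceberg.spark.parquet.CometTypeUtils;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;

public class CometTypeUtilsExample {
  public static void main(String[] args) {
    // An optional INT32 column annotated as a 32-bit signed integer; name and id are arbitrary.
    PrimitiveType intCol =
        Types.optional(PrimitiveType.PrimitiveTypeName.INT32)
            .as(LogicalTypeAnnotation.intType(32, true))
            .id(1)
            .named("intCol");
    ColumnDescriptor descriptor =
        new ColumnDescriptor(new String[] {"intCol"}, intCol, 0 /* max rep */, 1 /* max def */);

    // Descriptor -> spec: physical type, repetition/definition levels, and logical-type
    // parameters are captured as plain values that can cross the Iceberg/Comet boundary.
    ParquetColumnSpec spec = CometTypeUtils.descriptorToParquetColumnSpec(descriptor);

    // Spec -> descriptor: the logical type is rebuilt from the recorded parameters,
    // so the reconstructed column matches the original shape.
    ColumnDescriptor rebuilt = CometTypeUtils.buildColumnDescriptor(spec);
    System.out.println(rebuilt.getPrimitiveType());
  }
}

This is the same conversion CometColumnReader performs when it rebuilds a descriptor from TypeUtil.convertToParquetSpec(structField) and when reset() turns its descriptor into the spec handed to Utils.getColumnReader.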
+import org.apache.iceberg.spark.parquet.CometTypeUtils; import org.apache.iceberg.types.Types; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.DataTypes; @@ -34,7 +35,11 @@ class CometConstantColumnReader extends CometColumnReader { super(field); // use delegate to set constant value on the native side to be consumed by native execution. setDelegate( - new ConstantColumnReader(sparkType(), descriptor(), convertToSparkValue(value), false)); + new ConstantColumnReader( + sparkType(), + CometTypeUtils.descriptorToParquetColumnSpec(descriptor()), + convertToSparkValue(value), + false)); } @Override diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java index 26219014f777..a3aa3519c6d4 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java @@ -50,7 +50,7 @@ private static class DeleteColumnReader extends MetadataColumnReader { DeleteColumnReader(boolean[] isDeleted) { super( DataTypes.BooleanType, - TypeUtil.convertToParquet( + TypeUtil.convertToParquetSpec( new StructField("_deleted", DataTypes.BooleanType, false, Metadata.empty())), false /* useDecimal128 = false */, false /* isConstant = false */); diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java index 1949a717982a..4c22edbb857e 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java @@ -20,6 +20,7 @@ import org.apache.comet.parquet.MetadataColumnReader; import org.apache.comet.parquet.Native; +import org.apache.iceberg.spark.parquet.CometTypeUtils; import org.apache.iceberg.types.Types; import org.apache.parquet.column.ColumnDescriptor; import org.apache.spark.sql.types.DataTypes; @@ -44,7 +45,7 @@ private static class PositionColumnReader extends MetadataColumnReader { PositionColumnReader(ColumnDescriptor descriptor) { super( DataTypes.LongType, - descriptor, + CometTypeUtils.descriptorToParquetColumnSpec(descriptor), false /* useDecimal128 = false */, false /* isConstant */); } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java index 779dc240d4f6..a509258f1f92 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java @@ -133,6 +133,7 @@ public VectorizedReader primitive( return null; } - return new CometColumnReader(SparkSchemaUtil.convert(icebergField.type()), desc); + return new CometColumnReader( + SparkSchemaUtil.convert(icebergField.type()), desc, parquetFieldId); } } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/parquet/CometTypeUtils.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/parquet/CometTypeUtils.java new file mode 100644 index 000000000000..b94058842510 --- 
/dev/null +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/parquet/CometTypeUtils.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.parquet; + +import java.util.Map; +import org.apache.comet.parquet.ParquetColumnSpec; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Types; + +public class CometTypeUtils { + + private CometTypeUtils() {} + + public static ParquetColumnSpec descriptorToParquetColumnSpec(ColumnDescriptor descriptor) { + + String[] path = descriptor.getPath(); + PrimitiveType primitiveType = descriptor.getPrimitiveType(); + String physicalType = primitiveType.getPrimitiveTypeName().name(); + + int typeLength = + primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY + ? 
primitiveType.getTypeLength() + : 0; + + boolean isRepeated = primitiveType.getRepetition() == Type.Repetition.REPEATED; + + String logicalTypeName = null; + Map logicalTypeParams = Maps.newHashMap(); + LogicalTypeAnnotation logicalType = primitiveType.getLogicalTypeAnnotation(); + + if (logicalType != null) { + logicalTypeName = logicalType.getClass().getSimpleName(); + + // Handle specific logical types + if (logicalType instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) { + LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimal = + (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType; + logicalTypeParams.put("precision", String.valueOf(decimal.getPrecision())); + logicalTypeParams.put("scale", String.valueOf(decimal.getScale())); + } else if (logicalType instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) { + LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestamp = + (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalType; + logicalTypeParams.put("isAdjustedToUTC", String.valueOf(timestamp.isAdjustedToUTC())); + logicalTypeParams.put("unit", timestamp.getUnit().name()); + } else if (logicalType instanceof LogicalTypeAnnotation.TimeLogicalTypeAnnotation) { + LogicalTypeAnnotation.TimeLogicalTypeAnnotation time = + (LogicalTypeAnnotation.TimeLogicalTypeAnnotation) logicalType; + logicalTypeParams.put("isAdjustedToUTC", String.valueOf(time.isAdjustedToUTC())); + logicalTypeParams.put("unit", time.getUnit().name()); + } else if (logicalType instanceof LogicalTypeAnnotation.IntLogicalTypeAnnotation) { + LogicalTypeAnnotation.IntLogicalTypeAnnotation intType = + (LogicalTypeAnnotation.IntLogicalTypeAnnotation) logicalType; + logicalTypeParams.put("isSigned", String.valueOf(intType.isSigned())); + logicalTypeParams.put("bitWidth", String.valueOf(intType.getBitWidth())); + } + } + + int id = -1; + Type type = descriptor.getPrimitiveType(); + if (type != null && type.getId() != null) { + id = type.getId().intValue(); + } + + return new ParquetColumnSpec( + id, + path, + physicalType, + typeLength, + isRepeated, + descriptor.getMaxDefinitionLevel(), + descriptor.getMaxRepetitionLevel(), + logicalTypeName, + logicalTypeParams); + } + + public static ColumnDescriptor buildColumnDescriptor(ParquetColumnSpec columnSpec) { + PrimitiveType.PrimitiveTypeName primType = + PrimitiveType.PrimitiveTypeName.valueOf(columnSpec.getPhysicalType()); + + Type.Repetition repetition; + if (columnSpec.getMaxRepetitionLevel() > 0) { + repetition = Type.Repetition.REPEATED; + } else if (columnSpec.getMaxDefinitionLevel() > 0) { + repetition = Type.Repetition.OPTIONAL; + } else { + repetition = Type.Repetition.REQUIRED; + } + + String name = columnSpec.getPath()[columnSpec.getPath().length - 1]; + // Reconstruct the logical type from parameters + LogicalTypeAnnotation logicalType = null; + if (columnSpec.getLogicalTypeName() != null) { + logicalType = + reconstructLogicalType( + columnSpec.getLogicalTypeName(), columnSpec.getLogicalTypeParams()); + } + + PrimitiveType primitiveType; + if (primType == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) { + primitiveType = + Types.primitive(primType, repetition) + .length(columnSpec.getTypeLength()) + .as(logicalType) + .id(columnSpec.getFieldId()) + .named(name); + } else { + primitiveType = + Types.primitive(primType, repetition) + .as(logicalType) + .id(columnSpec.getFieldId()) + .named(name); + } + + return new ColumnDescriptor( + columnSpec.getPath(), + primitiveType, + columnSpec.getMaxRepetitionLevel(), 
+ columnSpec.getMaxDefinitionLevel()); + } + + private static LogicalTypeAnnotation reconstructLogicalType( + String logicalTypeName, Map params) { + + switch (logicalTypeName) { + // MAP + case "MapLogicalTypeAnnotation": + return LogicalTypeAnnotation.mapType(); + + // LIST + case "ListLogicalTypeAnnotation": + return LogicalTypeAnnotation.listType(); + + // STRING + case "StringLogicalTypeAnnotation": + return LogicalTypeAnnotation.stringType(); + + // MAP_KEY_VALUE + case "MapKeyValueLogicalTypeAnnotation": + return LogicalTypeAnnotation.MapKeyValueTypeAnnotation.getInstance(); + + // ENUM + case "EnumLogicalTypeAnnotation": + return LogicalTypeAnnotation.enumType(); + + // DECIMAL + case "DecimalLogicalTypeAnnotation": + if (!params.containsKey("scale") || !params.containsKey("precision")) { + throw new IllegalArgumentException( + "Missing required parameters for DecimalLogicalTypeAnnotation: " + params); + } + int scale = Integer.parseInt(params.get("scale")); + int precision = Integer.parseInt(params.get("precision")); + return LogicalTypeAnnotation.decimalType(scale, precision); + + // DATE + case "DateLogicalTypeAnnotation": + return LogicalTypeAnnotation.dateType(); + + // TIME + case "TimeLogicalTypeAnnotation": + if (!params.containsKey("isAdjustedToUTC") || !params.containsKey("unit")) { + throw new IllegalArgumentException( + "Missing required parameters for TimeLogicalTypeAnnotation: " + params); + } + + boolean isUTC = Boolean.parseBoolean(params.get("isAdjustedToUTC")); + String timeUnitStr = params.get("unit"); + + LogicalTypeAnnotation.TimeUnit timeUnit; + switch (timeUnitStr) { + case "MILLIS": + timeUnit = LogicalTypeAnnotation.TimeUnit.MILLIS; + break; + case "MICROS": + timeUnit = LogicalTypeAnnotation.TimeUnit.MICROS; + break; + case "NANOS": + timeUnit = LogicalTypeAnnotation.TimeUnit.NANOS; + break; + default: + throw new IllegalArgumentException("Unknown time unit: " + timeUnitStr); + } + return LogicalTypeAnnotation.timeType(isUTC, timeUnit); + + // TIMESTAMP + case "TimestampLogicalTypeAnnotation": + if (!params.containsKey("isAdjustedToUTC") || !params.containsKey("unit")) { + throw new IllegalArgumentException( + "Missing required parameters for TimestampLogicalTypeAnnotation: " + params); + } + boolean isAdjustedToUTC = Boolean.parseBoolean(params.get("isAdjustedToUTC")); + String unitStr = params.get("unit"); + + LogicalTypeAnnotation.TimeUnit unit; + switch (unitStr) { + case "MILLIS": + unit = LogicalTypeAnnotation.TimeUnit.MILLIS; + break; + case "MICROS": + unit = LogicalTypeAnnotation.TimeUnit.MICROS; + break; + case "NANOS": + unit = LogicalTypeAnnotation.TimeUnit.NANOS; + break; + default: + throw new IllegalArgumentException("Unknown timestamp unit: " + unitStr); + } + return LogicalTypeAnnotation.timestampType(isAdjustedToUTC, unit); + + // INTEGER + case "IntLogicalTypeAnnotation": + if (!params.containsKey("isSigned") || !params.containsKey("bitWidth")) { + throw new IllegalArgumentException( + "Missing required parameters for IntLogicalTypeAnnotation: " + params); + } + boolean isSigned = Boolean.parseBoolean(params.get("isSigned")); + int bitWidth = Integer.parseInt(params.get("bitWidth")); + return LogicalTypeAnnotation.intType(bitWidth, isSigned); + + // JSON + case "JsonLogicalTypeAnnotation": + return LogicalTypeAnnotation.jsonType(); + + // BSON + case "BsonLogicalTypeAnnotation": + return LogicalTypeAnnotation.bsonType(); + + // UUID + case "UUIDLogicalTypeAnnotation": + return LogicalTypeAnnotation.uuidType(); + + // INTERVAL + case 
"IntervalLogicalTypeAnnotation": + return LogicalTypeAnnotation.IntervalLogicalTypeAnnotation.getInstance(); + + default: + throw new IllegalArgumentException("Unknown logical type: " + logicalTypeName); + } + } +} diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/parquet/CometVectorizedParquetReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/parquet/CometVectorizedParquetReader.java new file mode 100644 index 000000000000..dee3fab05ff6 --- /dev/null +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/parquet/CometVectorizedParquetReader.java @@ -0,0 +1,352 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.parquet; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.function.Function; +import org.apache.comet.parquet.FileReader; +import org.apache.comet.parquet.ParquetColumnSpec; +import org.apache.comet.parquet.ReadOptions; +import org.apache.comet.parquet.RowGroupReader; +import org.apache.comet.parquet.WrappedInputFile; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.Schema; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.io.CloseableGroup; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.mapping.NameMapping; +import org.apache.iceberg.parquet.ReadConf; +import org.apache.iceberg.parquet.VectorizedReader; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.ByteBuffers; +import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.schema.MessageType; + +public class CometVectorizedParquetReader extends CloseableGroup + implements CloseableIterable { + private final InputFile input; + private final ParquetReadOptions options; + private final Schema expectedSchema; + private final Function> batchReaderFunc; + private final Expression filter; + private final boolean reuseContainers; + private final boolean caseSensitive; + private final int batchSize; + private final NameMapping nameMapping; + private final Map properties; + private final Long start; + private final Long length; + private final ByteBuffer fileEncryptionKey; + private final ByteBuffer fileAADPrefix; + + private 
CometVectorizedParquetReader( + InputFile input, + Schema expectedSchema, + ParquetReadOptions options, + Function> readerFunc, + NameMapping nameMapping, + Expression filter, + boolean reuseContainers, + boolean caseSensitive, + int maxRecordsPerBatch, + Map properties, + Long start, + Long length, + ByteBuffer fileEncryptionKey, + ByteBuffer fileAADPrefix) { + this.input = input; + this.expectedSchema = expectedSchema; + this.options = options; + this.batchReaderFunc = readerFunc; + // replace alwaysTrue with null to avoid extra work evaluating a trivial filter + this.filter = filter == Expressions.alwaysTrue() ? null : filter; + this.reuseContainers = reuseContainers; + this.caseSensitive = caseSensitive; + this.batchSize = maxRecordsPerBatch; + this.nameMapping = nameMapping; + this.properties = properties; + this.start = start; + this.length = length; + this.fileEncryptionKey = fileEncryptionKey; + this.fileAADPrefix = fileAADPrefix; + } + + public static Builder builder( + InputFile file, + Schema schema, + ParquetReadOptions options, + Function> batchedReaderFunc) { + return new Builder(file, schema, options, batchedReaderFunc); + } + + public static class Builder { + private final InputFile file; + private final Schema schema; + private final ParquetReadOptions options; + private final Function> batchedReaderFunc; + private NameMapping nameMapping = null; + private Expression filter = null; + private boolean reuseContainers = false; + private boolean caseSensitive = true; + private int maxRecordsPerBatch = 10000; + private Map properties = null; + private Long start = null; + private Long length = null; + private ByteBuffer fileEncryptionKey = null; + private ByteBuffer fileAADPrefix = null; + + private Builder( + InputFile file, + Schema schema, + ParquetReadOptions options, + Function> batchedReaderFunc) { + this.file = file; + this.schema = schema; + this.options = options; + this.batchedReaderFunc = batchedReaderFunc; + } + + public Builder nameMapping(NameMapping mapping) { + this.nameMapping = mapping; + return this; + } + + public Builder filter(Expression filterExpr) { + this.filter = filterExpr; + return this; + } + + public Builder reuseContainers(boolean reuse) { + this.reuseContainers = reuse; + return this; + } + + public Builder caseSensitive(boolean sensitive) { + this.caseSensitive = sensitive; + return this; + } + + public Builder maxRecordsPerBatch(int maxRecords) { + this.maxRecordsPerBatch = maxRecords; + return this; + } + + public Builder properties(Map props) { + this.properties = props; + return this; + } + + public Builder split(Long splitStart, Long splitLength) { + this.start = splitStart; + this.length = splitLength; + return this; + } + + public Builder encryption(ByteBuffer encryptionKey, ByteBuffer aadPrefix) { + this.fileEncryptionKey = encryptionKey; + this.fileAADPrefix = aadPrefix; + return this; + } + + public CometVectorizedParquetReader build() { + return new CometVectorizedParquetReader<>( + file, + schema, + options, + batchedReaderFunc, + nameMapping, + filter, + reuseContainers, + caseSensitive, + maxRecordsPerBatch, + properties, + start, + length, + fileEncryptionKey, + fileAADPrefix); + } + } + + private ReadConf conf = null; + + private ReadConf init() { + if (conf == null) { + ReadConf readConf = + new ReadConf( + input, + options, + expectedSchema, + filter, + null, + batchReaderFunc, + nameMapping, + reuseContainers, + caseSensitive, + batchSize); + this.conf = readConf.copy(); + return readConf; + } + return conf; + } + + @Override 
+ public CloseableIterator iterator() { + FileIterator iter = + new FileIterator<>(init(), properties, start, length, fileEncryptionKey, fileAADPrefix); + addCloseable(iter); + return iter; + } + + private static class FileIterator implements CloseableIterator { + private final boolean[] shouldSkip; + private final VectorizedReader model; + private final long totalValues; + private final int batchSize; + private final List> columnChunkMetadata; + private final boolean reuseContainers; + private int nextRowGroup = 0; + private long nextRowGroupStart = 0; + private long valuesRead = 0; + private T last = null; + private final FileReader cometReader; + + FileIterator( + ReadConf conf, + Map properties, + Long start, + Long length, + ByteBuffer fileEncryptionKey, + ByteBuffer fileAADPrefix) { + this.shouldSkip = conf.shouldSkip(); + this.totalValues = conf.totalValues(); + this.reuseContainers = conf.reuseContainers(); + this.model = conf.vectorizedModel(); + this.batchSize = conf.batchSize(); + this.model.setBatchSize(this.batchSize); + this.columnChunkMetadata = conf.columnChunkMetadataForRowGroups(); + this.cometReader = + newCometReader( + conf.file(), + conf.projection(), + properties, + start, + length, + fileEncryptionKey, + fileAADPrefix); + } + + private FileReader newCometReader( + InputFile file, + MessageType projection, + Map properties, + Long start, + Long length, + ByteBuffer fileEncryptionKey, + ByteBuffer fileAADPrefix) { + try { + ReadOptions cometOptions = ReadOptions.builder(new Configuration()).build(); + + FileReader fileReader = + new FileReader( + new WrappedInputFile(file), + cometOptions, + properties, + start, + length, + ByteBuffers.toByteArray(fileEncryptionKey), + ByteBuffers.toByteArray(fileAADPrefix)); + + List columnDescriptors = projection.getColumns(); + + List specs = Lists.newArrayList(); + + for (ColumnDescriptor descriptor : columnDescriptors) { + ParquetColumnSpec spec = CometTypeUtils.descriptorToParquetColumnSpec(descriptor); + specs.add(spec); + } + + fileReader.setRequestedSchemaFromSpecs(specs); + return fileReader; + } catch (IOException e) { + throw new UncheckedIOException("Failed to open Parquet file: " + file.location(), e); + } + } + + @Override + public boolean hasNext() { + return valuesRead < totalValues; + } + + @Override + public T next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + if (valuesRead >= nextRowGroupStart) { + advance(); + } + + // batchSize is an integer, so casting to integer is safe + int numValuesToRead = (int) Math.min(nextRowGroupStart - valuesRead, batchSize); + if (reuseContainers) { + this.last = model.read(last, numValuesToRead); + } else { + this.last = model.read(null, numValuesToRead); + } + valuesRead += numValuesToRead; + + return last; + } + + private void advance() { + while (shouldSkip[nextRowGroup]) { + nextRowGroup += 1; + cometReader.skipNextRowGroup(); + } + RowGroupReader pages; + try { + pages = cometReader.readNextRowGroup(); + } catch (IOException e) { + throw new RuntimeIOException(e); + } + + model.setRowGroupInfo(pages, columnChunkMetadata.get(nextRowGroup)); + nextRowGroupStart += pages.getRowCount(); + nextRowGroup += 1; + } + + @Override + public void close() throws IOException { + model.close(); + cometReader.close(); + } + } +} diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/parquet/CometVectorizedParquetReaderFactory.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/parquet/CometVectorizedParquetReaderFactory.java new file mode 100644 
index 000000000000..25e7616ac373 --- /dev/null +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/parquet/CometVectorizedParquetReaderFactory.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.parquet; + +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.parquet.VectorizedParquetReaderFactory; + +/** + * Factory for creating Comet-based vectorized Parquet readers. + * + *
<p>
This factory is loaded via reflection when the {@code read.parquet.vectorized-reader.factory} + * property is set to {@code org.apache.iceberg.spark.parquet.CometVectorizedParquetReaderFactory}. + * It provides Comet's native vectorized Parquet reader implementation for Iceberg tables. + */ +public class CometVectorizedParquetReaderFactory implements VectorizedParquetReaderFactory { + + @Override + public String name() { + return "comet"; + } + + @Override + public CloseableIterable createReader(ReaderParams params) { + return CometVectorizedParquetReader.builder( + params.file(), params.schema(), params.options(), params.batchedReaderFunc()) + .nameMapping(params.mapping()) + .filter(params.filter()) + .reuseContainers(params.reuseContainers()) + .caseSensitive(params.caseSensitive()) + .maxRecordsPerBatch(params.maxRecordsPerBatch()) + .properties(params.properties()) + .split(params.start(), params.length()) + .encryption(params.fileEncryptionKey(), params.fileAADPrefix()) + .build(); + } +} diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java index ff30f29aeae6..9aead554e3e9 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java @@ -122,6 +122,7 @@ private CloseableIterable newParquetIterable( // read performance as every batch read doesn't have to pay the cost of allocating memory. .reuseContainers() .withNameMapping(nameMapping()) + .vectorizedReaderFactory(parquetConf.factoryClassName().orElse(null)) .build(); } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java index 0626d0b43985..764bbc47d923 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java @@ -137,10 +137,18 @@ public PartitionReaderFactory createReaderFactory() { } private ParquetBatchReadConf parquetBatchReadConf(ParquetReaderType readerType) { - return ImmutableParquetBatchReadConf.builder() - .batchSize(readConf.parquetBatchSize()) - .readerType(readerType) - .build(); + String factoryClassName = readConf.parquetVectorizedReaderFactory(); + + ImmutableParquetBatchReadConf.Builder builder = + ImmutableParquetBatchReadConf.builder() + .batchSize(readConf.parquetBatchSize()) + .readerType(readerType); + + if (factoryClassName != null) { + builder.factoryClassName(factoryClassName); + } + + return builder.build(); } private OrcBatchReadConf orcBatchReadConf() { diff --git a/spark/v3.5/build.gradle b/spark/v3.5/build.gradle index f31383a5582d..8c617cf9aa10 100644 --- a/spark/v3.5/build.gradle +++ b/spark/v3.5/build.gradle @@ -267,6 +267,8 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio integrationImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts') integrationImplementation project(path: ":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts') integrationImplementation project(path: ":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts') + integrationImplementation 
"org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:${libs.versions.comet.get()}" + // runtime dependencies for running Hive Catalog based integration test integrationRuntimeOnly project(':iceberg-hive-metastore') diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/ParquetBatchReadConf.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/ParquetBatchReadConf.java index 442d728d4d69..dc84485e17f4 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/ParquetBatchReadConf.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/ParquetBatchReadConf.java @@ -19,6 +19,7 @@ package org.apache.iceberg.spark; import java.io.Serializable; +import java.util.Optional; import org.immutables.value.Value; @Value.Immutable @@ -26,4 +27,6 @@ public interface ParquetBatchReadConf extends Serializable { int batchSize(); ParquetReaderType readerType(); + + Optional factoryClassName(); } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java index 9ea08c316919..e0b017d6c6b1 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java @@ -375,4 +375,11 @@ public ParquetReaderType parquetReaderType() { .defaultValue(SparkSQLProperties.PARQUET_READER_TYPE_DEFAULT) .parse(); } + + public String parquetVectorizedReaderFactory() { + return confParser + .stringConf() + .sessionConf(SparkSQLProperties.PARQUET_VECTORIZED_READER_FACTORY) + .parseOptional(); + } } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java index 5e76123cab42..5aae7fd5cbeb 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java @@ -30,6 +30,14 @@ private SparkSQLProperties() {} // Controls which Parquet reader implementation to use public static final String PARQUET_READER_TYPE = "spark.sql.iceberg.parquet.reader-type"; public static final ParquetReaderType PARQUET_READER_TYPE_DEFAULT = ParquetReaderType.ICEBERG; + // Controls the fully qualified class name of the vectorized Parquet reader factory + public static final String PARQUET_VECTORIZED_READER_FACTORY = + "spark.sql.iceberg.parquet.vectorized-reader.factory"; + + // Comet vectorized reader factory class name + public static final String COMET_VECTORIZED_READER_FACTORY_CLASS = + "org.apache.iceberg.spark.parquet.CometVectorizedParquetReaderFactory"; + // Controls whether to perform the nullability check during writes public static final String CHECK_NULLABILITY = "spark.sql.iceberg.check-nullability"; public static final boolean CHECK_NULLABILITY_DEFAULT = true; diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java index 81b7d83a7077..d6350af7a273 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java @@ -19,18 +19,22 @@ package org.apache.iceberg.spark.data.vectorized; import java.io.IOException; +import org.apache.comet.CometConf; import 
org.apache.comet.CometSchemaImporter; import org.apache.comet.parquet.AbstractColumnReader; import org.apache.comet.parquet.ColumnReader; +import org.apache.comet.parquet.ParquetColumnSpec; +import org.apache.comet.parquet.RowGroupReader; import org.apache.comet.parquet.TypeUtil; import org.apache.comet.parquet.Utils; import org.apache.comet.shaded.arrow.memory.RootAllocator; import org.apache.iceberg.parquet.VectorizedReader; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.parquet.CometTypeUtils; import org.apache.iceberg.types.Types; import org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.column.page.PageReader; +import org.apache.spark.sql.internal.SQLConf; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.Metadata; import org.apache.spark.sql.types.StructField; @@ -42,23 +46,28 @@ class CometColumnReader implements VectorizedReader { private final ColumnDescriptor descriptor; private final DataType sparkType; + private final int fieldId; // The delegated ColumnReader from Comet side private AbstractColumnReader delegate; private boolean initialized = false; private int batchSize = DEFAULT_BATCH_SIZE; private CometSchemaImporter importer; + private ParquetColumnSpec spec; - CometColumnReader(DataType sparkType, ColumnDescriptor descriptor) { + CometColumnReader(DataType sparkType, ColumnDescriptor descriptor, int fieldId) { this.sparkType = sparkType; this.descriptor = descriptor; + this.fieldId = fieldId; } CometColumnReader(Types.NestedField field) { DataType dataType = SparkSchemaUtil.convert(field.type()); StructField structField = new StructField(field.name(), dataType, false, Metadata.empty()); this.sparkType = dataType; - this.descriptor = TypeUtil.convertToParquet(structField); + this.descriptor = + CometTypeUtils.buildColumnDescriptor(TypeUtil.convertToParquetSpec(structField)); + this.fieldId = field.fieldId(); } public AbstractColumnReader delegate() { @@ -92,7 +101,26 @@ public void reset() { } this.importer = new CometSchemaImporter(new RootAllocator()); - this.delegate = Utils.getColumnReader(sparkType, descriptor, importer, batchSize, false, false); + + spec = CometTypeUtils.descriptorToParquetColumnSpec(descriptor); + + boolean useLegacyTime = + Boolean.parseBoolean( + SQLConf.get() + .getConfString( + CometConf.COMET_EXCEPTION_ON_LEGACY_DATE_TIMESTAMP().key(), "false")); + boolean useLazyMaterialization = + Boolean.parseBoolean( + SQLConf.get().getConfString(CometConf.COMET_USE_LAZY_MATERIALIZATION().key(), "false")); + this.delegate = + Utils.getColumnReader( + sparkType, + spec, + importer, + batchSize, + true, // Comet sets this to true for native execution + useLazyMaterialization, + useLegacyTime); this.initialized = true; } @@ -111,9 +139,9 @@ public DataType sparkType() { *
<p>
NOTE: this should be called before reading a new Parquet column chunk, and after {@link * CometColumnReader#reset} is called. */ - public void setPageReader(PageReader pageReader) throws IOException { + public void setPageReader(RowGroupReader pageStore) throws IOException { Preconditions.checkState(initialized, "Invalid state: 'reset' should be called first"); - ((ColumnReader) delegate).setPageReader(pageReader); + ((ColumnReader) delegate).setRowGroupReader(pageStore, spec); } @Override diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java index 3d3e9aca24de..8b11293684e0 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java @@ -23,7 +23,8 @@ import java.util.List; import java.util.Map; import org.apache.comet.parquet.AbstractColumnReader; -import org.apache.comet.parquet.BatchReader; +import org.apache.comet.parquet.IcebergCometBatchReader; +import org.apache.comet.parquet.RowGroupReader; import org.apache.iceberg.Schema; import org.apache.iceberg.parquet.VectorizedReader; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -51,15 +52,14 @@ class CometColumnarBatchReader implements VectorizedReader { // calling BatchReader.nextBatch, the isDeleted value is not yet available, so // DeleteColumnReader.readBatch must be called explicitly later, after the isDeleted value is // available. - private final BatchReader delegate; + private final IcebergCometBatchReader delegate; CometColumnarBatchReader(List> readers, Schema schema) { this.readers = readers.stream().map(CometColumnReader.class::cast).toArray(CometColumnReader[]::new); AbstractColumnReader[] abstractColumnReaders = new AbstractColumnReader[readers.size()]; - this.delegate = new BatchReader(abstractColumnReaders); - delegate.setSparkSchema(SparkSchemaUtil.convert(schema)); + this.delegate = new IcebergCometBatchReader(readers.size(), SparkSchemaUtil.convert(schema)); } @Override @@ -71,16 +71,18 @@ public void setRowGroupInfo( && !(readers[i] instanceof CometPositionColumnReader) && !(readers[i] instanceof CometDeleteColumnReader)) { readers[i].reset(); - readers[i].setPageReader(pageStore.getPageReader(readers[i].descriptor())); + readers[i].setPageReader((RowGroupReader) pageStore); } } catch (IOException e) { throw new UncheckedIOException("Failed to setRowGroupInfo for Comet vectorization", e); } } + AbstractColumnReader[] delegateReaders = new AbstractColumnReader[readers.length]; for (int i = 0; i < readers.length; i++) { - delegate.getColumnReaders()[i] = this.readers[i].delegate(); + delegateReaders[i] = readers[i].delegate(); } + delegate.init(delegateReaders); } @Override diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java index 047c96314b13..bb3eca5aa45f 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java @@ -21,6 +21,7 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; import org.apache.comet.parquet.ConstantColumnReader; 
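For reference, a minimal, hypothetical usage sketch of how a Spark job could opt into the Comet-backed reader through the session property defined in SparkSQLProperties above; the catalog and table names are invented for illustration, and it assumes the Comet runtime jar is on the driver and executor classpath:

import org.apache.spark.sql.SparkSession;

public class CometVectorizedReadExample {
  public static void main(String[] args) {
    // Point the factory property at the Comet factory class; the Spark batch reader
    // passes this class name down to the Parquet read builder.
    SparkSession spark =
        SparkSession.builder()
            .appName("iceberg-comet-vectorized-read")
            .config(
                "spark.sql.iceberg.parquet.vectorized-reader.factory",
                "org.apache.iceberg.spark.parquet.CometVectorizedParquetReaderFactory")
            .getOrCreate();

    // Hypothetical Iceberg table; vectorized Parquet scans should now be created
    // by the configured factory.
    spark.sql("SELECT count(*) FROM local.db.events").show();
  }
}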
+import org.apache.iceberg.spark.parquet.CometTypeUtils; import org.apache.iceberg.types.Types; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.DataTypes; @@ -34,7 +35,11 @@ class CometConstantColumnReader extends CometColumnReader { super(field); // use delegate to set constant value on the native side to be consumed by native execution. setDelegate( - new ConstantColumnReader(sparkType(), descriptor(), convertToSparkValue(value), false)); + new ConstantColumnReader( + sparkType(), + CometTypeUtils.descriptorToParquetColumnSpec(descriptor()), + convertToSparkValue(value), + false)); } @Override diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java index 26219014f777..a3aa3519c6d4 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java @@ -50,7 +50,7 @@ private static class DeleteColumnReader extends MetadataColumnReader { DeleteColumnReader(boolean[] isDeleted) { super( DataTypes.BooleanType, - TypeUtil.convertToParquet( + TypeUtil.convertToParquetSpec( new StructField("_deleted", DataTypes.BooleanType, false, Metadata.empty())), false /* useDecimal128 = false */, false /* isConstant = false */); diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java index bcc0e514c28d..017318b13872 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java @@ -20,6 +20,7 @@ import org.apache.comet.parquet.MetadataColumnReader; import org.apache.comet.parquet.Native; +import org.apache.iceberg.spark.parquet.CometTypeUtils; import org.apache.iceberg.types.Types; import org.apache.parquet.column.ColumnDescriptor; import org.apache.spark.sql.types.DataTypes; @@ -44,7 +45,7 @@ private static class PositionColumnReader extends MetadataColumnReader { PositionColumnReader(ColumnDescriptor descriptor) { super( DataTypes.LongType, - descriptor, + CometTypeUtils.descriptorToParquetColumnSpec(descriptor), false /* useDecimal128 = false */, false /* isConstant = false */); } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java index 779dc240d4f6..a509258f1f92 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java @@ -133,6 +133,7 @@ public VectorizedReader primitive( return null; } - return new CometColumnReader(SparkSchemaUtil.convert(icebergField.type()), desc); + return new CometColumnReader( + SparkSchemaUtil.convert(icebergField.type()), desc, parquetFieldId); } } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/parquet/CometTypeUtils.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/parquet/CometTypeUtils.java new file mode 100644 index 
000000000000..b94058842510 --- /dev/null +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/parquet/CometTypeUtils.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.parquet; + +import java.util.Map; +import org.apache.comet.parquet.ParquetColumnSpec; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Types; + +public class CometTypeUtils { + + private CometTypeUtils() {} + + public static ParquetColumnSpec descriptorToParquetColumnSpec(ColumnDescriptor descriptor) { + + String[] path = descriptor.getPath(); + PrimitiveType primitiveType = descriptor.getPrimitiveType(); + String physicalType = primitiveType.getPrimitiveTypeName().name(); + + int typeLength = + primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY + ? 
primitiveType.getTypeLength() + : 0; + + boolean isRepeated = primitiveType.getRepetition() == Type.Repetition.REPEATED; + + String logicalTypeName = null; + Map logicalTypeParams = Maps.newHashMap(); + LogicalTypeAnnotation logicalType = primitiveType.getLogicalTypeAnnotation(); + + if (logicalType != null) { + logicalTypeName = logicalType.getClass().getSimpleName(); + + // Handle specific logical types + if (logicalType instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) { + LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimal = + (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType; + logicalTypeParams.put("precision", String.valueOf(decimal.getPrecision())); + logicalTypeParams.put("scale", String.valueOf(decimal.getScale())); + } else if (logicalType instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) { + LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestamp = + (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalType; + logicalTypeParams.put("isAdjustedToUTC", String.valueOf(timestamp.isAdjustedToUTC())); + logicalTypeParams.put("unit", timestamp.getUnit().name()); + } else if (logicalType instanceof LogicalTypeAnnotation.TimeLogicalTypeAnnotation) { + LogicalTypeAnnotation.TimeLogicalTypeAnnotation time = + (LogicalTypeAnnotation.TimeLogicalTypeAnnotation) logicalType; + logicalTypeParams.put("isAdjustedToUTC", String.valueOf(time.isAdjustedToUTC())); + logicalTypeParams.put("unit", time.getUnit().name()); + } else if (logicalType instanceof LogicalTypeAnnotation.IntLogicalTypeAnnotation) { + LogicalTypeAnnotation.IntLogicalTypeAnnotation intType = + (LogicalTypeAnnotation.IntLogicalTypeAnnotation) logicalType; + logicalTypeParams.put("isSigned", String.valueOf(intType.isSigned())); + logicalTypeParams.put("bitWidth", String.valueOf(intType.getBitWidth())); + } + } + + int id = -1; + Type type = descriptor.getPrimitiveType(); + if (type != null && type.getId() != null) { + id = type.getId().intValue(); + } + + return new ParquetColumnSpec( + id, + path, + physicalType, + typeLength, + isRepeated, + descriptor.getMaxDefinitionLevel(), + descriptor.getMaxRepetitionLevel(), + logicalTypeName, + logicalTypeParams); + } + + public static ColumnDescriptor buildColumnDescriptor(ParquetColumnSpec columnSpec) { + PrimitiveType.PrimitiveTypeName primType = + PrimitiveType.PrimitiveTypeName.valueOf(columnSpec.getPhysicalType()); + + Type.Repetition repetition; + if (columnSpec.getMaxRepetitionLevel() > 0) { + repetition = Type.Repetition.REPEATED; + } else if (columnSpec.getMaxDefinitionLevel() > 0) { + repetition = Type.Repetition.OPTIONAL; + } else { + repetition = Type.Repetition.REQUIRED; + } + + String name = columnSpec.getPath()[columnSpec.getPath().length - 1]; + // Reconstruct the logical type from parameters + LogicalTypeAnnotation logicalType = null; + if (columnSpec.getLogicalTypeName() != null) { + logicalType = + reconstructLogicalType( + columnSpec.getLogicalTypeName(), columnSpec.getLogicalTypeParams()); + } + + PrimitiveType primitiveType; + if (primType == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) { + primitiveType = + Types.primitive(primType, repetition) + .length(columnSpec.getTypeLength()) + .as(logicalType) + .id(columnSpec.getFieldId()) + .named(name); + } else { + primitiveType = + Types.primitive(primType, repetition) + .as(logicalType) + .id(columnSpec.getFieldId()) + .named(name); + } + + return new ColumnDescriptor( + columnSpec.getPath(), + primitiveType, + columnSpec.getMaxRepetitionLevel(), 
+ columnSpec.getMaxDefinitionLevel()); + } + + private static LogicalTypeAnnotation reconstructLogicalType( + String logicalTypeName, Map params) { + + switch (logicalTypeName) { + // MAP + case "MapLogicalTypeAnnotation": + return LogicalTypeAnnotation.mapType(); + + // LIST + case "ListLogicalTypeAnnotation": + return LogicalTypeAnnotation.listType(); + + // STRING + case "StringLogicalTypeAnnotation": + return LogicalTypeAnnotation.stringType(); + + // MAP_KEY_VALUE + case "MapKeyValueLogicalTypeAnnotation": + return LogicalTypeAnnotation.MapKeyValueTypeAnnotation.getInstance(); + + // ENUM + case "EnumLogicalTypeAnnotation": + return LogicalTypeAnnotation.enumType(); + + // DECIMAL + case "DecimalLogicalTypeAnnotation": + if (!params.containsKey("scale") || !params.containsKey("precision")) { + throw new IllegalArgumentException( + "Missing required parameters for DecimalLogicalTypeAnnotation: " + params); + } + int scale = Integer.parseInt(params.get("scale")); + int precision = Integer.parseInt(params.get("precision")); + return LogicalTypeAnnotation.decimalType(scale, precision); + + // DATE + case "DateLogicalTypeAnnotation": + return LogicalTypeAnnotation.dateType(); + + // TIME + case "TimeLogicalTypeAnnotation": + if (!params.containsKey("isAdjustedToUTC") || !params.containsKey("unit")) { + throw new IllegalArgumentException( + "Missing required parameters for TimeLogicalTypeAnnotation: " + params); + } + + boolean isUTC = Boolean.parseBoolean(params.get("isAdjustedToUTC")); + String timeUnitStr = params.get("unit"); + + LogicalTypeAnnotation.TimeUnit timeUnit; + switch (timeUnitStr) { + case "MILLIS": + timeUnit = LogicalTypeAnnotation.TimeUnit.MILLIS; + break; + case "MICROS": + timeUnit = LogicalTypeAnnotation.TimeUnit.MICROS; + break; + case "NANOS": + timeUnit = LogicalTypeAnnotation.TimeUnit.NANOS; + break; + default: + throw new IllegalArgumentException("Unknown time unit: " + timeUnitStr); + } + return LogicalTypeAnnotation.timeType(isUTC, timeUnit); + + // TIMESTAMP + case "TimestampLogicalTypeAnnotation": + if (!params.containsKey("isAdjustedToUTC") || !params.containsKey("unit")) { + throw new IllegalArgumentException( + "Missing required parameters for TimestampLogicalTypeAnnotation: " + params); + } + boolean isAdjustedToUTC = Boolean.parseBoolean(params.get("isAdjustedToUTC")); + String unitStr = params.get("unit"); + + LogicalTypeAnnotation.TimeUnit unit; + switch (unitStr) { + case "MILLIS": + unit = LogicalTypeAnnotation.TimeUnit.MILLIS; + break; + case "MICROS": + unit = LogicalTypeAnnotation.TimeUnit.MICROS; + break; + case "NANOS": + unit = LogicalTypeAnnotation.TimeUnit.NANOS; + break; + default: + throw new IllegalArgumentException("Unknown timestamp unit: " + unitStr); + } + return LogicalTypeAnnotation.timestampType(isAdjustedToUTC, unit); + + // INTEGER + case "IntLogicalTypeAnnotation": + if (!params.containsKey("isSigned") || !params.containsKey("bitWidth")) { + throw new IllegalArgumentException( + "Missing required parameters for IntLogicalTypeAnnotation: " + params); + } + boolean isSigned = Boolean.parseBoolean(params.get("isSigned")); + int bitWidth = Integer.parseInt(params.get("bitWidth")); + return LogicalTypeAnnotation.intType(bitWidth, isSigned); + + // JSON + case "JsonLogicalTypeAnnotation": + return LogicalTypeAnnotation.jsonType(); + + // BSON + case "BsonLogicalTypeAnnotation": + return LogicalTypeAnnotation.bsonType(); + + // UUID + case "UUIDLogicalTypeAnnotation": + return LogicalTypeAnnotation.uuidType(); + + // INTERVAL + case 
"IntervalLogicalTypeAnnotation": + return LogicalTypeAnnotation.IntervalLogicalTypeAnnotation.getInstance(); + + default: + throw new IllegalArgumentException("Unknown logical type: " + logicalTypeName); + } + } +} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/parquet/CometVectorizedParquetReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/parquet/CometVectorizedParquetReader.java new file mode 100644 index 000000000000..dee3fab05ff6 --- /dev/null +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/parquet/CometVectorizedParquetReader.java @@ -0,0 +1,352 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.parquet; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.function.Function; +import org.apache.comet.parquet.FileReader; +import org.apache.comet.parquet.ParquetColumnSpec; +import org.apache.comet.parquet.ReadOptions; +import org.apache.comet.parquet.RowGroupReader; +import org.apache.comet.parquet.WrappedInputFile; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.Schema; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.io.CloseableGroup; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.mapping.NameMapping; +import org.apache.iceberg.parquet.ReadConf; +import org.apache.iceberg.parquet.VectorizedReader; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.ByteBuffers; +import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.schema.MessageType; + +public class CometVectorizedParquetReader extends CloseableGroup + implements CloseableIterable { + private final InputFile input; + private final ParquetReadOptions options; + private final Schema expectedSchema; + private final Function> batchReaderFunc; + private final Expression filter; + private final boolean reuseContainers; + private final boolean caseSensitive; + private final int batchSize; + private final NameMapping nameMapping; + private final Map properties; + private final Long start; + private final Long length; + private final ByteBuffer fileEncryptionKey; + private final ByteBuffer fileAADPrefix; + + private 
CometVectorizedParquetReader( + InputFile input, + Schema expectedSchema, + ParquetReadOptions options, + Function> readerFunc, + NameMapping nameMapping, + Expression filter, + boolean reuseContainers, + boolean caseSensitive, + int maxRecordsPerBatch, + Map properties, + Long start, + Long length, + ByteBuffer fileEncryptionKey, + ByteBuffer fileAADPrefix) { + this.input = input; + this.expectedSchema = expectedSchema; + this.options = options; + this.batchReaderFunc = readerFunc; + // replace alwaysTrue with null to avoid extra work evaluating a trivial filter + this.filter = filter == Expressions.alwaysTrue() ? null : filter; + this.reuseContainers = reuseContainers; + this.caseSensitive = caseSensitive; + this.batchSize = maxRecordsPerBatch; + this.nameMapping = nameMapping; + this.properties = properties; + this.start = start; + this.length = length; + this.fileEncryptionKey = fileEncryptionKey; + this.fileAADPrefix = fileAADPrefix; + } + + public static Builder builder( + InputFile file, + Schema schema, + ParquetReadOptions options, + Function> batchedReaderFunc) { + return new Builder(file, schema, options, batchedReaderFunc); + } + + public static class Builder { + private final InputFile file; + private final Schema schema; + private final ParquetReadOptions options; + private final Function> batchedReaderFunc; + private NameMapping nameMapping = null; + private Expression filter = null; + private boolean reuseContainers = false; + private boolean caseSensitive = true; + private int maxRecordsPerBatch = 10000; + private Map properties = null; + private Long start = null; + private Long length = null; + private ByteBuffer fileEncryptionKey = null; + private ByteBuffer fileAADPrefix = null; + + private Builder( + InputFile file, + Schema schema, + ParquetReadOptions options, + Function> batchedReaderFunc) { + this.file = file; + this.schema = schema; + this.options = options; + this.batchedReaderFunc = batchedReaderFunc; + } + + public Builder nameMapping(NameMapping mapping) { + this.nameMapping = mapping; + return this; + } + + public Builder filter(Expression filterExpr) { + this.filter = filterExpr; + return this; + } + + public Builder reuseContainers(boolean reuse) { + this.reuseContainers = reuse; + return this; + } + + public Builder caseSensitive(boolean sensitive) { + this.caseSensitive = sensitive; + return this; + } + + public Builder maxRecordsPerBatch(int maxRecords) { + this.maxRecordsPerBatch = maxRecords; + return this; + } + + public Builder properties(Map props) { + this.properties = props; + return this; + } + + public Builder split(Long splitStart, Long splitLength) { + this.start = splitStart; + this.length = splitLength; + return this; + } + + public Builder encryption(ByteBuffer encryptionKey, ByteBuffer aadPrefix) { + this.fileEncryptionKey = encryptionKey; + this.fileAADPrefix = aadPrefix; + return this; + } + + public CometVectorizedParquetReader build() { + return new CometVectorizedParquetReader<>( + file, + schema, + options, + batchedReaderFunc, + nameMapping, + filter, + reuseContainers, + caseSensitive, + maxRecordsPerBatch, + properties, + start, + length, + fileEncryptionKey, + fileAADPrefix); + } + } + + private ReadConf conf = null; + + private ReadConf init() { + if (conf == null) { + ReadConf readConf = + new ReadConf( + input, + options, + expectedSchema, + filter, + null, + batchReaderFunc, + nameMapping, + reuseContainers, + caseSensitive, + batchSize); + this.conf = readConf.copy(); + return readConf; + } + return conf; + } + + @Override 
+ public CloseableIterator iterator() { + FileIterator iter = + new FileIterator<>(init(), properties, start, length, fileEncryptionKey, fileAADPrefix); + addCloseable(iter); + return iter; + } + + private static class FileIterator implements CloseableIterator { + private final boolean[] shouldSkip; + private final VectorizedReader model; + private final long totalValues; + private final int batchSize; + private final List> columnChunkMetadata; + private final boolean reuseContainers; + private int nextRowGroup = 0; + private long nextRowGroupStart = 0; + private long valuesRead = 0; + private T last = null; + private final FileReader cometReader; + + FileIterator( + ReadConf conf, + Map properties, + Long start, + Long length, + ByteBuffer fileEncryptionKey, + ByteBuffer fileAADPrefix) { + this.shouldSkip = conf.shouldSkip(); + this.totalValues = conf.totalValues(); + this.reuseContainers = conf.reuseContainers(); + this.model = conf.vectorizedModel(); + this.batchSize = conf.batchSize(); + this.model.setBatchSize(this.batchSize); + this.columnChunkMetadata = conf.columnChunkMetadataForRowGroups(); + this.cometReader = + newCometReader( + conf.file(), + conf.projection(), + properties, + start, + length, + fileEncryptionKey, + fileAADPrefix); + } + + private FileReader newCometReader( + InputFile file, + MessageType projection, + Map properties, + Long start, + Long length, + ByteBuffer fileEncryptionKey, + ByteBuffer fileAADPrefix) { + try { + ReadOptions cometOptions = ReadOptions.builder(new Configuration()).build(); + + FileReader fileReader = + new FileReader( + new WrappedInputFile(file), + cometOptions, + properties, + start, + length, + ByteBuffers.toByteArray(fileEncryptionKey), + ByteBuffers.toByteArray(fileAADPrefix)); + + List columnDescriptors = projection.getColumns(); + + List specs = Lists.newArrayList(); + + for (ColumnDescriptor descriptor : columnDescriptors) { + ParquetColumnSpec spec = CometTypeUtils.descriptorToParquetColumnSpec(descriptor); + specs.add(spec); + } + + fileReader.setRequestedSchemaFromSpecs(specs); + return fileReader; + } catch (IOException e) { + throw new UncheckedIOException("Failed to open Parquet file: " + file.location(), e); + } + } + + @Override + public boolean hasNext() { + return valuesRead < totalValues; + } + + @Override + public T next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + if (valuesRead >= nextRowGroupStart) { + advance(); + } + + // batchSize is an integer, so casting to integer is safe + int numValuesToRead = (int) Math.min(nextRowGroupStart - valuesRead, batchSize); + if (reuseContainers) { + this.last = model.read(last, numValuesToRead); + } else { + this.last = model.read(null, numValuesToRead); + } + valuesRead += numValuesToRead; + + return last; + } + + private void advance() { + while (shouldSkip[nextRowGroup]) { + nextRowGroup += 1; + cometReader.skipNextRowGroup(); + } + RowGroupReader pages; + try { + pages = cometReader.readNextRowGroup(); + } catch (IOException e) { + throw new RuntimeIOException(e); + } + + model.setRowGroupInfo(pages, columnChunkMetadata.get(nextRowGroup)); + nextRowGroupStart += pages.getRowCount(); + nextRowGroup += 1; + } + + @Override + public void close() throws IOException { + model.close(); + cometReader.close(); + } + } +} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/parquet/CometVectorizedParquetReaderFactory.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/parquet/CometVectorizedParquetReaderFactory.java new file mode 100644 
index 000000000000..25e7616ac373 --- /dev/null +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/parquet/CometVectorizedParquetReaderFactory.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.parquet; + +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.parquet.VectorizedParquetReaderFactory; + +/** + * Factory for creating Comet-based vectorized Parquet readers. + * + *
<p>
This factory is loaded via reflection when the {@code read.parquet.vectorized-reader.factory} + * property is set to {@code org.apache.iceberg.spark.parquet.CometVectorizedParquetReaderFactory}. + * It provides Comet's native vectorized Parquet reader implementation for Iceberg tables. + */ +public class CometVectorizedParquetReaderFactory implements VectorizedParquetReaderFactory { + + @Override + public String name() { + return "comet"; + } + + @Override + public CloseableIterable createReader(ReaderParams params) { + return CometVectorizedParquetReader.builder( + params.file(), params.schema(), params.options(), params.batchedReaderFunc()) + .nameMapping(params.mapping()) + .filter(params.filter()) + .reuseContainers(params.reuseContainers()) + .caseSensitive(params.caseSensitive()) + .maxRecordsPerBatch(params.maxRecordsPerBatch()) + .properties(params.properties()) + .split(params.start(), params.length()) + .encryption(params.fileEncryptionKey(), params.fileAADPrefix()) + .build(); + } +} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java index ff30f29aeae6..9aead554e3e9 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java @@ -122,6 +122,7 @@ private CloseableIterable newParquetIterable( // read performance as every batch read doesn't have to pay the cost of allocating memory. .reuseContainers() .withNameMapping(nameMapping()) + .vectorizedReaderFactory(parquetConf.factoryClassName().orElse(null)) .build(); } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java index 0626d0b43985..764bbc47d923 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java @@ -137,10 +137,18 @@ public PartitionReaderFactory createReaderFactory() { } private ParquetBatchReadConf parquetBatchReadConf(ParquetReaderType readerType) { - return ImmutableParquetBatchReadConf.builder() - .batchSize(readConf.parquetBatchSize()) - .readerType(readerType) - .build(); + String factoryClassName = readConf.parquetVectorizedReaderFactory(); + + ImmutableParquetBatchReadConf.Builder builder = + ImmutableParquetBatchReadConf.builder() + .batchSize(readConf.parquetBatchSize()) + .readerType(readerType); + + if (factoryClassName != null) { + builder.factoryClassName(factoryClassName); + } + + return builder.build(); } private OrcBatchReadConf orcBatchReadConf() { diff --git a/spark/v4.0/build.gradle b/spark/v4.0/build.gradle index 8ebed9bd439b..552fb4021f1c 100644 --- a/spark/v4.0/build.gradle +++ b/spark/v4.0/build.gradle @@ -276,6 +276,8 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio integrationImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts') integrationImplementation project(path: ":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts') integrationImplementation project(path: ":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts') + integrationImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_2.13:${libs.versions.comet.get()}" + 
// runtime dependencies for running Hive Catalog based integration test integrationRuntimeOnly project(':iceberg-hive-metastore') diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/ParquetBatchReadConf.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/ParquetBatchReadConf.java index 442d728d4d69..dc84485e17f4 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/ParquetBatchReadConf.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/ParquetBatchReadConf.java @@ -19,6 +19,7 @@ package org.apache.iceberg.spark; import java.io.Serializable; +import java.util.Optional; import org.immutables.value.Value; @Value.Immutable @@ -26,4 +27,6 @@ public interface ParquetBatchReadConf extends Serializable { int batchSize(); ParquetReaderType readerType(); + + Optional factoryClassName(); } diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java index 2788e160d526..6e8331c0bcac 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java @@ -375,4 +375,12 @@ public ParquetReaderType parquetReaderType() { .defaultValue(SparkSQLProperties.PARQUET_READER_TYPE_DEFAULT) .parse(); } + + public String parquetVectorizedReaderFactory() { + return confParser + .stringConf() + .sessionConf(SparkSQLProperties.PARQUET_VECTORIZED_READER_FACTORY) + .defaultValue(SparkSQLProperties.COMET_VECTORIZED_READER_FACTORY_CLASS) + .parseOptional(); + } } diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java index 81139969f746..e5e8c275124a 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java @@ -30,6 +30,14 @@ private SparkSQLProperties() {} // Controls which Parquet reader implementation to use public static final String PARQUET_READER_TYPE = "spark.sql.iceberg.parquet.reader-type"; public static final ParquetReaderType PARQUET_READER_TYPE_DEFAULT = ParquetReaderType.ICEBERG; + // Controls the fully qualified class name of the vectorized Parquet reader factory + public static final String PARQUET_VECTORIZED_READER_FACTORY = + "spark.sql.iceberg.parquet.vectorized-reader.factory"; + + // Comet vectorized reader factory class name + public static final String COMET_VECTORIZED_READER_FACTORY_CLASS = + "org.apache.iceberg.spark.parquet.CometVectorizedParquetReaderFactory"; + // Controls whether to perform the nullability check during writes public static final String CHECK_NULLABILITY = "spark.sql.iceberg.check-nullability"; public static final boolean CHECK_NULLABILITY_DEFAULT = true; diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java index 81b7d83a7077..d6350af7a273 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java @@ -19,18 +19,22 @@ package org.apache.iceberg.spark.data.vectorized; import java.io.IOException; +import org.apache.comet.CometConf; import org.apache.comet.CometSchemaImporter; import 
org.apache.comet.parquet.AbstractColumnReader; import org.apache.comet.parquet.ColumnReader; +import org.apache.comet.parquet.ParquetColumnSpec; +import org.apache.comet.parquet.RowGroupReader; import org.apache.comet.parquet.TypeUtil; import org.apache.comet.parquet.Utils; import org.apache.comet.shaded.arrow.memory.RootAllocator; import org.apache.iceberg.parquet.VectorizedReader; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.parquet.CometTypeUtils; import org.apache.iceberg.types.Types; import org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.column.page.PageReader; +import org.apache.spark.sql.internal.SQLConf; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.Metadata; import org.apache.spark.sql.types.StructField; @@ -42,23 +46,28 @@ class CometColumnReader implements VectorizedReader { private final ColumnDescriptor descriptor; private final DataType sparkType; + private final int fieldId; // The delegated ColumnReader from Comet side private AbstractColumnReader delegate; private boolean initialized = false; private int batchSize = DEFAULT_BATCH_SIZE; private CometSchemaImporter importer; + private ParquetColumnSpec spec; - CometColumnReader(DataType sparkType, ColumnDescriptor descriptor) { + CometColumnReader(DataType sparkType, ColumnDescriptor descriptor, int fieldId) { this.sparkType = sparkType; this.descriptor = descriptor; + this.fieldId = fieldId; } CometColumnReader(Types.NestedField field) { DataType dataType = SparkSchemaUtil.convert(field.type()); StructField structField = new StructField(field.name(), dataType, false, Metadata.empty()); this.sparkType = dataType; - this.descriptor = TypeUtil.convertToParquet(structField); + this.descriptor = + CometTypeUtils.buildColumnDescriptor(TypeUtil.convertToParquetSpec(structField)); + this.fieldId = field.fieldId(); } public AbstractColumnReader delegate() { @@ -92,7 +101,26 @@ public void reset() { } this.importer = new CometSchemaImporter(new RootAllocator()); - this.delegate = Utils.getColumnReader(sparkType, descriptor, importer, batchSize, false, false); + + spec = CometTypeUtils.descriptorToParquetColumnSpec(descriptor); + + boolean useLegacyTime = + Boolean.parseBoolean( + SQLConf.get() + .getConfString( + CometConf.COMET_EXCEPTION_ON_LEGACY_DATE_TIMESTAMP().key(), "false")); + boolean useLazyMaterialization = + Boolean.parseBoolean( + SQLConf.get().getConfString(CometConf.COMET_USE_LAZY_MATERIALIZATION().key(), "false")); + this.delegate = + Utils.getColumnReader( + sparkType, + spec, + importer, + batchSize, + true, // Comet sets this to true for native execution + useLazyMaterialization, + useLegacyTime); this.initialized = true; } @@ -111,9 +139,9 @@ public DataType sparkType() { *
<p>
NOTE: this should be called before reading a new Parquet column chunk, and after {@link * CometColumnReader#reset} is called. */ - public void setPageReader(PageReader pageReader) throws IOException { + public void setPageReader(RowGroupReader pageStore) throws IOException { Preconditions.checkState(initialized, "Invalid state: 'reset' should be called first"); - ((ColumnReader) delegate).setPageReader(pageReader); + ((ColumnReader) delegate).setRowGroupReader(pageStore, spec); } @Override diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java index 3d3e9aca24de..20319cbc1ab1 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java @@ -23,7 +23,8 @@ import java.util.List; import java.util.Map; import org.apache.comet.parquet.AbstractColumnReader; -import org.apache.comet.parquet.BatchReader; +import org.apache.comet.parquet.IcebergCometBatchReader; +import org.apache.comet.parquet.RowGroupReader; import org.apache.iceberg.Schema; import org.apache.iceberg.parquet.VectorizedReader; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -51,15 +52,13 @@ class CometColumnarBatchReader implements VectorizedReader { // calling BatchReader.nextBatch, the isDeleted value is not yet available, so // DeleteColumnReader.readBatch must be called explicitly later, after the isDeleted value is // available. - private final BatchReader delegate; + private final IcebergCometBatchReader delegate; CometColumnarBatchReader(List> readers, Schema schema) { this.readers = readers.stream().map(CometColumnReader.class::cast).toArray(CometColumnReader[]::new); - AbstractColumnReader[] abstractColumnReaders = new AbstractColumnReader[readers.size()]; - this.delegate = new BatchReader(abstractColumnReaders); - delegate.setSparkSchema(SparkSchemaUtil.convert(schema)); + this.delegate = new IcebergCometBatchReader(readers.size(), SparkSchemaUtil.convert(schema)); } @Override @@ -71,16 +70,18 @@ public void setRowGroupInfo( && !(readers[i] instanceof CometPositionColumnReader) && !(readers[i] instanceof CometDeleteColumnReader)) { readers[i].reset(); - readers[i].setPageReader(pageStore.getPageReader(readers[i].descriptor())); + readers[i].setPageReader((RowGroupReader) pageStore); } } catch (IOException e) { throw new UncheckedIOException("Failed to setRowGroupInfo for Comet vectorization", e); } } + AbstractColumnReader[] delegateReaders = new AbstractColumnReader[readers.length]; for (int i = 0; i < readers.length; i++) { - delegate.getColumnReaders()[i] = this.readers[i].delegate(); + delegateReaders[i] = readers[i].delegate(); } + delegate.init(delegateReaders); } @Override diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java index 047c96314b13..bb3eca5aa45f 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java @@ -21,6 +21,7 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; import 
org.apache.comet.parquet.ConstantColumnReader; +import org.apache.iceberg.spark.parquet.CometTypeUtils; import org.apache.iceberg.types.Types; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.DataTypes; @@ -34,7 +35,11 @@ class CometConstantColumnReader extends CometColumnReader { super(field); // use delegate to set constant value on the native side to be consumed by native execution. setDelegate( - new ConstantColumnReader(sparkType(), descriptor(), convertToSparkValue(value), false)); + new ConstantColumnReader( + sparkType(), + CometTypeUtils.descriptorToParquetColumnSpec(descriptor()), + convertToSparkValue(value), + false)); } @Override diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java index 26219014f777..a3aa3519c6d4 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java @@ -50,7 +50,7 @@ private static class DeleteColumnReader extends MetadataColumnReader { DeleteColumnReader(boolean[] isDeleted) { super( DataTypes.BooleanType, - TypeUtil.convertToParquet( + TypeUtil.convertToParquetSpec( new StructField("_deleted", DataTypes.BooleanType, false, Metadata.empty())), false /* useDecimal128 = false */, false /* isConstant = false */); diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java index bcc0e514c28d..017318b13872 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java @@ -20,6 +20,7 @@ import org.apache.comet.parquet.MetadataColumnReader; import org.apache.comet.parquet.Native; +import org.apache.iceberg.spark.parquet.CometTypeUtils; import org.apache.iceberg.types.Types; import org.apache.parquet.column.ColumnDescriptor; import org.apache.spark.sql.types.DataTypes; @@ -44,7 +45,7 @@ private static class PositionColumnReader extends MetadataColumnReader { PositionColumnReader(ColumnDescriptor descriptor) { super( DataTypes.LongType, - descriptor, + CometTypeUtils.descriptorToParquetColumnSpec(descriptor), false /* useDecimal128 = false */, false /* isConstant = false */); } diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java index 779dc240d4f6..a509258f1f92 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java @@ -133,6 +133,7 @@ public VectorizedReader primitive( return null; } - return new CometColumnReader(SparkSchemaUtil.convert(icebergField.type()), desc); + return new CometColumnReader( + SparkSchemaUtil.convert(icebergField.type()), desc, parquetFieldId); } } diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/parquet/CometTypeUtils.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/parquet/CometTypeUtils.java 
new file mode 100644 index 000000000000..b94058842510 --- /dev/null +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/parquet/CometTypeUtils.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.parquet; + +import java.util.Map; +import org.apache.comet.parquet.ParquetColumnSpec; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Types; + +public class CometTypeUtils { + + private CometTypeUtils() {} + + public static ParquetColumnSpec descriptorToParquetColumnSpec(ColumnDescriptor descriptor) { + + String[] path = descriptor.getPath(); + PrimitiveType primitiveType = descriptor.getPrimitiveType(); + String physicalType = primitiveType.getPrimitiveTypeName().name(); + + int typeLength = + primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY + ? 
primitiveType.getTypeLength() + : 0; + + boolean isRepeated = primitiveType.getRepetition() == Type.Repetition.REPEATED; + + String logicalTypeName = null; + Map logicalTypeParams = Maps.newHashMap(); + LogicalTypeAnnotation logicalType = primitiveType.getLogicalTypeAnnotation(); + + if (logicalType != null) { + logicalTypeName = logicalType.getClass().getSimpleName(); + + // Handle specific logical types + if (logicalType instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) { + LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimal = + (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType; + logicalTypeParams.put("precision", String.valueOf(decimal.getPrecision())); + logicalTypeParams.put("scale", String.valueOf(decimal.getScale())); + } else if (logicalType instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) { + LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestamp = + (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalType; + logicalTypeParams.put("isAdjustedToUTC", String.valueOf(timestamp.isAdjustedToUTC())); + logicalTypeParams.put("unit", timestamp.getUnit().name()); + } else if (logicalType instanceof LogicalTypeAnnotation.TimeLogicalTypeAnnotation) { + LogicalTypeAnnotation.TimeLogicalTypeAnnotation time = + (LogicalTypeAnnotation.TimeLogicalTypeAnnotation) logicalType; + logicalTypeParams.put("isAdjustedToUTC", String.valueOf(time.isAdjustedToUTC())); + logicalTypeParams.put("unit", time.getUnit().name()); + } else if (logicalType instanceof LogicalTypeAnnotation.IntLogicalTypeAnnotation) { + LogicalTypeAnnotation.IntLogicalTypeAnnotation intType = + (LogicalTypeAnnotation.IntLogicalTypeAnnotation) logicalType; + logicalTypeParams.put("isSigned", String.valueOf(intType.isSigned())); + logicalTypeParams.put("bitWidth", String.valueOf(intType.getBitWidth())); + } + } + + int id = -1; + Type type = descriptor.getPrimitiveType(); + if (type != null && type.getId() != null) { + id = type.getId().intValue(); + } + + return new ParquetColumnSpec( + id, + path, + physicalType, + typeLength, + isRepeated, + descriptor.getMaxDefinitionLevel(), + descriptor.getMaxRepetitionLevel(), + logicalTypeName, + logicalTypeParams); + } + + public static ColumnDescriptor buildColumnDescriptor(ParquetColumnSpec columnSpec) { + PrimitiveType.PrimitiveTypeName primType = + PrimitiveType.PrimitiveTypeName.valueOf(columnSpec.getPhysicalType()); + + Type.Repetition repetition; + if (columnSpec.getMaxRepetitionLevel() > 0) { + repetition = Type.Repetition.REPEATED; + } else if (columnSpec.getMaxDefinitionLevel() > 0) { + repetition = Type.Repetition.OPTIONAL; + } else { + repetition = Type.Repetition.REQUIRED; + } + + String name = columnSpec.getPath()[columnSpec.getPath().length - 1]; + // Reconstruct the logical type from parameters + LogicalTypeAnnotation logicalType = null; + if (columnSpec.getLogicalTypeName() != null) { + logicalType = + reconstructLogicalType( + columnSpec.getLogicalTypeName(), columnSpec.getLogicalTypeParams()); + } + + PrimitiveType primitiveType; + if (primType == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) { + primitiveType = + Types.primitive(primType, repetition) + .length(columnSpec.getTypeLength()) + .as(logicalType) + .id(columnSpec.getFieldId()) + .named(name); + } else { + primitiveType = + Types.primitive(primType, repetition) + .as(logicalType) + .id(columnSpec.getFieldId()) + .named(name); + } + + return new ColumnDescriptor( + columnSpec.getPath(), + primitiveType, + columnSpec.getMaxRepetitionLevel(), 
+ columnSpec.getMaxDefinitionLevel()); + } + + private static LogicalTypeAnnotation reconstructLogicalType( + String logicalTypeName, Map params) { + + switch (logicalTypeName) { + // MAP + case "MapLogicalTypeAnnotation": + return LogicalTypeAnnotation.mapType(); + + // LIST + case "ListLogicalTypeAnnotation": + return LogicalTypeAnnotation.listType(); + + // STRING + case "StringLogicalTypeAnnotation": + return LogicalTypeAnnotation.stringType(); + + // MAP_KEY_VALUE + case "MapKeyValueLogicalTypeAnnotation": + return LogicalTypeAnnotation.MapKeyValueTypeAnnotation.getInstance(); + + // ENUM + case "EnumLogicalTypeAnnotation": + return LogicalTypeAnnotation.enumType(); + + // DECIMAL + case "DecimalLogicalTypeAnnotation": + if (!params.containsKey("scale") || !params.containsKey("precision")) { + throw new IllegalArgumentException( + "Missing required parameters for DecimalLogicalTypeAnnotation: " + params); + } + int scale = Integer.parseInt(params.get("scale")); + int precision = Integer.parseInt(params.get("precision")); + return LogicalTypeAnnotation.decimalType(scale, precision); + + // DATE + case "DateLogicalTypeAnnotation": + return LogicalTypeAnnotation.dateType(); + + // TIME + case "TimeLogicalTypeAnnotation": + if (!params.containsKey("isAdjustedToUTC") || !params.containsKey("unit")) { + throw new IllegalArgumentException( + "Missing required parameters for TimeLogicalTypeAnnotation: " + params); + } + + boolean isUTC = Boolean.parseBoolean(params.get("isAdjustedToUTC")); + String timeUnitStr = params.get("unit"); + + LogicalTypeAnnotation.TimeUnit timeUnit; + switch (timeUnitStr) { + case "MILLIS": + timeUnit = LogicalTypeAnnotation.TimeUnit.MILLIS; + break; + case "MICROS": + timeUnit = LogicalTypeAnnotation.TimeUnit.MICROS; + break; + case "NANOS": + timeUnit = LogicalTypeAnnotation.TimeUnit.NANOS; + break; + default: + throw new IllegalArgumentException("Unknown time unit: " + timeUnitStr); + } + return LogicalTypeAnnotation.timeType(isUTC, timeUnit); + + // TIMESTAMP + case "TimestampLogicalTypeAnnotation": + if (!params.containsKey("isAdjustedToUTC") || !params.containsKey("unit")) { + throw new IllegalArgumentException( + "Missing required parameters for TimestampLogicalTypeAnnotation: " + params); + } + boolean isAdjustedToUTC = Boolean.parseBoolean(params.get("isAdjustedToUTC")); + String unitStr = params.get("unit"); + + LogicalTypeAnnotation.TimeUnit unit; + switch (unitStr) { + case "MILLIS": + unit = LogicalTypeAnnotation.TimeUnit.MILLIS; + break; + case "MICROS": + unit = LogicalTypeAnnotation.TimeUnit.MICROS; + break; + case "NANOS": + unit = LogicalTypeAnnotation.TimeUnit.NANOS; + break; + default: + throw new IllegalArgumentException("Unknown timestamp unit: " + unitStr); + } + return LogicalTypeAnnotation.timestampType(isAdjustedToUTC, unit); + + // INTEGER + case "IntLogicalTypeAnnotation": + if (!params.containsKey("isSigned") || !params.containsKey("bitWidth")) { + throw new IllegalArgumentException( + "Missing required parameters for IntLogicalTypeAnnotation: " + params); + } + boolean isSigned = Boolean.parseBoolean(params.get("isSigned")); + int bitWidth = Integer.parseInt(params.get("bitWidth")); + return LogicalTypeAnnotation.intType(bitWidth, isSigned); + + // JSON + case "JsonLogicalTypeAnnotation": + return LogicalTypeAnnotation.jsonType(); + + // BSON + case "BsonLogicalTypeAnnotation": + return LogicalTypeAnnotation.bsonType(); + + // UUID + case "UUIDLogicalTypeAnnotation": + return LogicalTypeAnnotation.uuidType(); + + // INTERVAL + case 
"IntervalLogicalTypeAnnotation": + return LogicalTypeAnnotation.IntervalLogicalTypeAnnotation.getInstance(); + + default: + throw new IllegalArgumentException("Unknown logical type: " + logicalTypeName); + } + } +} diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/parquet/CometVectorizedParquetReader.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/parquet/CometVectorizedParquetReader.java new file mode 100644 index 000000000000..dee3fab05ff6 --- /dev/null +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/parquet/CometVectorizedParquetReader.java @@ -0,0 +1,352 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.parquet; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.function.Function; +import org.apache.comet.parquet.FileReader; +import org.apache.comet.parquet.ParquetColumnSpec; +import org.apache.comet.parquet.ReadOptions; +import org.apache.comet.parquet.RowGroupReader; +import org.apache.comet.parquet.WrappedInputFile; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.Schema; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.io.CloseableGroup; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.mapping.NameMapping; +import org.apache.iceberg.parquet.ReadConf; +import org.apache.iceberg.parquet.VectorizedReader; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.ByteBuffers; +import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.schema.MessageType; + +public class CometVectorizedParquetReader extends CloseableGroup + implements CloseableIterable { + private final InputFile input; + private final ParquetReadOptions options; + private final Schema expectedSchema; + private final Function> batchReaderFunc; + private final Expression filter; + private final boolean reuseContainers; + private final boolean caseSensitive; + private final int batchSize; + private final NameMapping nameMapping; + private final Map properties; + private final Long start; + private final Long length; + private final ByteBuffer fileEncryptionKey; + private final ByteBuffer fileAADPrefix; + + private 
CometVectorizedParquetReader( + InputFile input, + Schema expectedSchema, + ParquetReadOptions options, + Function> readerFunc, + NameMapping nameMapping, + Expression filter, + boolean reuseContainers, + boolean caseSensitive, + int maxRecordsPerBatch, + Map properties, + Long start, + Long length, + ByteBuffer fileEncryptionKey, + ByteBuffer fileAADPrefix) { + this.input = input; + this.expectedSchema = expectedSchema; + this.options = options; + this.batchReaderFunc = readerFunc; + // replace alwaysTrue with null to avoid extra work evaluating a trivial filter + this.filter = filter == Expressions.alwaysTrue() ? null : filter; + this.reuseContainers = reuseContainers; + this.caseSensitive = caseSensitive; + this.batchSize = maxRecordsPerBatch; + this.nameMapping = nameMapping; + this.properties = properties; + this.start = start; + this.length = length; + this.fileEncryptionKey = fileEncryptionKey; + this.fileAADPrefix = fileAADPrefix; + } + + public static Builder builder( + InputFile file, + Schema schema, + ParquetReadOptions options, + Function> batchedReaderFunc) { + return new Builder(file, schema, options, batchedReaderFunc); + } + + public static class Builder { + private final InputFile file; + private final Schema schema; + private final ParquetReadOptions options; + private final Function> batchedReaderFunc; + private NameMapping nameMapping = null; + private Expression filter = null; + private boolean reuseContainers = false; + private boolean caseSensitive = true; + private int maxRecordsPerBatch = 10000; + private Map properties = null; + private Long start = null; + private Long length = null; + private ByteBuffer fileEncryptionKey = null; + private ByteBuffer fileAADPrefix = null; + + private Builder( + InputFile file, + Schema schema, + ParquetReadOptions options, + Function> batchedReaderFunc) { + this.file = file; + this.schema = schema; + this.options = options; + this.batchedReaderFunc = batchedReaderFunc; + } + + public Builder nameMapping(NameMapping mapping) { + this.nameMapping = mapping; + return this; + } + + public Builder filter(Expression filterExpr) { + this.filter = filterExpr; + return this; + } + + public Builder reuseContainers(boolean reuse) { + this.reuseContainers = reuse; + return this; + } + + public Builder caseSensitive(boolean sensitive) { + this.caseSensitive = sensitive; + return this; + } + + public Builder maxRecordsPerBatch(int maxRecords) { + this.maxRecordsPerBatch = maxRecords; + return this; + } + + public Builder properties(Map props) { + this.properties = props; + return this; + } + + public Builder split(Long splitStart, Long splitLength) { + this.start = splitStart; + this.length = splitLength; + return this; + } + + public Builder encryption(ByteBuffer encryptionKey, ByteBuffer aadPrefix) { + this.fileEncryptionKey = encryptionKey; + this.fileAADPrefix = aadPrefix; + return this; + } + + public CometVectorizedParquetReader build() { + return new CometVectorizedParquetReader<>( + file, + schema, + options, + batchedReaderFunc, + nameMapping, + filter, + reuseContainers, + caseSensitive, + maxRecordsPerBatch, + properties, + start, + length, + fileEncryptionKey, + fileAADPrefix); + } + } + + private ReadConf conf = null; + + private ReadConf init() { + if (conf == null) { + ReadConf readConf = + new ReadConf( + input, + options, + expectedSchema, + filter, + null, + batchReaderFunc, + nameMapping, + reuseContainers, + caseSensitive, + batchSize); + this.conf = readConf.copy(); + return readConf; + } + return conf; + } + + @Override 
+ public CloseableIterator iterator() { + FileIterator iter = + new FileIterator<>(init(), properties, start, length, fileEncryptionKey, fileAADPrefix); + addCloseable(iter); + return iter; + } + + private static class FileIterator implements CloseableIterator { + private final boolean[] shouldSkip; + private final VectorizedReader model; + private final long totalValues; + private final int batchSize; + private final List> columnChunkMetadata; + private final boolean reuseContainers; + private int nextRowGroup = 0; + private long nextRowGroupStart = 0; + private long valuesRead = 0; + private T last = null; + private final FileReader cometReader; + + FileIterator( + ReadConf conf, + Map properties, + Long start, + Long length, + ByteBuffer fileEncryptionKey, + ByteBuffer fileAADPrefix) { + this.shouldSkip = conf.shouldSkip(); + this.totalValues = conf.totalValues(); + this.reuseContainers = conf.reuseContainers(); + this.model = conf.vectorizedModel(); + this.batchSize = conf.batchSize(); + this.model.setBatchSize(this.batchSize); + this.columnChunkMetadata = conf.columnChunkMetadataForRowGroups(); + this.cometReader = + newCometReader( + conf.file(), + conf.projection(), + properties, + start, + length, + fileEncryptionKey, + fileAADPrefix); + } + + private FileReader newCometReader( + InputFile file, + MessageType projection, + Map properties, + Long start, + Long length, + ByteBuffer fileEncryptionKey, + ByteBuffer fileAADPrefix) { + try { + ReadOptions cometOptions = ReadOptions.builder(new Configuration()).build(); + + FileReader fileReader = + new FileReader( + new WrappedInputFile(file), + cometOptions, + properties, + start, + length, + ByteBuffers.toByteArray(fileEncryptionKey), + ByteBuffers.toByteArray(fileAADPrefix)); + + List columnDescriptors = projection.getColumns(); + + List specs = Lists.newArrayList(); + + for (ColumnDescriptor descriptor : columnDescriptors) { + ParquetColumnSpec spec = CometTypeUtils.descriptorToParquetColumnSpec(descriptor); + specs.add(spec); + } + + fileReader.setRequestedSchemaFromSpecs(specs); + return fileReader; + } catch (IOException e) { + throw new UncheckedIOException("Failed to open Parquet file: " + file.location(), e); + } + } + + @Override + public boolean hasNext() { + return valuesRead < totalValues; + } + + @Override + public T next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + if (valuesRead >= nextRowGroupStart) { + advance(); + } + + // batchSize is an integer, so casting to integer is safe + int numValuesToRead = (int) Math.min(nextRowGroupStart - valuesRead, batchSize); + if (reuseContainers) { + this.last = model.read(last, numValuesToRead); + } else { + this.last = model.read(null, numValuesToRead); + } + valuesRead += numValuesToRead; + + return last; + } + + private void advance() { + while (shouldSkip[nextRowGroup]) { + nextRowGroup += 1; + cometReader.skipNextRowGroup(); + } + RowGroupReader pages; + try { + pages = cometReader.readNextRowGroup(); + } catch (IOException e) { + throw new RuntimeIOException(e); + } + + model.setRowGroupInfo(pages, columnChunkMetadata.get(nextRowGroup)); + nextRowGroupStart += pages.getRowCount(); + nextRowGroup += 1; + } + + @Override + public void close() throws IOException { + model.close(); + cometReader.close(); + } + } +} diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/parquet/CometVectorizedParquetReaderFactory.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/parquet/CometVectorizedParquetReaderFactory.java new file mode 100644 
index 000000000000..25e7616ac373 --- /dev/null +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/parquet/CometVectorizedParquetReaderFactory.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.parquet; + +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.parquet.VectorizedParquetReaderFactory; + +/** + * Factory for creating Comet-based vectorized Parquet readers. + * + *
<p>
This factory is loaded via reflection when the {@code read.parquet.vectorized-reader.factory} + * property is set to {@code org.apache.iceberg.spark.parquet.CometVectorizedParquetReaderFactory}. + * It provides Comet's native vectorized Parquet reader implementation for Iceberg tables. + */ +public class CometVectorizedParquetReaderFactory implements VectorizedParquetReaderFactory { + + @Override + public String name() { + return "comet"; + } + + @Override + public CloseableIterable createReader(ReaderParams params) { + return CometVectorizedParquetReader.builder( + params.file(), params.schema(), params.options(), params.batchedReaderFunc()) + .nameMapping(params.mapping()) + .filter(params.filter()) + .reuseContainers(params.reuseContainers()) + .caseSensitive(params.caseSensitive()) + .maxRecordsPerBatch(params.maxRecordsPerBatch()) + .properties(params.properties()) + .split(params.start(), params.length()) + .encryption(params.fileEncryptionKey(), params.fileAADPrefix()) + .build(); + } +} diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java index ff30f29aeae6..9aead554e3e9 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java @@ -122,6 +122,7 @@ private CloseableIterable newParquetIterable( // read performance as every batch read doesn't have to pay the cost of allocating memory. .reuseContainers() .withNameMapping(nameMapping()) + .vectorizedReaderFactory(parquetConf.factoryClassName().orElse(null)) .build(); } diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java index 0626d0b43985..764bbc47d923 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java @@ -137,10 +137,18 @@ public PartitionReaderFactory createReaderFactory() { } private ParquetBatchReadConf parquetBatchReadConf(ParquetReaderType readerType) { - return ImmutableParquetBatchReadConf.builder() - .batchSize(readConf.parquetBatchSize()) - .readerType(readerType) - .build(); + String factoryClassName = readConf.parquetVectorizedReaderFactory(); + + ImmutableParquetBatchReadConf.Builder builder = + ImmutableParquetBatchReadConf.builder() + .batchSize(readConf.parquetBatchSize()) + .readerType(readerType); + + if (factoryClassName != null) { + builder.factoryClassName(factoryClassName); + } + + return builder.build(); } private OrcBatchReadConf orcBatchReadConf() {
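As a usage sketch of the new hook, the snippet below shows how a caller that builds a Parquet read directly could opt into the Comet factory through the vectorizedReaderFactory(...) builder method wired up in this change (see BaseBatchReader above). This is a minimal sketch, not part of the patch: the class name CometFactoryUsageSketch, the method readWithComet, and its parameters are illustrative placeholders; Parquet.read, project, createBatchedReaderFunc, reuseContainers, and build are the existing Iceberg Parquet read API, and the factory class name is the one added by this patch.

import java.util.function.Function;
import org.apache.iceberg.Schema;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.parquet.VectorizedReader;
import org.apache.parquet.schema.MessageType;
import org.apache.spark.sql.vectorized.ColumnarBatch;

class CometFactoryUsageSketch {
  // Illustrative only: the caller supplies the input file, the projected Iceberg schema,
  // and the same batch-reader function it would pass for the default vectorized path.
  static CloseableIterable<ColumnarBatch> readWithComet(
      InputFile inputFile,
      Schema projectedSchema,
      Function<MessageType, VectorizedReader<?>> batchReaderFunc) {
    return Parquet.read(inputFile)
        .project(projectedSchema)
        .createBatchedReaderFunc(batchReaderFunc)
        .reuseContainers()
        // Builder hook introduced by this change; the string is the fully qualified
        // name of the Comet factory registered above.
        .vectorizedReaderFactory(
            "org.apache.iceberg.spark.parquet.CometVectorizedParquetReaderFactory")
        .build();
  }
}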
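The descriptorToParquetColumnSpec / buildColumnDescriptor pair shown earlier in this change effectively serializes a Parquet ColumnDescriptor into a ParquetColumnSpec (physical type, levels, field id, and a string map of logical-type parameters) and rebuilds an equivalent descriptor on the other side. A minimal round-trip sketch follows; it assumes CometTypeUtils lives in org.apache.iceberg.spark.parquet (as its unqualified use in CometVectorizedParquetReader suggests), and ColumnSpecRoundTripSketch/roundTrip are illustrative names only.

import org.apache.comet.parquet.ParquetColumnSpec;
import org.apache.iceberg.spark.parquet.CometTypeUtils; // assumed package, see note above
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;

class ColumnSpecRoundTripSketch {
  static void roundTrip() {
    // A DECIMAL(38, 2) column stored as a 16-byte FIXED_LEN_BYTE_ARRAY with field id 1.
    MessageType schema =
        new MessageType(
            "table",
            Types.optional(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)
                .length(16)
                .as(LogicalTypeAnnotation.decimalType(2, 38)) // scale, precision
                .id(1)
                .named("amount"));
    ColumnDescriptor original = schema.getColumns().get(0);

    // Flatten the descriptor into the Comet-facing spec, then rebuild it.
    ParquetColumnSpec spec = CometTypeUtils.descriptorToParquetColumnSpec(original);
    ColumnDescriptor rebuilt = CometTypeUtils.buildColumnDescriptor(spec);

    // The rebuilt descriptor should preserve the physical type, type length,
    // repetition/definition levels, field id, and the DECIMAL logical type,
    // because precision and scale travel through the logical-type parameter map.
  }
}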