-
Notifications
You must be signed in to change notification settings - Fork 3k
Spark: Encapsulate parquet objects for Comet #13786
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 4 commits
8df3ab4
ece26bb
ba75958
e77a63b
b3b38eb
c522693
2cc4622
c093598
461bd23
43ea520
22d5de1
bc0b83e
17d9a79
e65888e
096fe99
6d74b80
efb0f9e
4e29904
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -845,6 +845,7 @@ project(':iceberg-orc') { | |
| } | ||
|
|
||
| project(':iceberg-parquet') { | ||
|
|
||
| test { | ||
| useJUnitPlatform() | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -43,7 +43,7 @@ | |
| * | ||
| * @param <T> type of value to read | ||
| */ | ||
| class ReadConf<T> { | ||
| public class ReadConf<T> { | ||
| private final ParquetFileReader reader; | ||
| private final InputFile file; | ||
| private final ParquetReadOptions options; | ||
|
|
@@ -60,7 +60,7 @@ class ReadConf<T> { | |
| private final List<Map<ColumnPath, ColumnChunkMetaData>> columnChunkMetaDataForRowGroups; | ||
|
|
||
| @SuppressWarnings("unchecked") | ||
| ReadConf( | ||
| public ReadConf( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we need this public?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes we do. The Comet reader is now in a different module and needs to access the read conf for read information. I could make it protected and then derive a class explicitly meant for custom readers, but that seems overkill.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If it's used outside parquet module, it would be better renamed
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would rather not rename an existing class. I feel it will reduce confusion on the scope of the changes. |
||
| InputFile file, | ||
| ParquetReadOptions options, | ||
| Schema expectedSchema, | ||
|
|
@@ -146,7 +146,7 @@ private ReadConf(ReadConf<T> toCopy) { | |
| this.columnChunkMetaDataForRowGroups = toCopy.columnChunkMetaDataForRowGroups; | ||
| } | ||
|
|
||
| ParquetFileReader reader() { | ||
| public ParquetFileReader reader() { | ||
| if (reader != null) { | ||
| reader.setRequestedSchema(projection); | ||
| return reader; | ||
|
|
@@ -157,35 +157,43 @@ ParquetFileReader reader() { | |
| return newReader; | ||
| } | ||
|
|
||
| ParquetValueReader<T> model() { | ||
| public InputFile file() { | ||
| return file; | ||
| } | ||
|
|
||
| public MessageType projection() { | ||
| return projection; | ||
| } | ||
|
|
||
| public ParquetValueReader<T> model() { | ||
| return model; | ||
| } | ||
|
|
||
| VectorizedReader<T> vectorizedModel() { | ||
| public VectorizedReader<T> vectorizedModel() { | ||
| return vectorizedModel; | ||
| } | ||
|
|
||
| boolean[] shouldSkip() { | ||
| public boolean[] shouldSkip() { | ||
| return shouldSkip; | ||
| } | ||
|
|
||
| long totalValues() { | ||
| public long totalValues() { | ||
| return totalValues; | ||
| } | ||
|
|
||
| boolean reuseContainers() { | ||
| public boolean reuseContainers() { | ||
| return reuseContainers; | ||
| } | ||
|
|
||
| Integer batchSize() { | ||
| public Integer batchSize() { | ||
| return batchSize; | ||
| } | ||
|
|
||
| List<Map<ColumnPath, ColumnChunkMetaData>> columnChunkMetadataForRowGroups() { | ||
| public List<Map<ColumnPath, ColumnChunkMetaData>> columnChunkMetadataForRowGroups() { | ||
| return columnChunkMetaDataForRowGroups; | ||
| } | ||
|
|
||
| ReadConf<T> copy() { | ||
| public ReadConf<T> copy() { | ||
| return new ReadConf<>(this); | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,90 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.iceberg.parquet; | ||
|
|
||
| import java.nio.ByteBuffer; | ||
| import java.util.Map; | ||
| import java.util.function.Function; | ||
| import org.apache.iceberg.Schema; | ||
| import org.apache.iceberg.expressions.Expression; | ||
| import org.apache.iceberg.io.CloseableIterable; | ||
| import org.apache.iceberg.io.InputFile; | ||
| import org.apache.iceberg.mapping.NameMapping; | ||
| import org.apache.parquet.ParquetReadOptions; | ||
| import org.apache.parquet.schema.MessageType; | ||
|
|
||
| /** | ||
| * Service Provider Interface (SPI) for creating custom vectorized Parquet readers. | ||
| * | ||
| * <p>Implementations of this interface can be loaded at runtime using Java's {@link | ||
| * java.util.ServiceLoader} mechanism. To register an implementation, create a file named {@code | ||
| * META-INF/services/org.apache.iceberg.parquet.VectorizedParquetReaderFactory} containing the fully | ||
| * qualified class name of the implementation. | ||
| * | ||
| * <p>This allows for pluggable vectorized reader implementations (e.g., Comet, Arrow, Velox) | ||
| * without requiring the core parquet module to depend on specific execution engines. | ||
| */ | ||
| public interface VectorizedParquetReaderFactory { | ||
|
|
||
| /** | ||
| * Returns the unique identifier for this reader factory. | ||
| * | ||
| * <p>This name is used to select the reader factory via configuration. For example, "comet" for | ||
| * the Comet vectorized reader. | ||
| * | ||
| * @return the unique name for this factory | ||
| */ | ||
| String name(); | ||
|
|
||
| /** | ||
| * Creates a vectorized parquet reader with the given configuration. | ||
| * | ||
| * @param file the input file to read | ||
| * @param schema the expected schema for the data | ||
| * @param options parquet read options | ||
| * @param batchedReaderFunc function to create a VectorizedReader from a MessageType | ||
| * @param mapping name mapping for schema evolution | ||
| * @param filter filter expression to apply during reading | ||
| * @param reuseContainers whether to reuse containers for records | ||
| * @param caseSensitive whether column name matching should be case-sensitive | ||
| * @param maxRecordsPerBatch maximum number of records per batch | ||
| * @param properties additional properties for reader configuration | ||
| * @param start optional start position for reading | ||
| * @param length optional length to read | ||
| * @param fileEncryptionKey optional encryption key for the file | ||
| * @param fileAADPrefix optional AAD prefix for encryption | ||
| * @param <T> the type of records returned by the reader | ||
| * @return a closeable iterable of records | ||
| */ | ||
| <T> CloseableIterable<T> createReader( | ||
parthchandra marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| InputFile file, | ||
| Schema schema, | ||
| ParquetReadOptions options, | ||
| Function<MessageType, VectorizedReader<?>> batchedReaderFunc, | ||
| NameMapping mapping, | ||
| Expression filter, | ||
| boolean reuseContainers, | ||
| boolean caseSensitive, | ||
| int maxRecordsPerBatch, | ||
| Map<String, String> properties, | ||
| Long start, | ||
| Long length, | ||
| ByteBuffer fileEncryptionKey, | ||
| ByteBuffer fileAADPrefix); | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.