Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 56 additions & 1 deletion parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@

public class Parquet {
private static final Logger LOG = LoggerFactory.getLogger(Parquet.class);
private static final String VECTORIZED_READER_FACTORY = "read.parquet.vectorized-reader.factory";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have a unit test for changes in this class?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a bunch of tests for this. For the Comet specific code, there is a plan to add a diff file that can be applied to change the reader for all the existing tests and we can add a ci test pipeline that verifies that nothing is broken.


private Parquet() {}

Expand Down Expand Up @@ -1378,6 +1379,22 @@ public ReadBuilder setCustomType(int fieldId, Class<? extends StructLike> struct
return this;
}

/**
* Sets the vectorized reader factory class to use for reading Parquet files.
*
* @param factoryClassName fully qualified class name of the VectorizedParquetReaderFactory
* implementation, or null to use the default reader
* @return this builder for method chaining
*/
public ReadBuilder vectorizedReaderFactory(String factoryClassName) {
if (factoryClassName != null) {
this.properties.put(VECTORIZED_READER_FACTORY, factoryClassName);
} else {
this.properties.remove(VECTORIZED_READER_FACTORY);
}
return this;
}

public ReadBuilder withFileEncryptionKey(ByteBuffer encryptionKey) {
this.fileEncryptionKey = encryptionKey;
return this;
Expand All @@ -1389,7 +1406,7 @@ public ReadBuilder withAADPrefix(ByteBuffer aadPrefix) {
}

@Override
@SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity"})
@SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity", "MethodLength"})
public <D> CloseableIterable<D> build() {
FileDecryptionProperties fileDecryptionProperties = null;
if (fileEncryptionKey != null) {
Expand Down Expand Up @@ -1442,6 +1459,29 @@ public <D> CloseableIterable<D> build() {
}

if (batchedReaderFunc != null) {
// Try to load custom vectorized reader factory from properties
String factoryName = properties.get(VECTORIZED_READER_FACTORY);

if (factoryName != null) {
LOG.info("Loading custom vectorized reader factory: {}", factoryName);
VectorizedParquetReaderFactory factory = loadReaderFactory(factoryName);
if (factory != null) {
return factory.createReader(
VectorizedParquetReaderFactory.ReaderParams.builder(
file, schema, options, batchedReaderFunc)
.nameMapping(mapping)
.filter(filter)
.reuseContainers(reuseContainers)
.caseSensitive(caseSensitive)
.maxRecordsPerBatch(maxRecordsPerBatch)
.properties(properties)
.split(start, length)
.encryption(fileEncryptionKey, fileAADPrefix)
.build());
}
}

// Fall back to default VectorizedParquetReader
return new VectorizedParquetReader<>(
file,
schema,
Expand Down Expand Up @@ -1537,6 +1577,21 @@ public <D> CloseableIterable<D> build() {
}
}

private static VectorizedParquetReaderFactory loadReaderFactory(String className) {
try {
Class<?> factoryClass = Class.forName(className);
if (VectorizedParquetReaderFactory.class.isAssignableFrom(factoryClass)) {
return (VectorizedParquetReaderFactory) factoryClass.getDeclaredConstructor().newInstance();
} else {
LOG.warn("Class {} does not implement VectorizedParquetReaderFactory interface", className);
return null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than return null for multiple failure cases, how about

VectorizedParquetReaderFactory factory = null;
try {
  Class<?> factoryClass = Class.forName(className);
  if (VectorizedParquetReaderFactory.class.isAssignableFrom(factoryClass)) {
    factory = (VectorizedParquetReaderFactory) factoryClass.getDeclaredConstructor().newInstance();
  } else {
    // log warning
  } catch (...) {
    // log warning
  }
  return factory;
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We lose a little bit of helpful text in the error, but made the change.

}
} catch (Exception e) {
LOG.warn("Failed to instantiate vectorized reader factory: {}", className, e);
return null;
}
}

private static class ParquetReadBuilder<T> extends ParquetReader.Builder<T> {
private Schema schema = null;
private ReadSupport<T> readSupport = null;
Expand Down
30 changes: 19 additions & 11 deletions parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
*
* @param <T> type of value to read
*/
class ReadConf<T> {
public class ReadConf<T> {
private final ParquetFileReader reader;
private final InputFile file;
private final ParquetReadOptions options;
Expand All @@ -60,7 +60,7 @@ class ReadConf<T> {
private final List<Map<ColumnPath, ColumnChunkMetaData>> columnChunkMetaDataForRowGroups;

@SuppressWarnings("unchecked")
ReadConf(
public ReadConf(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need this public?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes we do. The Comet reader is now in a different module and needs to access the read conf for read information. I could make it protected and then derive a class explicitly meant for custom readers, but that seems overkill.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's used outside parquet module, it would be better renamed ParquetReadConf.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rather not rename an existing class. I feel it will reduce confusion on the scope of the changes.

InputFile file,
ParquetReadOptions options,
Schema expectedSchema,
Expand Down Expand Up @@ -146,7 +146,7 @@ private ReadConf(ReadConf<T> toCopy) {
this.columnChunkMetaDataForRowGroups = toCopy.columnChunkMetaDataForRowGroups;
}

ParquetFileReader reader() {
public ParquetFileReader reader() {
if (reader != null) {
reader.setRequestedSchema(projection);
return reader;
Expand All @@ -157,35 +157,43 @@ ParquetFileReader reader() {
return newReader;
}

ParquetValueReader<T> model() {
public InputFile file() {
return file;
}

public MessageType projection() {
return projection;
}

public ParquetValueReader<T> model() {
return model;
}

VectorizedReader<T> vectorizedModel() {
public VectorizedReader<T> vectorizedModel() {
return vectorizedModel;
}

boolean[] shouldSkip() {
public boolean[] shouldSkip() {
return shouldSkip;
}

long totalValues() {
public long totalValues() {
return totalValues;
}

boolean reuseContainers() {
public boolean reuseContainers() {
return reuseContainers;
}

Integer batchSize() {
public Integer batchSize() {
return batchSize;
}

List<Map<ColumnPath, ColumnChunkMetaData>> columnChunkMetadataForRowGroups() {
public List<Map<ColumnPath, ColumnChunkMetaData>> columnChunkMetadataForRowGroups() {
return columnChunkMetaDataForRowGroups;
}

ReadConf<T> copy() {
public ReadConf<T> copy() {
return new ReadConf<>(this);
}

Expand Down
Loading
Loading