
Commit efb0f9e

address review comments
1 parent 6d74b80 commit efb0f9e

6 files changed, +249 -43 lines changed

parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java

Lines changed: 7 additions & 25 deletions
@@ -1379,18 +1379,6 @@ public ReadBuilder setCustomType(int fieldId, Class<? extends StructLike> struct
       return this;
     }
 
-    /** Convenience method to enable comet */
-    public ReadBuilder enableComet(boolean enableComet) {
-      if (enableComet) {
-        this.properties.put(
-            VECTORIZED_READER_FACTORY,
-            "org.apache.iceberg.spark.parquet.CometVectorizedParquetReaderFactory");
-      } else {
-        this.properties.remove(VECTORIZED_READER_FACTORY);
-      }
-      return this;
-    }
-
     /**
      * Sets the vectorized reader factory class to use for reading Parquet files.
      *
@@ -1472,11 +1460,11 @@ public <D> CloseableIterable<D> build() {
 
       if (batchedReaderFunc != null) {
         // Try to load custom vectorized reader factory from properties
-        String readerName = properties.get(VECTORIZED_READER_FACTORY);
+        String factoryName = properties.get(VECTORIZED_READER_FACTORY);
 
-        if (readerName != null) {
-          LOG.info("Loading custom vectorized reader factory: {}", readerName);
-          VectorizedParquetReaderFactory factory = loadReaderFactory(readerName);
+        if (factoryName != null) {
+          LOG.info("Loading custom vectorized reader factory: {}", factoryName);
+          VectorizedParquetReaderFactory factory = loadReaderFactory(factoryName);
           if (factory != null) {
             return factory.createReader(
                 VectorizedParquetReaderFactory.ReaderParams.builder(
@@ -1592,18 +1580,12 @@ public <D> CloseableIterable<D> build() {
   private static VectorizedParquetReaderFactory loadReaderFactory(String className) {
     try {
       Class<?> factoryClass = Class.forName(className);
-      if (!VectorizedParquetReaderFactory.class.isAssignableFrom(factoryClass)) {
+      if (VectorizedParquetReaderFactory.class.isAssignableFrom(factoryClass)) {
+        return (VectorizedParquetReaderFactory) factoryClass.getDeclaredConstructor().newInstance();
+      } else {
         LOG.warn("Class {} does not implement VectorizedParquetReaderFactory interface", className);
         return null;
       }
-      return (VectorizedParquetReaderFactory) factoryClass.getDeclaredConstructor().newInstance();
-    } catch (ClassNotFoundException e) {
-      LOG.warn("Could not find vectorized reader factory class: {}", className, e);
-      return null;
-    } catch (NoSuchMethodException e) {
-      LOG.warn(
-          "Vectorized reader factory class {} does not have a no-arg constructor", className, e);
-      return null;
     } catch (Exception e) {
       LOG.warn("Failed to instantiate vectorized reader factory: {}", className, e);
      return null;
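
Note: with the enableComet convenience method removed, callers opt into Comet by naming the factory class explicitly. Below is a minimal sketch of the replacement call pattern, using only builder methods that appear in this diff; inputFile, schema, and newBatchReader are placeholders, and the class name is the one the deleted method hard-coded.

// Before this commit: Parquet.read(inputFile).enableComet(true)...
// After (sketch):
CloseableIterable<Object> batches =
    Parquet.read(inputFile)
        .project(schema)
        .createBatchedReaderFunc(fileSchema -> newBatchReader(fileSchema))
        .vectorizedReaderFactory(
            "org.apache.iceberg.spark.parquet.CometVectorizedParquetReaderFactory")
        .build();

Passing null to vectorizedReaderFactory(...) clears the property again, which covers what the removed enableComet(false) branch did (exercised by testVectorizedReaderFactoryRemoveWithNull below).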

parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java

Lines changed: 241 additions & 0 deletions
@@ -36,6 +36,7 @@
 import java.nio.ByteBuffer;
 import java.nio.file.Path;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.function.Function;
 import java.util.stream.Stream;
@@ -314,6 +315,176 @@ public void testFooterMetricsWithNameMappingForFileWithoutIds() throws IOExcepti
     }
   }
 
+  @Test
+  public void testVectorizedReaderFactoryConfiguration() throws IOException {
+    Schema schema = new Schema(optional(1, "intCol", IntegerType.get()));
+    File file = createTempFile(temp);
+
+    // Write test data
+    List<GenericData.Record> records = Lists.newArrayList();
+    org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct());
+    GenericData.Record record = new GenericData.Record(avroSchema);
+    record.put("intCol", 42);
+    records.add(record);
+
+    write(file, schema, Collections.emptyMap(), null, records.toArray(new GenericData.Record[] {}));
+
+    // Reset the flag
+    TestMockVectorizedReaderFactory.wasCalled = false;
+
+    // Test setting vectorized reader factory
+    Parquet.ReadBuilder readBuilder =
+        Parquet.read(Files.localInput(file))
+            .project(schema)
+            .createBatchedReaderFunc(fileSchema -> new MockVectorizedReader())
+            .vectorizedReaderFactory(MockVectorizedReaderFactory.class.getName());
+
+    // We can't easily verify the property directly since it's private,
+    // but we can verify the build succeeds
+    readBuilder.build().iterator(); // Should not throw
+
+    // Verify our mock factory was NOT used (because MockVectorizedReaderFactory is not a valid factory)
+    assertThat(TestMockVectorizedReaderFactory.wasCalled)
+        .as("TestMockVectorizedReaderFactory should not have been called")
+        .isFalse();
+  }
+
+  @Test
+  public void testVectorizedReaderFactoryRemoveWithNull() throws IOException {
+    Schema schema = new Schema(optional(1, "intCol", IntegerType.get()));
+    File file = createTempFile(temp);
+
+    // Write test data
+    List<GenericData.Record> records = Lists.newArrayList();
+    org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct());
+    GenericData.Record record = new GenericData.Record(avroSchema);
+    record.put("intCol", 42);
+    records.add(record);
+
+    write(file, schema, Collections.emptyMap(), null, records.toArray(new GenericData.Record[] {}));
+
+    // Test removing vectorized reader factory with null
+    Parquet.ReadBuilder readBuilder =
+        Parquet.read(Files.localInput(file))
+            .project(schema)
+            .createBatchedReaderFunc(fileSchema -> new MockVectorizedReader())
+            .vectorizedReaderFactory(MockVectorizedReaderFactory.class.getName())
+            .vectorizedReaderFactory(null); // Remove it
+
+    // Build should succeed and use default reader
+    readBuilder.build().iterator(); // Should not throw
+  }
+
+  @Test
+  public void testVectorizedReaderFactoryMissingClass() throws IOException {
+    Schema schema = new Schema(optional(1, "intCol", IntegerType.get()));
+    File file = createTempFile(temp);
+
+    // Write test data
+    List<GenericData.Record> records = Lists.newArrayList();
+    org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct());
+    GenericData.Record record = new GenericData.Record(avroSchema);
+    record.put("intCol", 42);
+    records.add(record);
+
+    write(file, schema, Collections.emptyMap(), null, records.toArray(new GenericData.Record[] {}));
+
+    // Test with non-existent class - should fall back to default reader
+    Parquet.ReadBuilder readBuilder =
+        Parquet.read(Files.localInput(file))
+            .project(schema)
+            .createBatchedReaderFunc(fileSchema -> new MockVectorizedReader())
+            .vectorizedReaderFactory("com.example.NonExistentFactory");
+
+    // Should not throw - falls back to default reader
+    readBuilder.build().iterator();
+  }
+
+  @Test
+  public void testVectorizedReaderFactoryInvalidClass() throws IOException {
+    Schema schema = new Schema(optional(1, "intCol", IntegerType.get()));
+    File file = createTempFile(temp);
+
+    // Write test data
+    List<GenericData.Record> records = Lists.newArrayList();
+    org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct());
+    GenericData.Record record = new GenericData.Record(avroSchema);
+    record.put("intCol", 42);
+    records.add(record);
+
+    write(file, schema, Collections.emptyMap(), null, records.toArray(new GenericData.Record[] {}));
+
+    // Test with a class that doesn't implement VectorizedParquetReaderFactory
+    Parquet.ReadBuilder readBuilder =
+        Parquet.read(Files.localInput(file))
+            .project(schema)
+            .createBatchedReaderFunc(fileSchema -> new MockVectorizedReader())
+            .vectorizedReaderFactory(InvalidReaderFactory.class.getName());
+
+    // Should not throw - falls back to default reader
+    readBuilder.build().iterator();
+  }
+
+  @Test
+  public void testVectorizedReaderFactoryNoDefaultConstructor() throws IOException {
+    Schema schema = new Schema(optional(1, "intCol", IntegerType.get()));
+    File file = createTempFile(temp);
+
+    // Write test data
+    List<GenericData.Record> records = Lists.newArrayList();
+    org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct());
+    GenericData.Record record = new GenericData.Record(avroSchema);
+    record.put("intCol", 42);
+    records.add(record);
+
+    write(file, schema, Collections.emptyMap(), null, records.toArray(new GenericData.Record[] {}));
+
+    // Test with a class that has no default constructor
+    Parquet.ReadBuilder readBuilder =
+        Parquet.read(Files.localInput(file))
+            .project(schema)
+            .createBatchedReaderFunc(fileSchema -> new MockVectorizedReader())
+            .vectorizedReaderFactory(NoDefaultConstructorFactory.class.getName());
+
+    // Should not throw - falls back to default reader
+    readBuilder.build().iterator();
+  }
+
+  @Test
+  public void testVectorizedReaderFactorySuccessfulLoad() throws IOException {
+    Schema schema = new Schema(optional(1, "intCol", IntegerType.get()));
+    File file = createTempFile(temp);
+
+    // Write test data
+    List<GenericData.Record> records = Lists.newArrayList();
+    org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct());
+    GenericData.Record record = new GenericData.Record(avroSchema);
+    record.put("intCol", 42);
+    records.add(record);
+
+    write(file, schema, Collections.emptyMap(), null, records.toArray(new GenericData.Record[] {}));
+
+    // Reset the flag
+    TestMockVectorizedReaderFactory.wasCalled = false;
+
+    // Test successful factory loading
+    Parquet.ReadBuilder readBuilder =
+        Parquet.read(Files.localInput(file))
+            .project(schema)
+            .createBatchedReaderFunc(fileSchema -> new MockVectorizedReader())
+            .vectorizedReaderFactory(TestMockVectorizedReaderFactory.class.getName());
+
+    // Build and consume the reader
+    Iterator<?> iterator = readBuilder.build().iterator();
+    assertThat(iterator.hasNext()).isTrue();
+    iterator.next();
+
+    // Verify our mock factory was actually used
+    assertThat(TestMockVectorizedReaderFactory.wasCalled)
+        .as("Mock factory should have been called")
+        .isTrue();
+  }
+
   private Pair<File, Long> generateFile(
       Function<MessageType, ParquetValueWriter<?>> createWriterFunc,
       int desiredRecordCount,
@@ -354,4 +525,74 @@ private Pair<File, Long> generateFile(
         records.toArray(new GenericData.Record[] {}));
     return Pair.of(file, size);
   }
+
+  // Test helper classes
+
+  /** A mock VectorizedReader for testing. */
+  public static class MockVectorizedReader implements VectorizedReader<Object> {
+    @Override
+    public Object read(Object reuse, int numRows) {
+      return null;
+    }
+
+    @Override
+    public void setBatchSize(int batchSize) {
+      // No-op
+    }
+
+    @Override
+    public void close() {
+      // No-op
+    }
+  }
+
+  /** A mock factory class that implements VectorizedParquetReaderFactory for testing. */
+  public static class TestMockVectorizedReaderFactory implements VectorizedParquetReaderFactory {
+    static boolean wasCalled = false;
+
+    @Override
+    public String name() {
+      return "test-mock";
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T> org.apache.iceberg.io.CloseableIterable<T> createReader(ReaderParams params) {
+      wasCalled = true;
+      // Return a simple iterable that provides the mock data
+      GenericData.Record record =
+          new GenericData.Record(AvroSchemaUtil.convert(params.schema().asStruct(), "table"));
+      record.put(0, 42);
+      return (org.apache.iceberg.io.CloseableIterable<T>)
+          org.apache.iceberg.io.CloseableIterable.withNoopClose(Collections.singletonList(record));
+    }
+  }
+
+  /** A mock factory class without implementing the interface. */
+  public static class InvalidReaderFactory {
+    public InvalidReaderFactory() {}
+
+    public String name() {
+      return "invalid";
+    }
+  }
+
+  /** A mock factory class with no default constructor. */
+  public static class NoDefaultConstructorFactory implements VectorizedParquetReaderFactory {
+    @SuppressWarnings("unused")
+    public NoDefaultConstructorFactory(String unusedParam) {}
+
+    @Override
+    public String name() {
+      return "no-default";
+    }
+
+    @Override
+    public <T> org.apache.iceberg.io.CloseableIterable<T> createReader(ReaderParams params) {
+      return null;
+    }
+  }
+
+  /** A simple reference class that can be loaded. */
+  public static class MockVectorizedReaderFactory {}
 }
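
For reference, the helper classes above mirror the contract a real pluggable factory has to satisfy: implement VectorizedParquetReaderFactory and expose a public no-arg constructor. A minimal standalone sketch follows; the package and class names are hypothetical, and the interface's import path is assumed to be org.apache.iceberg.parquet.

package com.example; // hypothetical

import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.parquet.VectorizedParquetReaderFactory; // assumed location

public class ExampleReaderFactory implements VectorizedParquetReaderFactory {
  // loadReaderFactory instantiates via getDeclaredConstructor().newInstance(),
  // so a public no-arg constructor is required.
  public ExampleReaderFactory() {}

  @Override
  public String name() {
    return "example";
  }

  @Override
  public <T> CloseableIterable<T> createReader(ReaderParams params) {
    // A real factory would build a vectorized reader from params (projection
    // schema, input file, filter); an empty iterable keeps the sketch compilable.
    return CloseableIterable.empty();
  }
}

It would then be registered with .vectorizedReaderFactory("com.example.ExampleReaderFactory") on the ReadBuilder, the same way the tests above register their mocks.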

spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java

Lines changed: 0 additions & 6 deletions
@@ -139,12 +139,6 @@ public PartitionReaderFactory createReaderFactory() {
   private ParquetBatchReadConf parquetBatchReadConf(ParquetReaderType readerType) {
     String factoryClassName = readConf.parquetVectorizedReaderFactory();
 
-    // If no explicit factory is set and reader type is COMET, use the default Comet factory
-    if (factoryClassName == null && readerType == ParquetReaderType.COMET) {
-      factoryClassName =
-          org.apache.iceberg.spark.SparkSQLProperties.COMET_VECTORIZED_READER_FACTORY_CLASS;
-    }
-
     ImmutableParquetBatchReadConf.Builder builder =
         ImmutableParquetBatchReadConf.builder()
             .batchSize(readConf.parquetBatchSize())

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java

Lines changed: 0 additions & 6 deletions
@@ -139,12 +139,6 @@ public PartitionReaderFactory createReaderFactory() {
   private ParquetBatchReadConf parquetBatchReadConf(ParquetReaderType readerType) {
     String factoryClassName = readConf.parquetVectorizedReaderFactory();
 
-    // If no explicit factory is set and reader type is COMET, use the default Comet factory
-    if (factoryClassName == null && readerType == ParquetReaderType.COMET) {
-      factoryClassName =
-          org.apache.iceberg.spark.SparkSQLProperties.COMET_VECTORIZED_READER_FACTORY_CLASS;
-    }
-
     ImmutableParquetBatchReadConf.Builder builder =
         ImmutableParquetBatchReadConf.builder()
             .batchSize(readConf.parquetBatchSize())

spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java

Lines changed: 1 addition & 0 deletions
@@ -380,6 +380,7 @@ public String parquetVectorizedReaderFactory() {
     return confParser
         .stringConf()
         .sessionConf(SparkSQLProperties.PARQUET_VECTORIZED_READER_FACTORY)
+        .defaultValue(SparkSQLProperties.COMET_VECTORIZED_READER_FACTORY_CLASS)
         .parseOptional();
   }
 }
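
Together with the SparkBatch deletions in this commit, this moves the Comet default out of SparkBatch and into conf parsing. Below is a plain-Java sketch of the lookup this chain is intended to resolve to; the assumption is that defaultValue applies when the session property is unset, and sessionConf is a stand-in Map, not the real confParser.

// Sketch of the intended lookup order, not the actual confParser internals:
static String resolveFactory(java.util.Map<String, String> sessionConf) {
  return sessionConf.getOrDefault(
      SparkSQLProperties.PARQUET_VECTORIZED_READER_FACTORY, // explicit setting wins
      SparkSQLProperties.COMET_VECTORIZED_READER_FACTORY_CLASS); // otherwise the Comet default
}

Note that the deleted SparkBatch branch only applied the default when readerType == ParquetReaderType.COMET; after this change the default is applied at the conf level regardless of reader type.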

spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java

Lines changed: 0 additions & 6 deletions
@@ -139,12 +139,6 @@ public PartitionReaderFactory createReaderFactory() {
   private ParquetBatchReadConf parquetBatchReadConf(ParquetReaderType readerType) {
     String factoryClassName = readConf.parquetVectorizedReaderFactory();
 
-    // If no explicit factory is set and reader type is COMET, use the default Comet factory
-    if (factoryClassName == null && readerType == ParquetReaderType.COMET) {
-      factoryClassName =
-          org.apache.iceberg.spark.SparkSQLProperties.COMET_VECTORIZED_READER_FACTORY_CLASS;
-    }
-
     ImmutableParquetBatchReadConf.Builder builder =
         ImmutableParquetBatchReadConf.builder()
             .batchSize(readConf.parquetBatchSize())
