
Commit 0256f4a

Create a marker class for the Comet reader and a few extra nits
1 parent c3babfd commit 0256f4a

9 files changed: +48, -51 lines
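
This commit replaces config-flag dispatch for the Comet reader (the removed
"parquet.reader.type" property) with type-based dispatch: a trivial ColumnarBatch
subclass acts as a distinct registry key, so the (format, type) lookup in
FormatModelRegistry selects the Comet implementation on its own. A minimal,
self-contained sketch of the marker-class pattern, with hypothetical names (not
Iceberg APIs):

import java.util.HashMap;
import java.util.Map;

public class MarkerDispatchDemo {
  static class Batch {}

  // Marker subclass: adds no behavior, only a distinct Class token for lookup.
  static class CometBatch extends Batch {}

  // Readers registered by the batch class the caller asks for.
  private static final Map<Class<?>, String> READERS = new HashMap<>();

  public static void main(String[] args) {
    READERS.put(Batch.class, "arrow-reader");
    READERS.put(CometBatch.class, "comet-reader");

    // Callers pick an implementation by requesting the marker type,
    // instead of setting a string property on the read builder.
    System.out.println(READERS.get(Batch.class));      // arrow-reader
    System.out.println(READERS.get(CometBatch.class)); // comet-reader
  }
}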

arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowFormatModels.java

Lines changed: 1 addition & 1 deletion
@@ -28,7 +28,7 @@ public static void register() {
         new ParquetFormatModel<>(
             ColumnarBatch.class,
             Object.class,
-            (schema, messageType, constantValues, properties) ->
+            (schema, messageType, idToConstant) ->
                 ArrowReader.VectorizedCombinedScanIterator.buildReader(
                     schema,
                     messageType, /* setArrowValidityVector */

core/src/main/java/org/apache/iceberg/formats/FormatModel.java

Lines changed: 1 addition & 1 deletion
@@ -58,7 +58,7 @@ public interface FormatModel<D, S> {
    *
    * @return the type of the data structures handled by this model implementation
    */
-  Class<D> type();
+  Class<? extends D> type();
 
   /**
    * Return the schema type class for the object model implementation processed by this factory.
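
Widening type() to Class<? extends D> is what lets a marker subclass token stand in
for its parent: CometColumnarBatch.class has static type Class<CometColumnarBatch>,
which is assignable to Class<? extends ColumnarBatch> but not to the invariant
Class<ColumnarBatch>. The same rule shown with JDK types only:

Class<? extends CharSequence> ok = String.class;  // compiles: covariant class token
// Class<CharSequence> bad = String.class;        // rejected: Class<T> is invariant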

core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java

Lines changed: 3 additions & 3 deletions
@@ -123,7 +123,7 @@ public static synchronized void register(FormatModel<?, ?> formatModel) {
    * @return a configured reader builder for the specified format and object model
    */
   public static <D, S> ReadBuilder<D, S> readBuilder(
-      FileFormat format, Class<D> type, InputFile inputFile) {
+      FileFormat format, Class<? extends D> type, InputFile inputFile) {
     FormatModel<D, S> factory = factoryFor(format, type);
     return factory.readBuilder(inputFile);
   }
@@ -144,7 +144,7 @@ public static <D, S> ReadBuilder<D, S> readBuilder(
    * @return a configured data write builder for creating a {@link DataWriter}
    */
   public static <D, S> DataWriteBuilder<D, S> dataWriteBuilder(
-      FileFormat format, Class<D> type, EncryptedOutputFile outputFile) {
+      FileFormat format, Class<? extends D> type, EncryptedOutputFile outputFile) {
     FormatModel<D, S> factory = factoryFor(format, type);
     return CommonWriteBuilderImpl.forDataFile(
         factory.writeBuilder(outputFile), outputFile.encryptingOutputFile().location(), format);
@@ -198,7 +198,7 @@ public static PositionDeleteWriteBuilder positionDeleteWriteBuilder(
   }
 
   @SuppressWarnings("unchecked")
-  private static <D, S> FormatModel<D, S> factoryFor(FileFormat format, Class<D> type) {
+  private static <D, S> FormatModel<D, S> factoryFor(FileFormat format, Class<? extends D> type) {
     FormatModel<D, S> model = (FormatModel<D, S>) MODELS.get(Pair.of(format, type));
     Preconditions.checkArgument(
         model != null, "Format model is not registered for format %s and type %s", format, type);

orc/src/main/java/org/apache/iceberg/orc/ORC.java

Lines changed: 4 additions & 5 deletions
@@ -703,7 +703,7 @@ public static class ReadBuilder {
     private Function<TypeDescription, OrcRowReader<?>> readerFunc;
     private Function<TypeDescription, OrcBatchReader<?>> batchedReaderFunc;
     private int recordsPerBatch = VectorizedRowBatch.DEFAULT_SIZE;
-    private Set<Integer> constantFieldIds = ImmutableSet.of();
+    private Set<Integer> idToConstant = ImmutableSet.of();
 
     private ReadBuilder(InputFile file) {
       Preconditions.checkNotNull(file, "Input file cannot be null");
@@ -780,8 +780,8 @@ public ReadBuilder withNameMapping(NameMapping newNameMapping) {
       return this;
     }
 
-    ReadBuilder constantValues(Set<Integer> newConstantFieldIds) {
-      this.constantFieldIds = newConstantFieldIds;
+    ReadBuilder idToConstant(Set<Integer> newIdToConstant) {
+      this.idToConstant = newIdToConstant;
       return this;
     }
 
@@ -792,8 +792,7 @@ public <D> CloseableIterable<D> build() {
           conf,
          // This is a behavioral change. Previously there was an error if metadata columns were
          // present in the schema; now they are removed and the correct reader is created
-          TypeUtil.selectNot(
-              schema, Sets.union(constantFieldIds, MetadataColumns.metadataFieldIds())),
+          TypeUtil.selectNot(schema, Sets.union(idToConstant, MetadataColumns.metadataFieldIds())),
           nameMapping,
           start,
           length,

orc/src/main/java/org/apache/iceberg/orc/ORCFormatModel.java

Lines changed: 3 additions & 4 deletions
@@ -103,14 +103,13 @@ public ReadBuilder<D, S> readBuilder(InputFile inputFile) {
 
   @FunctionalInterface
   public interface ReaderFunction<D> {
-    OrcRowReader<D> read(
-        Schema schema, TypeDescription messageType, Map<Integer, ?> constantValues);
+    OrcRowReader<D> read(Schema schema, TypeDescription messageType, Map<Integer, ?> idToConstant);
   }
 
   @FunctionalInterface
   public interface BatchReaderFunction<D> {
     OrcBatchReader<D> read(
-        Schema schema, TypeDescription messageType, Map<Integer, ?> constantValues);
+        Schema schema, TypeDescription messageType, Map<Integer, ?> idToConstant);
   }
 
   @FunctionalInterface
@@ -180,7 +179,7 @@ public ReadBuilder<D, S> recordsPerBatch(int numRowsPerBatch) {
 
   @Override
   public ReadBuilder<D, S> idToConstant(Map<Integer, ?> newIdToConstant) {
-    internal.constantValues(newIdToConstant.keySet());
+    internal.idToConstant(newIdToConstant.keySet());
     this.idToConstant = newIdToConstant;
     return this;
   }

parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java

Lines changed: 7 additions & 11 deletions
@@ -45,14 +45,14 @@
 public class ParquetFormatModel<D, S> implements FormatModel<D, S> {
   public static final String WRITER_VERSION_KEY = "parquet.writer.version";
 
-  private final Class<D> type;
+  private final Class<? extends D> type;
   private final Class<S> schemaType;
   private final ReaderFunction<D> readerFunction;
   private final BatchReaderFunction<D> batchReaderFunction;
   private final WriterFunction<S> writerFunction;
 
   private ParquetFormatModel(
-      Class<D> type,
+      Class<? extends D> type,
       Class<S> schemaType,
       ReaderFunction<D> readerFunction,
       BatchReaderFunction<D> batchReaderFunction,
@@ -77,7 +77,7 @@ public ParquetFormatModel(
   }
 
   public ParquetFormatModel(
-      Class<D> type, Class<S> schemaType, BatchReaderFunction<D> batchReaderFunction) {
+      Class<? extends D> type, Class<S> schemaType, BatchReaderFunction<D> batchReaderFunction) {
     this(type, schemaType, null, batchReaderFunction, null);
   }
 
@@ -87,7 +87,7 @@ public FileFormat format() {
   }
 
   @Override
-  public Class<D> type() {
+  public Class<? extends D> type() {
     return type;
   }
 
@@ -109,16 +109,12 @@ public ReadBuilder<D, S> readBuilder(InputFile inputFile) {
   @FunctionalInterface
   public interface ReaderFunction<D> {
     ParquetValueReader<D> read(
-        Schema schema, MessageType messageType, Map<Integer, ?> constantValues);
+        Schema schema, MessageType messageType, Map<Integer, ?> idToConstant);
   }
 
   @FunctionalInterface
   public interface BatchReaderFunction<D> {
-    VectorizedReader<D> read(
-        Schema schema,
-        MessageType messageType,
-        Map<Integer, ?> constantValues,
-        Map<String, String> config);
+    VectorizedReader<D> read(Schema schema, MessageType messageType, Map<Integer, ?> idToConstant);
   }
 
   @FunctionalInterface
@@ -323,7 +319,7 @@ public CloseableIterable<D> build() {
       return internal
           .createBatchedReaderFunc(
               (icebergSchema, messageType) ->
-                  batchReaderFunction.read(icebergSchema, messageType, idToConstant, config))
+                  batchReaderFunction.read(icebergSchema, messageType, idToConstant))
           .build();
     } else {
       throw new IllegalStateException("Either readerFunction or batchReaderFunction must be set");

spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java

Lines changed: 10 additions & 17 deletions
@@ -28,9 +28,9 @@
 import org.apache.iceberg.arrow.vectorized.VectorizedReaderBuilder;
 import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
 import org.apache.iceberg.parquet.VectorizedReader;
-import org.apache.iceberg.spark.ParquetReaderType;
 import org.apache.iceberg.spark.SparkUtil;
 import org.apache.parquet.schema.MessageType;
+import org.apache.spark.sql.vectorized.ColumnVector;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,8 +43,6 @@ public class VectorizedSparkParquetReaders {
   private static final String ENABLE_NULL_CHECK_FOR_GET = "arrow.enable_null_check_for_get";
   private static final String ENABLE_NULL_CHECK_FOR_GET_ENV = "ARROW_ENABLE_NULL_CHECK_FOR_GET";
 
-  public static final String PARQUET_READER_TYPE = "parquet.reader.type";
-
   static {
     try {
       enableUnsafeMemoryAccess();
@@ -56,18 +54,6 @@ public class VectorizedSparkParquetReaders {
 
   private VectorizedSparkParquetReaders() {}
 
-  public static VectorizedReader<ColumnarBatch> buildReader(
-      Schema expectedSchema,
-      MessageType fileSchema,
-      Map<Integer, ?> idToConstant,
-      Map<String, String> config) {
-    if (ParquetReaderType.COMET.name().equals(config.get(PARQUET_READER_TYPE))) {
-      return buildCometReader(expectedSchema, fileSchema, idToConstant);
-    } else {
-      return buildReader(expectedSchema, fileSchema, idToConstant);
-    }
-  }
-
   public static ColumnarBatchReader buildReader(
       Schema expectedSchema,
       MessageType fileSchema,
@@ -91,9 +77,9 @@ public static ColumnarBatchReader buildReader(
     return buildReader(expectedSchema, fileSchema, idToConstant, ArrowAllocation.rootAllocator());
   }
 
-  public static CometColumnarBatchReader buildCometReader(
+  public static VectorizedReader<ColumnarBatch> buildCometReader(
       Schema expectedSchema, MessageType fileSchema, Map<Integer, ?> idToConstant) {
-    return (CometColumnarBatchReader)
+    return (VectorizedReader<ColumnarBatch>)
         TypeWithSchemaVisitor.visit(
             expectedSchema.asStruct(),
             fileSchema,
@@ -104,6 +90,13 @@ public static CometColumnarBatchReader buildCometReader(
             readers -> new CometColumnarBatchReader(readers, expectedSchema)));
   }
 
+  /** A subclass of ColumnarBatch to identify Comet readers. */
+  public static class CometColumnarBatch extends ColumnarBatch {
+    public CometColumnarBatch(ColumnVector[] columns) {
+      super(columns);
+    }
+  }
+
   // enables unsafe memory access to avoid costly checks to see if index is within bounds
   // as long as it is not configured explicitly (see BoundsChecking in Arrow)
   private static void enableUnsafeMemoryAccess() {

spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java

Lines changed: 13 additions & 9 deletions
@@ -35,6 +35,7 @@
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.spark.OrcBatchReadConf;
 import org.apache.iceberg.spark.ParquetBatchReadConf;
+import org.apache.iceberg.spark.ParquetReaderType;
 import org.apache.iceberg.spark.data.vectorized.ColumnVectorWithFilter;
 import org.apache.iceberg.spark.data.vectorized.ColumnarBatchUtil;
 import org.apache.iceberg.spark.data.vectorized.UpdatableDeletedColumnVector;
@@ -72,17 +73,20 @@ protected CloseableIterable<ColumnarBatch> newBatchIterable(
       Expression residual,
       Map<Integer, ?> idToConstant,
       @Nonnull SparkDeleteFilter deleteFilter) {
-    ReadBuilder<ColumnarBatch, ?> readBuilder =
-        FormatModelRegistry.readBuilder(format, ColumnarBatch.class, inputFile);
+    ReadBuilder<ColumnarBatch, ?> readBuilder;
     if (parquetConf != null) {
       readBuilder =
-          readBuilder
-              .recordsPerBatch(parquetConf.batchSize())
-              .set(
-                  VectorizedSparkParquetReaders.PARQUET_READER_TYPE,
-                  parquetConf.readerType().name());
-    } else if (orcConf != null) {
-      readBuilder = readBuilder.recordsPerBatch(orcConf.batchSize());
+          parquetConf.readerType() == ParquetReaderType.COMET
+              ? FormatModelRegistry.readBuilder(
+                  format, VectorizedSparkParquetReaders.CometColumnarBatch.class, inputFile)
+              : FormatModelRegistry.readBuilder(format, ColumnarBatch.class, inputFile);
+
+      readBuilder = readBuilder.recordsPerBatch(parquetConf.batchSize());
+    } else {
+      readBuilder = FormatModelRegistry.readBuilder(format, ColumnarBatch.class, inputFile);
+      if (orcConf != null) {
+        readBuilder = readBuilder.recordsPerBatch(orcConf.batchSize());
+      }
     }
 
     CloseableIterable<ColumnarBatch> iterable =

spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java

Lines changed: 6 additions & 0 deletions
@@ -55,6 +55,12 @@ public static void register() {
         new ParquetFormatModel<>(
             ColumnarBatch.class, StructType.class, VectorizedSparkParquetReaders::buildReader));
 
+    FormatModelRegistry.register(
+        new ParquetFormatModel<>(
+            VectorizedSparkParquetReaders.CometColumnarBatch.class,
+            StructType.class,
+            VectorizedSparkParquetReaders::buildCometReader));
+
     FormatModelRegistry.register(
         new ORCFormatModel<>(
             InternalRow.class,
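
With the Comet model registered under the marker class, callers select it through
the registry key alone, as in the BaseBatchReader change above. A hedged usage
sketch (inputFile is assumed to be an InputFile already in scope):

ReadBuilder<ColumnarBatch, ?> builder =
    FormatModelRegistry.readBuilder(
        FileFormat.PARQUET,
        VectorizedSparkParquetReaders.CometColumnarBatch.class,
        inputFile);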
