Closed
62 commits
ccf5dd3
current progress
Oct 16, 2025
add1a90
seems to be working for spark non vectorized and avro
Oct 17, 2025
3b2ceca
filters working
Oct 17, 2025
eaad2b1
prevent overflow
Oct 17, 2025
5600b37
use read support instead of mapping function
Oct 20, 2025
018a11e
use repaired schema instead of doing operations after we read the data
Oct 20, 2025
c9bdd39
add spark log support
Oct 20, 2025
a8430cf
remove find cols to multiply class
Oct 20, 2025
2dde0fc
log file changes as requested and set up the read supports for spark …
Oct 21, 2025
36c0d1c
hive working
Oct 21, 2025
d7d6dd5
add individual test and fix issue with dropping messagetype logical
Oct 22, 2025
04c6ce5
revert calls for rewrite avro with extra param
Oct 22, 2025
00c85e5
revert config to prevent timestamp evolutions
Oct 22, 2025
195f90e
A few fixes
Oct 22, 2025
3f6b4a8
fix bug with field reuse in avro schema repair
Oct 22, 2025
93d005a
fix read parquet log block timestamp ntz
Oct 22, 2025
47ce37e
allow long to timestampntz without cast
Oct 22, 2025
036d9cc
refactor AvroSchemaRepair for performance and add unit tests
Oct 23, 2025
bdff8ff
refactor schema repair for performance and add testing
Oct 23, 2025
b8f889e
try fix other spark versions
Oct 23, 2025
3c401e5
fix spark 3.3 build
Oct 24, 2025
2d6d1ea
fix spark3.4 build
Oct 24, 2025
6a47cc9
hopefully fix spark4
Oct 24, 2025
11e0422
fix issue with union schema, add table schema to cdc in missing place
Oct 24, 2025
ead216c
add spark cow read testing for repair
Oct 24, 2025
5f7512c
building, and add spark mor tests
Oct 24, 2025
6ec7eb5
forgot to add the zips
Oct 24, 2025
7bf05a9
cow write testing
Oct 25, 2025
5167319
add mor testing
Oct 25, 2025
31e5a59
disable tests for spark 3.3
Oct 26, 2025
f860355
fix for spark 3.4
Oct 26, 2025
37bec3f
fix spark33 for real
Oct 26, 2025
a8dfeff
remove fg reader test
Oct 26, 2025
7c07204
remove unneeded avro utils change
Oct 26, 2025
cb8a8fe
fix spark 4
Oct 27, 2025
616898d
fix timestamps in deltastreamer test
Oct 27, 2025
d0d502a
fix failing test
Oct 27, 2025
52fced7
vectorized fallback for 3.3 and 3.4
Oct 28, 2025
f0cb0c9
fix vectorized fallback
Oct 28, 2025
0bd08f0
add testing, and also fallback for local timestamp millis
Oct 28, 2025
fb711a0
add tests from java-parquet
Oct 28, 2025
bab0c7a
replace import with hardcode
Oct 28, 2025
e6fba1b
fix long import names
Oct 29, 2025
7deced3
address most review comments
Oct 29, 2025
3a95184
Fixing spark3.3 reads
nsivabalan Nov 5, 2025
58322eb
minor renames
nsivabalan Nov 5, 2025
38398f1
Adding java docs
nsivabalan Nov 5, 2025
6aafc13
java docs
nsivabalan Nov 5, 2025
5153d4e
refactor repairFooterSchema to a common module
Nov 5, 2025
8a0759c
rename resolveNullableSchema method
Nov 5, 2025
2911d07
add check for no compactions and clustering on test table
Nov 5, 2025
d23db24
Add checks for enableLogicalTimestampFieldRepair
Nov 5, 2025
a32494f
Revert pom changes
yihua Nov 6, 2025
6eec8a7
Revert "refactor repairFooterSchema to a common module"
yihua Nov 6, 2025
8ccd313
Fix scalastyle
yihua Nov 6, 2025
3456728
Fix build on Spark 3.3 and 3.4
yihua Nov 6, 2025
bee84af
Fix TestHoodieDeltaStreamer
yihua Nov 6, 2025
9f40c75
Fix licences
yihua Nov 6, 2025
11a2dce
Fix Spark 4
yihua Nov 7, 2025
eabdc23
Fix Spark 4 and renames
yihua Nov 7, 2025
cdf8624
Fix Spark40LegacyHoodieParquetFileFormat
yihua Nov 7, 2025
36ac25f
Add flag for repairing logical timestamp in avro log file reader
Nov 7, 2025
@@ -117,7 +117,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema;
import static org.apache.hudi.avro.AvroSchemaUtils.getNonNullTypeFromUnion;
import static org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.EAGER;
import static org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.LAZY;
import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS;
@@ -1019,7 +1019,7 @@ static void validateSecondaryIndexSchemaEvolution(

if (writerField != null && !tableField.schema().equals(writerField.schema())) {
// Check if this is just making the field nullable/non-nullable, which is safe from SI perspective
if (resolveNullableSchema(tableField.schema()).equals(resolveNullableSchema(writerField.schema()))) {
if (getNonNullTypeFromUnion(tableField.schema()).equals(getNonNullTypeFromUnion(writerField.schema()))) {
continue;
}

@@ -94,7 +94,7 @@ public StorageConfiguration<?> getStorageConf() {

@Override
public HoodieReaderContext<ArrayWritable> getHoodieReaderContext(String tablePath, Schema avroSchema, StorageConfiguration<?> storageConf, HoodieTableMetaClient metaClient) {
HoodieFileGroupReaderBasedRecordReader.HiveReaderCreator readerCreator = (inputSplit, jobConf) -> new MapredParquetInputFormat().getRecordReader(inputSplit, jobConf, null);
HoodieFileGroupReaderBasedRecordReader.HiveReaderCreator readerCreator = (inputSplit, jobConf, dataSchema) -> new MapredParquetInputFormat().getRecordReader(inputSplit, jobConf, null);
JobConf jobConf = new JobConf(storageConf.unwrapAs(Configuration.class));
setupJobconf(jobConf, avroSchema);
return new HiveHoodieReaderContext(readerCreator,
@@ -23,6 +23,7 @@
import org.apache.avro.Schema;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.BytesWritable;
@@ -268,7 +269,11 @@ private static void assertWritablePrimaryTypeMatchesSchema(Schema schema, Writab
break;

case LONG:
assertInstanceOf(LongWritable.class, writable);
if (schema.getLogicalType() instanceof LogicalTypes.TimestampMillis) {
assertInstanceOf(TimestampWritable.class, writable);
} else {
assertInstanceOf(LongWritable.class, writable);
}
break;

case FLOAT:
@@ -19,6 +19,7 @@
package org.apache.hudi.io.storage;

import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.model.HoodieFileFormat;
@@ -40,6 +41,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.SchemaRepair;
import org.apache.spark.sql.HoodieInternalRowUtils;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
@@ -60,13 +62,15 @@

import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
import static org.apache.parquet.avro.AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS;
import static org.apache.parquet.avro.HoodieAvroParquetSchemaConverter.getAvroSchemaConverter;

public class HoodieSparkParquetReader implements HoodieSparkFileReader {

private final StoragePath path;
private final HoodieStorage storage;
private final FileFormatUtils parquetUtils;
private final List<ClosableIterator> readerIterators = new ArrayList<>();
private Option<MessageType> fileSchemaOption = Option.empty();
private Option<StructType> structTypeOption = Option.empty();
private Option<Schema> schemaOption = Option.empty();

@@ -116,19 +120,20 @@ public ClosableIterator<String> getRecordKeyIterator() throws IOException {
}

public ClosableIterator<UnsafeRow> getUnsafeRowIterator(Schema requestedSchema) throws IOException {
return getUnsafeRowIterator(HoodieInternalRowUtils.getCachedSchema(requestedSchema));
}

public ClosableIterator<UnsafeRow> getUnsafeRowIterator(StructType requestedSchema) throws IOException {
SparkBasicSchemaEvolution evolution = new SparkBasicSchemaEvolution(getStructSchema(), requestedSchema, SQLConf.get().sessionLocalTimeZone());
Schema nonNullSchema = AvroSchemaUtils.getNonNullTypeFromUnion(requestedSchema);
StructType structSchema = HoodieInternalRowUtils.getCachedSchema(nonNullSchema);
Option<MessageType> messageSchema = Option.of(getAvroSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(nonNullSchema));
boolean enableTimestampFieldRepair = storage.getConf().getBoolean("logicalTimestampField.repair.enable", true);
StructType dataStructType = convertToStruct(enableTimestampFieldRepair ? SchemaRepair.repairLogicalTypes(getFileSchema(), messageSchema) : getFileSchema());
SparkBasicSchemaEvolution evolution = new SparkBasicSchemaEvolution(dataStructType, structSchema, SQLConf.get().sessionLocalTimeZone());
String readSchemaJson = evolution.getRequestSchema().json();
storage.getConf().set(ParquetReadSupport.PARQUET_READ_SCHEMA, readSchemaJson);
storage.getConf().set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA(), readSchemaJson);
storage.getConf().set(SQLConf.PARQUET_BINARY_AS_STRING().key(), SQLConf.get().getConf(SQLConf.PARQUET_BINARY_AS_STRING()).toString());
storage.getConf().set(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), SQLConf.get().getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP()).toString());
ParquetReader<InternalRow> reader = ParquetReader.builder(new HoodieParquetReadSupport(Option$.MODULE$.empty(), true,
ParquetReader<InternalRow> reader = ParquetReader.builder(new HoodieParquetReadSupport(Option$.MODULE$.empty(), true, true,
SparkAdapterSupport$.MODULE$.sparkAdapter().getRebaseSpec("CORRECTED"),
SparkAdapterSupport$.MODULE$.sparkAdapter().getRebaseSpec("LEGACY")),
SparkAdapterSupport$.MODULE$.sparkAdapter().getRebaseSpec("LEGACY"), messageSchema),
new Path(path.toUri()))
.withConf(storage.getConf().unwrapAs(Configuration.class))
.build();
@@ -139,15 +144,22 @@ public ClosableIterator<UnsafeRow> getUnsafeRowIterator(StructType requestedSche
return projectedIterator;
}

private MessageType getFileSchema() {
if (fileSchemaOption.isEmpty()) {
MessageType messageType = ((ParquetUtils) parquetUtils).readSchema(storage, path);
fileSchemaOption = Option.of(messageType);
}
return fileSchemaOption.get();
}

@Override
public Schema getSchema() {
if (schemaOption.isEmpty()) {
// Some types in avro are not compatible with parquet.
// Avro only supports representing Decimals as fixed byte array
// and therefore if we convert to Avro directly we'll lose logical type-info.
MessageType messageType = ((ParquetUtils) parquetUtils).readSchema(storage, path);
StructType structType = new ParquetToSparkSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(messageType);
structTypeOption = Option.of(structType);
MessageType messageType = getFileSchema();
StructType structType = getStructSchema();
schemaOption = Option.of(SparkAdapterSupport$.MODULE$.sparkAdapter()
.getAvroSchemaConverters()
.toAvroType(structType, true, messageType.getName(), StringUtils.EMPTY_STRING));
@@ -157,11 +169,16 @@ public Schema getSchema() {

protected StructType getStructSchema() {
if (structTypeOption.isEmpty()) {
getSchema();
MessageType messageType = getFileSchema();
structTypeOption = Option.of(convertToStruct(messageType));
}
return structTypeOption.get();
}

private StructType convertToStruct(MessageType messageType) {
return new ParquetToSparkSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(messageType);
}

@Override
public void close() {
readerIterators.forEach(ClosableIterator::close);
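A small standalone illustration of the point made in the getSchema() comment above (plain Avro 1.x API, not Hudi code): Avro can only express a decimal as a logical type over fixed or bytes, so the INT32/INT64 decimal encodings Parquet allows have no direct Avro counterpart, which is why the footer schema is converted to a Spark StructType first instead of straight to Avro.

import org.apache.avro.{LogicalTypes, Schema}

object AvroDecimalSketch {
  def main(args: Array[String]): Unit = {
    // The only Avro representation of decimal(10, 2): a logical type on a fixed (or bytes) schema.
    val avroDecimal: Schema =
      LogicalTypes.decimal(10, 2).addToSchema(Schema.createFixed("amount", null, "example", 5))
    println(avroDecimal) // JSON contains "logicalType":"decimal","precision":10,"scale":2
  }
}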
@@ -76,7 +76,7 @@
import scala.Enumeration;
import scala.Function1;

import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema;
import static org.apache.hudi.avro.AvroSchemaUtils.getNonNullTypeFromUnion;
import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_FIELD_ID_WRITE_ENABLED;
import static org.apache.hudi.config.HoodieWriteConfig.ALLOW_OPERATION_METADATA_FIELD;
import static org.apache.hudi.config.HoodieWriteConfig.AVRO_SCHEMA_STRING;
@@ -226,7 +226,7 @@ private void writeFields(InternalRow row, StructType schema, ValueWriter[] field
}

private ValueWriter makeWriter(Schema avroSchema, DataType dataType) {
Schema resolvedSchema = avroSchema == null ? null : resolveNullableSchema(avroSchema);
Schema resolvedSchema = avroSchema == null ? null : getNonNullTypeFromUnion(avroSchema);
LogicalType logicalType = resolvedSchema != null ? resolvedSchema.getLogicalType() : null;

if (dataType == DataTypes.BooleanType) {
@@ -429,7 +429,7 @@ private Type convertField(Schema avroFieldSchema, StructField structField) {
}

private Type convertField(Schema avroFieldSchema, StructField structField, Type.Repetition repetition) {
Schema resolvedSchema = avroFieldSchema == null ? null : resolveNullableSchema(avroFieldSchema);
Schema resolvedSchema = avroFieldSchema == null ? null : getNonNullTypeFromUnion(avroFieldSchema);
LogicalType logicalType = resolvedSchema != null ? resolvedSchema.getLogicalType() : null;

DataType dataType = structField.dataType();
@@ -101,7 +101,7 @@ object AvroConversionUtils {
recordNamespace: String): Row => GenericRecord = {
val serde = getCatalystRowSerDe(sourceSqlType)
val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(sourceSqlType, structName, recordNamespace)
val nullable = AvroSchemaUtils.resolveNullableSchema(avroSchema) != avroSchema
val nullable = AvroSchemaUtils.getNonNullTypeFromUnion(avroSchema) != avroSchema

val converter = AvroConversionUtils.createInternalRowToAvroConverter(sourceSqlType, avroSchema, nullable)

@@ -91,7 +91,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi
// making Spark deserialize its internal representation [[InternalRow]] into [[Row]] for subsequent conversion
// (and back)
val sameSchema = writerAvroSchema.equals(readerAvroSchema)
val nullable = AvroSchemaUtils.resolveNullableSchema(writerAvroSchema) != writerAvroSchema
val nullable = AvroSchemaUtils.getNonNullTypeFromUnion(writerAvroSchema) != writerAvroSchema

// NOTE: We have to serialize Avro schema, and then subsequently parse it on the executor node, since Spark
// serializer is not able to digest it
@@ -160,7 +160,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi
// making Spark deserialize its internal representation [[InternalRow]] into [[Row]] for subsequent conversion
// (and back)
val sameSchema = writerAvroSchema.equals(readerAvroSchema)
val nullable = AvroSchemaUtils.resolveNullableSchema(writerAvroSchema) != writerAvroSchema
val nullable = AvroSchemaUtils.getNonNullTypeFromUnion(writerAvroSchema) != writerAvroSchema

// NOTE: We have to serialize Avro schema, and then subsequently parse it on the executor node, since Spark
// serializer is not able to digest it
@@ -33,6 +33,9 @@ import org.apache.hudi.common.util.collection.{CachingIterator, ClosableIterator
import org.apache.hudi.io.storage.{HoodieSparkFileReaderFactory, HoodieSparkParquetReader}
import org.apache.hudi.storage.{HoodieStorage, StorageConfiguration, StoragePath}
import org.apache.hudi.util.CloseableInternalRowIterator

import org.apache.parquet.avro.AvroSchemaConverter
import org.apache.parquet.avro.HoodieAvroParquetSchemaConverter.getAvroSchemaConverter
import org.apache.spark.sql.HoodieInternalRowUtils
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.JoinedRow
@@ -70,27 +73,36 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR
override def getFileRecordIterator(filePath: StoragePath,
start: Long,
length: Long,
dataSchema: Schema,
dataSchema: Schema, // dataSchema refers to the table schema in most cases (non-log-file reads).
requiredSchema: Schema,
storage: HoodieStorage): ClosableIterator[InternalRow] = {
val hasRowIndexField = AvroSchemaUtils.containsFieldInSchema(requiredSchema, ROW_INDEX_TEMPORARY_COLUMN_NAME)
if (hasRowIndexField) {
assert(getRecordContext.supportsParquetRowIndex())
}
val structType = HoodieInternalRowUtils.getCachedSchema(requiredSchema)
if (FSUtils.isLogFile(filePath)) {
// TODO: introduce pk filter in log file reader
new HoodieSparkFileReaderFactory(storage).newParquetFileReader(filePath)
.asInstanceOf[HoodieSparkParquetReader].getUnsafeRowIterator(structType).asInstanceOf[ClosableIterator[InternalRow]]
.asInstanceOf[HoodieSparkParquetReader].getUnsafeRowIterator(requiredSchema).asInstanceOf[ClosableIterator[InternalRow]]
} else {
val structType = HoodieInternalRowUtils.getCachedSchema(requiredSchema)
// partition value is empty because the spark parquet reader will append the partition columns to
// each row if they are given. That is the only usage of the partition values in the reader.
val fileInfo = sparkAdapter.getSparkPartitionedFileUtils
.createPartitionedFile(InternalRow.empty, filePath, start, length)
val (readSchema, readFilters) = getSchemaAndFiltersForRead(structType, hasRowIndexField)

// Convert Avro dataSchema to Parquet MessageType for timestamp precision conversion
val tableSchemaOpt = if (dataSchema != null) {
val hadoopConf = storage.getConf.unwrapAs(classOf[Configuration])
val parquetSchema = getAvroSchemaConverter(hadoopConf).convert(dataSchema)
org.apache.hudi.common.util.Option.of(parquetSchema)
} else {
org.apache.hudi.common.util.Option.empty[org.apache.parquet.schema.MessageType]()
}
new CloseableInternalRowIterator(baseFileReader.read(fileInfo,
readSchema, StructType(Seq.empty), getSchemaHandler.getInternalSchemaOpt,
readFilters, storage.getConf.asInstanceOf[StorageConfiguration[Configuration]]))
readFilters, storage.getConf.asInstanceOf[StorageConfiguration[Configuration]], tableSchemaOpt))
}
}

@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hudi.common.util
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.storage.StorageConfiguration
import org.apache.parquet.schema.MessageType
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
@@ -37,12 +38,14 @@ trait SparkColumnarFileReader extends Serializable {
* @param internalSchemaOpt option of internal schema for schema.on.read
* @param filters filters for data skipping. Not guaranteed to be used; the spark plan will also apply the filters.
* @param storageConf the hadoop conf
* @param tableSchemaOpt option of table schema for timestamp precision conversion
* @return iterator of rows read from the file output type says [[InternalRow]] but could be [[ColumnarBatch]]
*/
def read(file: PartitionedFile,
requiredSchema: StructType,
partitionSchema: StructType,
internalSchemaOpt: util.Option[InternalSchema],
filters: Seq[Filter],
storageConf: StorageConfiguration[Configuration]): Iterator[InternalRow]
storageConf: StorageConfiguration[Configuration],
tableSchemaOpt: util.Option[MessageType] = util.Option.empty()): Iterator[InternalRow]
}
Original file line number Diff line number Diff line change
@@ -24,7 +24,7 @@ import org.apache.parquet.hadoop.metadata.FileMetaData
import org.apache.spark.sql.HoodieSchemaUtils
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.expressions.{ArrayTransform, Attribute, Cast, CreateNamedStruct, CreateStruct, Expression, GetStructField, LambdaFunction, Literal, MapEntries, MapFromEntries, NamedLambdaVariable, UnsafeProjection}
import org.apache.spark.sql.types.{ArrayType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, StringType, StructField, StructType}
import org.apache.spark.sql.types.{ArrayType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, StringType, StructField, StructType, TimestampNTZType}

object HoodieParquetFileFormatHelper {

@@ -58,6 +58,9 @@
def isDataTypeEqual(requiredType: DataType, fileType: DataType): Boolean = (requiredType, fileType) match {
case (requiredType, fileType) if requiredType == fileType => true

// prevent illegal cast
case (TimestampNTZType, LongType) => true

case (ArrayType(rt, _), ArrayType(ft, _)) =>
// Do not care about nullability as schema evolution require fields to be nullable
isDataTypeEqual(rt, ft)
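For context on the case added above, a minimal standalone sketch (Spark 3.3+ DataType API; this is not the Hudi helper itself) of how such a structural equality check treats a file-side LongType as compatible with a required TimestampNTZType, so that no Cast, which would be illegal for that pair, gets generated:

import org.apache.spark.sql.types._

object TypeCompatSketch {
  // Structural comparison that ignores nullability and tolerates a timestamp-ntz
  // field that was written to the file as a bare long.
  def isCompatible(requiredType: DataType, fileType: DataType): Boolean =
    (requiredType, fileType) match {
      case (r, f) if r == f => true
      case (TimestampNTZType, LongType) => true
      case (ArrayType(r, _), ArrayType(f, _)) => isCompatible(r, f)
      case (MapType(rk, rv, _), MapType(fk, fv, _)) =>
        isCompatible(rk, fk) && isCompatible(rv, fv)
      case (StructType(rf), StructType(ff)) =>
        rf.length == ff.length && rf.zip(ff).forall {
          case (a, b) => a.name == b.name && isCompatible(a.dataType, b.dataType)
        }
      case _ => false
    }

  def main(args: Array[String]): Unit = {
    println(isCompatible(TimestampNTZType, LongType))   // true: read as-is, no cast needed
    println(isCompatible(TimestampNTZType, StringType)) // false: would need evolution handling
  }
}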
@@ -24,7 +24,7 @@ import org.apache.hudi.common.util.ValidationUtils

import org.apache.parquet.hadoop.api.InitContext
import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
import org.apache.parquet.schema.{GroupType, MessageType, Type, Types}
import org.apache.parquet.schema.{GroupType, MessageType, SchemaRepair, Type, Types}
import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec

import java.time.ZoneId
@@ -34,17 +34,24 @@ import scala.collection.JavaConverters._
class HoodieParquetReadSupport(
convertTz: Option[ZoneId],
enableVectorizedReader: Boolean,
val enableTimestampFieldRepair: Boolean,
datetimeRebaseSpec: RebaseSpec,
int96RebaseSpec: RebaseSpec)
int96RebaseSpec: RebaseSpec,
tableSchemaOpt: org.apache.hudi.common.util.Option[org.apache.parquet.schema.MessageType] = org.apache.hudi.common.util.Option.empty())
extends ParquetReadSupport(convertTz, enableVectorizedReader, datetimeRebaseSpec, int96RebaseSpec) with SparkAdapterSupport {

override def init(context: InitContext): ReadContext = {
val readContext = super.init(context)
val requestedParquetSchema = readContext.getRequestedSchema
// repair is needed here because this is the schema that is used by the reader to decide what
// conversions are necessary
val requestedParquetSchema = if (enableTimestampFieldRepair) {
SchemaRepair.repairLogicalTypes(readContext.getRequestedSchema, tableSchemaOpt)
} else {
readContext.getRequestedSchema
}
val trimmedParquetSchema = HoodieParquetReadSupport.trimParquetSchema(requestedParquetSchema, context.getFileSchema)
new ReadContext(trimmedParquetSchema, readContext.getReadSupportMetadata)
}

}

object HoodieParquetReadSupport {
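For orientation on the repair applied in init() above: a standalone sketch, assuming the parquet-mr Types/LogicalTypeAnnotation API, of what repairing a dropped timestamp logical type means at the Parquet schema level. This is not the SchemaRepair.repairLogicalTypes implementation referenced in the diff; it only shows the idea of re-attaching the table schema's annotation to a field the file footer reports as a bare INT64, before the requested schema drives the reader's conversions.

import org.apache.parquet.schema.{LogicalTypeAnnotation, MessageType, PrimitiveType, Types}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName

object LogicalTypeRepairSketch {
  def main(args: Array[String]): Unit = {
    // File footer view: the timestamp annotation was dropped, only the physical type remains.
    val fileField: PrimitiveType =
      Types.optional(PrimitiveTypeName.INT64).named("event_time")
    val fileSchema = new MessageType("spark_schema", fileField)

    // Table schema view: the same field with its timestamp annotation intact.
    val tableField: PrimitiveType = Types.optional(PrimitiveTypeName.INT64)
      .as(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MICROS))
      .named("event_time")

    // "Repair": keep the file's physical layout, copy the annotation back from the table schema.
    val repaired = new MessageType(fileSchema.getName, tableField)

    println(fileSchema) // bare int64 field
    println(repaired)   // int64 field annotated as a timestamp
  }
}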