Closed

Changes from all commits (29 commits)
e46d157  current progress  (Oct 16, 2025)
513e8a1  seems to be working for spark non vectorized and avro  (Oct 17, 2025)
bb4a278  filters working  (Oct 17, 2025)
814d442  prevent overflow  (Oct 17, 2025)
3ed9ee7  use read support instead of mapping function  (Oct 20, 2025)
c6c6caa  use repaired schema instead of doing operations after we read the data  (Oct 20, 2025)
2f447c2  add spark log support  (Oct 20, 2025)
be727d9  remove find cols to multiply class  (Oct 20, 2025)
639e57c  log file changes as requested and set up the read supports for spark …  (Oct 21, 2025)
c1179df  hive working  (Oct 21, 2025)
50a64cd  add individual test and fix issue with dropping messagetype logical  (Oct 22, 2025)
7441315  revert calls for rewrite avro with extra param  (Oct 22, 2025)
0b83171  revert config to prevent timestamp evolutions  (Oct 22, 2025)
2450d9b  A few fixes  (Oct 22, 2025)
0eb01ca  fix bug with field reuse in avro schema repair  (Oct 22, 2025)
5090d2e  fix read parquet log block timestamp ntz  (Oct 22, 2025)
674babb  allow long to timestampntz without cast  (Oct 22, 2025)
796e7f0  refactor AvroSchemaRepair for performance and add unit tests  (Oct 23, 2025)
0ffcc96  refactor schema repair for performance and add testing  (Oct 23, 2025)
6231af6  try fix other spark versions  (Oct 23, 2025)
5b89d6d  fix spark 3.3 build  (Oct 24, 2025)
bb4f550  fix spark3.4 build  (Oct 24, 2025)
3e7a298  hopefully fix spark4  (Oct 24, 2025)
f297231  fix issue with union schema, add table schema to cdc in missing place  (Oct 24, 2025)
b0933c0  add spark cow read testing for repair  (Oct 24, 2025)
6a6836f  building, and add spark mor tests  (Oct 24, 2025)
a5ac68a  forgot to add the zips  (Oct 24, 2025)
47506f6  cow write testing  (Oct 25, 2025)
758645e  add mor testing  (Oct 25, 2025)
@@ -27,6 +27,8 @@
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.avro.HoodieTimestampAwareParquetInputFormat;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
@@ -37,6 +39,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.avro.HiveTypeUtils;
@@ -94,7 +97,18 @@ public StorageConfiguration<?> getStorageConf() {

@Override
public HoodieReaderContext<ArrayWritable> getHoodieReaderContext(String tablePath, Schema avroSchema, StorageConfiguration<?> storageConf, HoodieTableMetaClient metaClient) {
HoodieFileGroupReaderBasedRecordReader.HiveReaderCreator readerCreator = (inputSplit, jobConf) -> new MapredParquetInputFormat().getRecordReader(inputSplit, jobConf, null);
HoodieFileGroupReaderBasedRecordReader.HiveReaderCreator readerCreator = (inputSplit, jobConf, dataSchema) -> {
if (HoodieColumnProjectionUtils.supportTimestamp(jobConf)) {
try {
return new ParquetRecordReaderWrapper(new HoodieTimestampAwareParquetInputFormat(Option.empty(), Option.of(dataSchema)), inputSplit, jobConf, null);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} else {
return new MapredParquetInputFormat().getRecordReader(inputSplit, jobConf, null);
}
};

JobConf jobConf = new JobConf(storageConf.unwrapAs(Configuration.class));
setupJobconf(jobConf, avroSchema);
return new HiveHoodieReaderContext(readerCreator,
@@ -23,6 +23,7 @@
import org.apache.avro.Schema;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.BytesWritable;
@@ -268,7 +269,11 @@ private static void assertWritablePrimaryTypeMatchesSchema(Schema schema, Writab
break;

case LONG:
assertInstanceOf(LongWritable.class, writable);
if (schema.getLogicalType() instanceof LogicalTypes.TimestampMillis) {
assertInstanceOf(TimestampWritable.class, writable);
} else {
assertInstanceOf(LongWritable.class, writable);
}
break;

case FLOAT:
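The assertion change above reflects how the timestamp-aware Hive read path surfaces values: an INT64 column annotated timestamp-millis arrives as a TimestampWritable rather than a LongWritable. A small, self-contained illustration of the two shapes (the sample value is hypothetical, not taken from the test data generator):

// Illustration only: shows the two Writable shapes the test now distinguishes.
import java.sql.Timestamp;

import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.apache.hadoop.io.LongWritable;

public class WritableShapeExample {
  public static void main(String[] args) {
    long epochMillis = 1_700_000_000_000L;

    // A plain LONG column (no logical type) stays a LongWritable.
    LongWritable asLong = new LongWritable(epochMillis);

    // A LONG column annotated timestamp-millis is materialized as a TimestampWritable.
    TimestampWritable asTimestamp = new TimestampWritable(new Timestamp(epochMillis));

    System.out.println(asLong + " vs " + asTimestamp);
  }
}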
@@ -19,6 +19,7 @@
package org.apache.hudi.io.storage;

import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.model.HoodieFileFormat;
@@ -40,6 +41,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.SchemaRepair;
import org.apache.spark.sql.HoodieInternalRowUtils;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
@@ -60,13 +62,15 @@

import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
import static org.apache.parquet.avro.AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS;
import static org.apache.parquet.avro.HoodieAvroParquetSchemaConverter.getAvroSchemaConverter;

public class HoodieSparkParquetReader implements HoodieSparkFileReader {

private final StoragePath path;
private final HoodieStorage storage;
private final FileFormatUtils parquetUtils;
private final List<ClosableIterator> readerIterators = new ArrayList<>();
private Option<MessageType> messageTypeOption = Option.empty();
private Option<StructType> structTypeOption = Option.empty();
private Option<Schema> schemaOption = Option.empty();

@@ -116,19 +120,20 @@ public ClosableIterator<String> getRecordKeyIterator() throws IOException {
}

public ClosableIterator<UnsafeRow> getUnsafeRowIterator(Schema requestedSchema) throws IOException {
return getUnsafeRowIterator(HoodieInternalRowUtils.getCachedSchema(requestedSchema));
}

public ClosableIterator<UnsafeRow> getUnsafeRowIterator(StructType requestedSchema) throws IOException {
SparkBasicSchemaEvolution evolution = new SparkBasicSchemaEvolution(getStructSchema(), requestedSchema, SQLConf.get().sessionLocalTimeZone());
Schema requestNonNull = AvroSchemaUtils.resolveNullableSchema(requestedSchema);
StructType structSchema = HoodieInternalRowUtils.getCachedSchema(requestNonNull);
Option<MessageType> messageSchema = Option.of(getAvroSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(requestNonNull));
MessageType dataMessageType = SchemaRepair.repairLogicalTypes(getMessageType(), messageSchema);
StructType dataStructType = convertToStruct(dataMessageType);
SparkBasicSchemaEvolution evolution = new SparkBasicSchemaEvolution(dataStructType, structSchema, SQLConf.get().sessionLocalTimeZone());
String readSchemaJson = evolution.getRequestSchema().json();
storage.getConf().set(ParquetReadSupport.PARQUET_READ_SCHEMA, readSchemaJson);
storage.getConf().set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA(), readSchemaJson);
storage.getConf().set(SQLConf.PARQUET_BINARY_AS_STRING().key(), SQLConf.get().getConf(SQLConf.PARQUET_BINARY_AS_STRING()).toString());
storage.getConf().set(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), SQLConf.get().getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP()).toString());
ParquetReader<InternalRow> reader = ParquetReader.builder(new HoodieParquetReadSupport(Option$.MODULE$.empty(), true,
SparkAdapterSupport$.MODULE$.sparkAdapter().getRebaseSpec("CORRECTED"),
SparkAdapterSupport$.MODULE$.sparkAdapter().getRebaseSpec("LEGACY")),
SparkAdapterSupport$.MODULE$.sparkAdapter().getRebaseSpec("LEGACY"), messageSchema),
new Path(path.toUri()))
.withConf(storage.getConf().unwrapAs(Configuration.class))
.build();
@@ -139,15 +144,22 @@ public ClosableIterator<UnsafeRow> getUnsafeRowIterator(StructType requestedSche
return projectedIterator;
}

private MessageType getMessageType() {
if (messageTypeOption.isEmpty()) {
MessageType messageType = ((ParquetUtils) parquetUtils).readSchema(storage, path);
messageTypeOption = Option.of(messageType);
}
return messageTypeOption.get();
}

@Override
public Schema getSchema() {
if (schemaOption.isEmpty()) {
// Some types in avro are not compatible with parquet.
// Avro only supports representing Decimals as fixed byte array
// and therefore if we convert to Avro directly we'll lose logical type-info.
MessageType messageType = ((ParquetUtils) parquetUtils).readSchema(storage, path);
StructType structType = new ParquetToSparkSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(messageType);
structTypeOption = Option.of(structType);
MessageType messageType = getMessageType();
StructType structType = getStructSchema();
schemaOption = Option.of(SparkAdapterSupport$.MODULE$.sparkAdapter()
.getAvroSchemaConverters()
.toAvroType(structType, true, messageType.getName(), StringUtils.EMPTY_STRING));
@@ -157,11 +169,16 @@ public Schema getSchema() {

protected StructType getStructSchema() {
if (structTypeOption.isEmpty()) {
getSchema();
MessageType messageType = getMessageType();
structTypeOption = Option.of(convertToStruct(messageType));
}
return structTypeOption.get();
}

private StructType convertToStruct(MessageType messageType) {
return new ParquetToSparkSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(messageType);
}

@Override
public void close() {
readerIterators.forEach(ClosableIterator::close);
@@ -33,6 +33,9 @@ import org.apache.hudi.common.util.collection.{CachingIterator, ClosableIterator
import org.apache.hudi.io.storage.{HoodieSparkFileReaderFactory, HoodieSparkParquetReader}
import org.apache.hudi.storage.{HoodieStorage, StorageConfiguration, StoragePath}
import org.apache.hudi.util.CloseableInternalRowIterator

import org.apache.parquet.avro.AvroSchemaConverter
import org.apache.parquet.avro.HoodieAvroParquetSchemaConverter.getAvroSchemaConverter
import org.apache.spark.sql.HoodieInternalRowUtils
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.JoinedRow
@@ -75,19 +78,28 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR
if (hasRowIndexField) {
assert(getRecordContext.supportsParquetRowIndex())
}
val structType = HoodieInternalRowUtils.getCachedSchema(requiredSchema)
if (FSUtils.isLogFile(filePath)) {
new HoodieSparkFileReaderFactory(storage).newParquetFileReader(filePath)
.asInstanceOf[HoodieSparkParquetReader].getUnsafeRowIterator(structType).asInstanceOf[ClosableIterator[InternalRow]]
.asInstanceOf[HoodieSparkParquetReader].getUnsafeRowIterator(requiredSchema).asInstanceOf[ClosableIterator[InternalRow]]
} else {
val structType = HoodieInternalRowUtils.getCachedSchema(requiredSchema)
// partition value is empty because the spark parquet reader will append the partition columns to
// each row if they are given. That is the only usage of the partition values in the reader.
val fileInfo = sparkAdapter.getSparkPartitionedFileUtils
.createPartitionedFile(InternalRow.empty, filePath, start, length)
val (readSchema, readFilters) = getSchemaAndFiltersForRead(structType, hasRowIndexField)

// Convert Avro dataSchema to Parquet MessageType for timestamp precision conversion
val tableSchemaOpt = if (dataSchema != null) {
val hadoopConf = storage.getConf.unwrapAs(classOf[Configuration])
val parquetSchema = getAvroSchemaConverter(hadoopConf).convert(dataSchema)
org.apache.hudi.common.util.Option.of(parquetSchema)
} else {
org.apache.hudi.common.util.Option.empty[org.apache.parquet.schema.MessageType]()
}
new CloseableInternalRowIterator(baseFileReader.read(fileInfo,
readSchema, StructType(Seq.empty), getSchemaHandler.getInternalSchemaOpt,
readFilters, storage.getConf.asInstanceOf[StorageConfiguration[Configuration]]))
readFilters, storage.getConf.asInstanceOf[StorageConfiguration[Configuration]], tableSchemaOpt))
}
}

@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hudi.common.util
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.storage.StorageConfiguration
import org.apache.parquet.schema.MessageType
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
@@ -37,12 +38,14 @@ trait SparkColumnarFileReader extends Serializable {
* @param internalSchemaOpt option of internal schema for schema.on.read
* @param filters filters for data skipping. Not guaranteed to be used; the spark plan will also apply the filters.
* @param storageConf the hadoop conf
* @param tableSchemaOpt option of table schema for timestamp precision conversion
* @return iterator of rows read from the file; the output type says [[InternalRow]] but could be [[ColumnarBatch]]
*/
def read(file: PartitionedFile,
requiredSchema: StructType,
partitionSchema: StructType,
internalSchemaOpt: util.Option[InternalSchema],
filters: Seq[Filter],
storageConf: StorageConfiguration[Configuration]): Iterator[InternalRow]
storageConf: StorageConfiguration[Configuration],
tableSchemaOpt: util.Option[MessageType] = util.Option.empty()): Iterator[InternalRow]
}
@@ -24,7 +24,7 @@ import org.apache.parquet.hadoop.metadata.FileMetaData
import org.apache.spark.sql.HoodieSchemaUtils
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.expressions.{ArrayTransform, Attribute, Cast, CreateNamedStruct, CreateStruct, Expression, GetStructField, LambdaFunction, Literal, MapEntries, MapFromEntries, NamedLambdaVariable, UnsafeProjection}
import org.apache.spark.sql.types.{ArrayType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, StringType, StructField, StructType}
import org.apache.spark.sql.types.{ArrayType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, StringType, StructField, StructType, TimestampNTZType}

object HoodieParquetFileFormatHelper {

@@ -58,6 +58,9 @@
def isDataTypeEqual(requiredType: DataType, fileType: DataType): Boolean = (requiredType, fileType) match {
case (requiredType, fileType) if requiredType == fileType => true

// prevent illegal cast
case (TimestampNTZType, LongType) => true

case (ArrayType(rt, _), ArrayType(ft, _)) =>
// Do not care about nullability as schema evolution require fields to be nullable
isDataTypeEqual(rt, ft)
@@ -24,7 +24,7 @@ import org.apache.hudi.common.util.ValidationUtils

import org.apache.parquet.hadoop.api.InitContext
import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
import org.apache.parquet.schema.{GroupType, MessageType, Type, Types}
import org.apache.parquet.schema.{GroupType, MessageType, SchemaRepair, Type, Types}
import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec

import java.time.ZoneId
@@ -35,16 +35,16 @@
convertTz: Option[ZoneId],
enableVectorizedReader: Boolean,
datetimeRebaseSpec: RebaseSpec,
int96RebaseSpec: RebaseSpec)
int96RebaseSpec: RebaseSpec,
tableSchemaOpt: org.apache.hudi.common.util.Option[org.apache.parquet.schema.MessageType] = org.apache.hudi.common.util.Option.empty())
extends ParquetReadSupport(convertTz, enableVectorizedReader, datetimeRebaseSpec, int96RebaseSpec) with SparkAdapterSupport {

override def init(context: InitContext): ReadContext = {
val readContext = super.init(context)
val requestedParquetSchema = readContext.getRequestedSchema
val requestedParquetSchema = SchemaRepair.repairLogicalTypes(readContext.getRequestedSchema, tableSchemaOpt)
Review comment (Contributor Author): Where is the fix for the Avro parquet reader? Also, the Hive reader needs a fix too.

Review comment (Contributor): Can it be ensured that readContext.getRequestedSchema is coming from the parquet footer?

val trimmedParquetSchema = HoodieParquetReadSupport.trimParquetSchema(requestedParquetSchema, context.getFileSchema)
new ReadContext(trimmedParquetSchema, readContext.getReadSupportMetadata)
}

}

object HoodieParquetReadSupport {
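The review thread above asks what the repair step guarantees about readContext.getRequestedSchema. The PR's SchemaRepair.repairLogicalTypes implementation is not part of this diff, so the following is only a minimal, hypothetical sketch of what the call sites imply: walk the requested schema next to the table schema and re-attach a timestamp logical type wherever a plain INT64 field lost it. Class and method names below are illustrative, not Hudi's.

// Hypothetical sketch only: the PR's org.apache.parquet.schema.SchemaRepair is not shown
// in this diff, so this class and its method names are illustrative, not Hudi code.
import java.util.ArrayList;
import java.util.List;

import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;

public final class LogicalTypeRepairSketch {

  /** Copies timestamp annotations from the table schema onto un-annotated INT64 fields. */
  public static MessageType repair(MessageType requested, MessageType table) {
    List<Type> repaired = new ArrayList<>();
    for (Type field : requested.getFields()) {
      repaired.add(table.containsField(field.getName())
          ? repairField(field, table.getType(field.getName()))
          : field);
    }
    return new MessageType(requested.getName(), repaired);
  }

  private static Type repairField(Type requested, Type table) {
    if (requested.isPrimitive() && table.isPrimitive()) {
      PrimitiveType primitive = requested.asPrimitiveType();
      LogicalTypeAnnotation tableAnnotation = table.getLogicalTypeAnnotation();
      // The case this PR targets: an INT64 column that lost its timestamp logical type.
      if (primitive.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT64
          && primitive.getLogicalTypeAnnotation() == null
          && tableAnnotation instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) {
        return Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, requested.getRepetition())
            .as(tableAnnotation)
            .named(requested.getName());
      }
      return requested;
    }
    if (!requested.isPrimitive() && !table.isPrimitive()) {
      // Recurse into nested groups, keeping fields the table schema does not know about.
      GroupType requestedGroup = requested.asGroupType();
      GroupType tableGroup = table.asGroupType();
      List<Type> children = new ArrayList<>();
      for (Type child : requestedGroup.getFields()) {
        children.add(tableGroup.containsField(child.getName())
            ? repairField(child, tableGroup.getType(child.getName()))
            : child);
      }
      return requestedGroup.withNewFields(children);
    }
    return requested;
  }
}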
@@ -43,14 +43,14 @@ class MultipleColumnarFileFormatReader(parquetReader: SparkColumnarFileReader, o
* @param storageConf the hadoop conf
* @return iterator of rows read from the file; the output type says [[InternalRow]] but could be [[ColumnarBatch]]
*/
override def read(file: PartitionedFile, requiredSchema: StructType, partitionSchema: StructType, internalSchemaOpt: util.Option[InternalSchema], filters: Seq[Filter], storageConf: StorageConfiguration[Configuration]): Iterator[InternalRow] = {
override def read(file: PartitionedFile, requiredSchema: StructType, partitionSchema: StructType, internalSchemaOpt: util.Option[InternalSchema], filters: Seq[Filter], storageConf: StorageConfiguration[Configuration], tableSchemaOpt: util.Option[org.apache.parquet.schema.MessageType]): Iterator[InternalRow] = {
val filePath = sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file)
val fileFormat = HoodieFileFormat.fromFileExtension(filePath.getFileExtension)
fileFormat match {
case HoodieFileFormat.PARQUET =>
parquetReader.read(file, requiredSchema, partitionSchema, internalSchemaOpt, filters, storageConf)
parquetReader.read(file, requiredSchema, partitionSchema, internalSchemaOpt, filters, storageConf, tableSchemaOpt)
case HoodieFileFormat.ORC =>
orcReader.read(file, requiredSchema, partitionSchema, internalSchemaOpt, filters, storageConf)
orcReader.read(file, requiredSchema, partitionSchema, internalSchemaOpt, filters, storageConf, tableSchemaOpt)
case _ =>
throw new IllegalArgumentException(s"Unsupported file format for file: $filePath")
}
@@ -1208,7 +1208,9 @@ public static Object rewritePrimaryType(Object oldValue, Schema oldSchema, Schem
return oldValue;
case LONG:
if (oldSchema.getLogicalType() != newSchema.getLogicalType()) {
if (oldSchema.getLogicalType() instanceof LogicalTypes.TimestampMillis) {
if (oldSchema.getLogicalType() == null || newSchema.getLogicalType() == null) {
return oldValue;
} else if (oldSchema.getLogicalType() instanceof LogicalTypes.TimestampMillis) {
if (newSchema.getLogicalType() instanceof LogicalTypes.TimestampMicros) {
return DateTimeUtils.millisToMicros((Long) oldValue);
}
@@ -1485,6 +1487,10 @@ public static boolean recordNeedsRewriteForExtendedAvroTypePromotion(Schema writ
case DOUBLE:
case FLOAT:
case LONG:
if (readerSchema.getLogicalType() instanceof LogicalTypes.TimestampMillis
&& writerSchema.getLogicalType() instanceof LogicalTypes.TimestampMicros) {
return true;
}
return !(writerSchema.getType().equals(Schema.Type.INT) || writerSchema.getType().equals(Schema.Type.LONG));
default:
return !writerSchema.getType().equals(readerSchema.getType());
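Both hunks above turn on timestamp logical types for LONG fields: rewritePrimaryType now passes values straight through when either schema has no logical type, and recordNeedsRewriteForExtendedAvroTypePromotion reports a rewrite when a timestamp-micros writer meets a timestamp-millis reader. As a rough, self-contained illustration of the unit conversion such a rewrite performs (plain long arithmetic with hypothetical helper names; the diff itself delegates to a DateTimeUtils helper not shown here):

// Standalone illustration only; helper names are hypothetical and not the Hudi utilities.
public final class TimestampUnitConversionSketch {

  // timestamp-millis writer read as timestamp-micros: scale up, failing loudly on overflow
  static long millisToMicros(long millis) {
    return Math.multiplyExact(millis, 1_000L);
  }

  // timestamp-micros writer read as timestamp-millis: scale down, truncating toward
  // negative infinity so pre-epoch instants stay ordered
  static long microsToMillis(long micros) {
    return Math.floorDiv(micros, 1_000L);
  }

  public static void main(String[] args) {
    System.out.println(millisToMicros(1_700_000_000_123L));     // 1700000000123000
    System.out.println(microsToMillis(1_700_000_000_123_456L)); // 1700000000123
  }
}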