diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala index fa3731c2c57a7..e3523b75ce679 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala @@ -24,7 +24,7 @@ import org.apache.parquet.hadoop.metadata.FileMetaData import org.apache.spark.sql.HoodieSchemaUtils import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.catalyst.expressions.{ArrayTransform, Attribute, Cast, CreateNamedStruct, CreateStruct, Expression, GetStructField, LambdaFunction, Literal, MapEntries, MapFromEntries, NamedLambdaVariable, UnsafeProjection} -import org.apache.spark.sql.types.{ArrayType, DataType, DoubleType, FloatType, MapType, StringType, StructField, StructType} +import org.apache.spark.sql.types.{ArrayType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, StringType, StructField, StructType} object HoodieParquetFileFormatHelper { @@ -104,18 +104,24 @@ object HoodieParquetFileFormatHelper { requiredSchema: StructType, partitionSchema: StructType, schemaUtils: HoodieSchemaUtils): UnsafeProjection = { - val floatToDoubleCache = scala.collection.mutable.HashMap.empty[(DataType, DataType), Boolean] + val addedCastCache = scala.collection.mutable.HashMap.empty[(DataType, DataType), Boolean] - def hasFloatToDoubleConversion(src: DataType, dst: DataType): Boolean = { - floatToDoubleCache.getOrElseUpdate((src, dst), { + def hasUnsupportedConversion(src: DataType, dst: DataType): Boolean = { + addedCastCache.getOrElseUpdate((src, dst), { (src, dst) match { case (FloatType, DoubleType) => true + case (IntegerType, DecimalType()) => true + case (LongType, DecimalType()) => true + case (FloatType, DecimalType()) => true + case (DoubleType, DecimalType()) => true + case (StringType, DecimalType()) => true + case (StringType, DateType) => true case (StructType(srcFields), StructType(dstFields)) => - srcFields.zip(dstFields).exists { case (sf, df) => hasFloatToDoubleConversion(sf.dataType, df.dataType) } + srcFields.zip(dstFields).exists { case (sf, df) => hasUnsupportedConversion(sf.dataType, df.dataType) } case (ArrayType(sElem, _), ArrayType(dElem, _)) => - hasFloatToDoubleConversion(sElem, dElem) + hasUnsupportedConversion(sElem, dElem) case (MapType(sKey, sVal, _), MapType(dKey, dVal, _)) => - hasFloatToDoubleConversion(sKey, dKey) || hasFloatToDoubleConversion(sVal, dVal) + hasUnsupportedConversion(sKey, dKey) || hasUnsupportedConversion(sVal, dVal) case _ => false } }) @@ -127,7 +133,14 @@ object HoodieParquetFileFormatHelper { case (FloatType, DoubleType) => val toStr = Cast(expr, StringType, if (needTimeZone) timeZoneId else None) Cast(toStr, dstType, if (needTimeZone) timeZoneId else None) - case (s: StructType, d: StructType) if hasFloatToDoubleConversion(s, d) => + case (IntegerType | LongType | FloatType | DoubleType, dec: DecimalType) => + val toStr = Cast(expr, StringType, if (needTimeZone) timeZoneId else None) + Cast(toStr, dec, if (needTimeZone) timeZoneId else None) + case (StringType, dec: DecimalType) => + Cast(expr, dec, if (needTimeZone) timeZoneId else None) + 
case (StringType, DateType) => + Cast(expr, DateType, if (needTimeZone) timeZoneId else None) + case (s: StructType, d: StructType) if hasUnsupportedConversion(s, d) => val structFields = s.fields.zip(d.fields).zipWithIndex.map { case ((srcField, dstField), i) => val child = GetStructField(expr, i, Some(dstField.name)) @@ -136,13 +149,13 @@ object HoodieParquetFileFormatHelper { CreateNamedStruct(d.fields.zip(structFields).flatMap { case (f, c) => Seq(Literal(f.name), c) }) - case (ArrayType(sElementType, containsNull), ArrayType(dElementType, _)) if hasFloatToDoubleConversion(sElementType, dElementType) => + case (ArrayType(sElementType, containsNull), ArrayType(dElementType, _)) if hasUnsupportedConversion(sElementType, dElementType) => val lambdaVar = NamedLambdaVariable("element", sElementType, containsNull) val body = recursivelyCastExpressions(lambdaVar, sElementType, dElementType) val func = LambdaFunction(body, Seq(lambdaVar)) ArrayTransform(expr, func) case (MapType(sKeyType, sValType, vnull), MapType(dKeyType, dValType, _)) - if hasFloatToDoubleConversion(sKeyType, dKeyType) || hasFloatToDoubleConversion(sValType, dValType) => + if hasUnsupportedConversion(sKeyType, dKeyType) || hasUnsupportedConversion(sValType, dValType) => val kv = NamedLambdaVariable("kv", new StructType() .add("key", sKeyType, nullable = false) .add("value", sValType, nullable = vnull), nullable = false) diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java index f376b6c1a7c37..036818326efa9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java @@ -23,7 +23,9 @@ import org.apache.hudi.common.config.RecordMergeMode; import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.engine.HoodieReaderContext; +import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieAvroRecordMerger; +import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordMerger; import org.apache.hudi.common.model.OverwriteWithLatestMerger; @@ -64,6 +66,7 @@ */ public class HoodieAvroReaderContext extends HoodieReaderContext { private final Map reusableFileReaders; + private final boolean isMultiFormat; /** * Constructs an instance of the reader context that will read data into Avro records. 
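Context for the HoodieParquetFileFormatHelper change above: a direct Float-to-Double cast widens the binary float value, so 0.1f surfaces as 0.10000000149011612; routing the cast through String (and doing the same for the new numeric-to-Decimal conversions) preserves the decimal value the writer intended. A minimal standalone Java illustration of that effect; this demo class is not part of the patch:

public class FloatToDoubleDemo {
  public static void main(String[] args) {
    float f = 0.1f;
    // Direct widening keeps the binary float artifact.
    double direct = (double) f;                               // 0.10000000149011612
    // Going through the decimal string keeps the intended value.
    double viaString = Double.parseDouble(Float.toString(f)); // 0.1
    System.out.println(direct + " vs " + viaString);
  }
}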
@@ -125,6 +128,7 @@ private HoodieAvroReaderContext( boolean requiresPayloadRecords) { super(storageConfiguration, tableConfig, instantRangeOpt, filterOpt, new AvroRecordContext(tableConfig, payloadClassName, requiresPayloadRecords)); this.reusableFileReaders = reusableFileReaders; + this.isMultiFormat = tableConfig.isMultipleBaseFileFormatsEnabled(); } @Override @@ -136,15 +140,27 @@ public ClosableIterator getFileRecordIterator( Schema requiredSchema, HoodieStorage storage) throws IOException { HoodieAvroFileReader reader; + boolean isLogFile = FSUtils.isLogFile(filePath); + Schema fileOutputSchema; + Map renamedColumns; + if (isLogFile) { + fileOutputSchema = requiredSchema; + renamedColumns = Collections.emptyMap(); + } else { + Pair> requiredSchemaForFileAndRenamedColumns = getSchemaHandler().getRequiredSchemaForFileAndRenamedColumns(filePath); + fileOutputSchema = requiredSchemaForFileAndRenamedColumns.getLeft(); + renamedColumns = requiredSchemaForFileAndRenamedColumns.getRight(); + } if (reusableFileReaders.containsKey(filePath)) { reader = reusableFileReaders.get(filePath); } else { + HoodieFileFormat fileFormat = isMultiFormat && !isLogFile ? HoodieFileFormat.fromFileExtension(filePath.getFileExtension()) : baseFileFormat; reader = (HoodieAvroFileReader) HoodieIOFactory.getIOFactory(storage) .getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(new HoodieConfig(), - filePath, baseFileFormat, Option.empty()); + filePath, fileFormat, Option.empty()); } if (keyFilterOpt.isEmpty()) { - return reader.getIndexedRecordIterator(dataSchema, requiredSchema); + return reader.getIndexedRecordIterator(dataSchema, fileOutputSchema, renamedColumns); } if (reader.supportKeyPredicate()) { List keys = reader.extractKeys(keyFilterOpt); @@ -158,7 +174,7 @@ public ClosableIterator getFileRecordIterator( return reader.getIndexedRecordsByKeyPrefixIterator(keyPrefixes, requiredSchema); } } - return reader.getIndexedRecordIterator(dataSchema, requiredSchema); + return reader.getIndexedRecordIterator(dataSchema, fileOutputSchema, renamedColumns); } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index 0de9d4dd62f2d..c74c6a698831b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -1406,8 +1406,15 @@ public static boolean recordNeedsRewriteForExtendedAvroTypePromotion(Schema writ if (writerSchema.equals(readerSchema)) { return false; } + if (readerSchema.getLogicalType() != null) { + // check if logical types are equal + return !readerSchema.getLogicalType().equals(writerSchema.getLogicalType()); + } switch (readerSchema.getType()) { case RECORD: + if (readerSchema.getFields().size() > writerSchema.getFields().size()) { + return true; + } for (Schema.Field field : readerSchema.getFields()) { Schema.Field writerField = writerSchema.getField(field.name()); if (writerField == null || recordNeedsRewriteForExtendedAvroTypePromotion(writerField.schema(), field.schema())) { @@ -1430,13 +1437,13 @@ public static boolean recordNeedsRewriteForExtendedAvroTypePromotion(Schema writ case ENUM: return needsRewriteToString(writerSchema, true); case STRING: - case BYTES: return needsRewriteToString(writerSchema, false); case DOUBLE: - // To maintain precision, you need to convert Float -> String -> Double - return writerSchema.getType().equals(Schema.Type.FLOAT); + case FLOAT: + case LONG: + 
return !(writerSchema.getType().equals(Schema.Type.INT) || writerSchema.getType().equals(Schema.Type.LONG)); default: - return false; + return !writerSchema.getType().equals(readerSchema.getType()); } } @@ -1446,18 +1453,13 @@ public static boolean recordNeedsRewriteForExtendedAvroTypePromotion(Schema writ * string so some intervention is needed */ private static boolean needsRewriteToString(Schema schema, boolean isEnum) { - switch (schema.getType()) { - case INT: - case LONG: - case FLOAT: - case DOUBLE: - case BYTES: - return true; - case ENUM: - return !isEnum; - default: - return false; + if (schema.getLogicalType() != null) { + return true; + } + if (schema.getType() == Schema.Type.ENUM) { + return !isEnum; } + return true; } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java index a05fa38b29657..7ec40955f5b80 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java @@ -24,16 +24,21 @@ import org.apache.hudi.common.config.RecordMergeMode; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.HoodieReaderContext; +import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordMerger; import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.read.buffer.PositionBasedFileGroupRecordBuffer; +import org.apache.hudi.common.util.InternalSchemaCache; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.VisibleForTesting; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.common.util.collection.Triple; import org.apache.hudi.internal.schema.InternalSchema; +import org.apache.hudi.internal.schema.action.InternalSchemaMerger; import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter; +import org.apache.hudi.storage.StoragePath; import org.apache.avro.Schema; @@ -76,22 +81,24 @@ public class FileGroupReaderSchemaHandler { protected final TypedProperties properties; private final DeleteContext deleteContext; + private final HoodieTableMetaClient metaClient; public FileGroupReaderSchemaHandler(HoodieReaderContext readerContext, Schema tableSchema, Schema requestedSchema, Option internalSchemaOpt, - HoodieTableConfig hoodieTableConfig, - TypedProperties properties) { + TypedProperties properties, + HoodieTableMetaClient metaClient) { this.properties = properties; this.readerContext = readerContext; this.tableSchema = tableSchema; this.requestedSchema = AvroSchemaCache.intern(requestedSchema); - this.hoodieTableConfig = hoodieTableConfig; + this.hoodieTableConfig = metaClient.getTableConfig(); this.deleteContext = new DeleteContext(properties, tableSchema); this.requiredSchema = AvroSchemaCache.intern(prepareRequiredSchema(this.deleteContext)); this.internalSchema = pruneInternalSchema(requiredSchema, internalSchemaOpt); this.internalSchemaOpt = getInternalSchemaOpt(internalSchemaOpt); + this.metaClient = metaClient; } public Schema getTableSchema() { @@ -125,6 +132,18 @@ public DeleteContext getDeleteContext() { return deleteContext; } + public Pair> getRequiredSchemaForFileAndRenamedColumns(StoragePath path) { + if 
(internalSchema.isEmptySchema()) { + return Pair.of(requiredSchema, Collections.emptyMap()); + } + long commitInstantTime = Long.parseLong(FSUtils.getCommitTime(path.getName())); + InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(commitInstantTime, metaClient); + Pair> mergedInternalSchema = new InternalSchemaMerger(fileSchema, internalSchema, + true, false, false).mergeSchemaGetRenamed(); + Schema mergedAvroSchema = AvroSchemaCache.intern(AvroInternalSchemaConverter.convert(mergedInternalSchema.getLeft(), requiredSchema.getFullName())); + return Pair.of(mergedAvroSchema, mergedInternalSchema.getRight()); + } + private InternalSchema pruneInternalSchema(Schema requiredSchema, Option internalSchemaOption) { if (!internalSchemaOption.isPresent()) { return InternalSchema.getEmptyInternalSchema(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java index 5615374c13136..2fd395c2eb3d3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java @@ -116,8 +116,8 @@ private HoodieFileGroupReader(HoodieReaderContext readerContext, HoodieStorag readerContext.setShouldMergeUseRecordPosition(readerParameters.useRecordPosition() && !isSkipMerge && readerContext.getHasLogFiles() && inputSplit.isParquetBaseFile()); readerContext.setHasBootstrapBaseFile(inputSplit.getBaseFileOption().flatMap(HoodieBaseFile::getBootstrapBaseFile).isPresent()); readerContext.setSchemaHandler(readerContext.getRecordContext().supportsParquetRowIndex() - ? new ParquetRowIndexBasedSchemaHandler<>(readerContext, dataSchema, requestedSchema, internalSchemaOpt, tableConfig, props) - : new FileGroupReaderSchemaHandler<>(readerContext, dataSchema, requestedSchema, internalSchemaOpt, tableConfig, props)); + ? 
new ParquetRowIndexBasedSchemaHandler<>(readerContext, dataSchema, requestedSchema, internalSchemaOpt, props, metaClient) + : new FileGroupReaderSchemaHandler<>(readerContext, dataSchema, requestedSchema, internalSchemaOpt, props, metaClient)); this.outputConverter = readerContext.getSchemaHandler().getOutputConverter(); this.orderingFieldNames = HoodieRecordUtils.getOrderingFieldNames(readerContext.getMergeMode(), props, hoodieTableMetaClient); this.readStats = new HoodieReadStats(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/ParquetRowIndexBasedSchemaHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/ParquetRowIndexBasedSchemaHandler.java index c365e09cab29e..d0be7534884e0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/ParquetRowIndexBasedSchemaHandler.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/ParquetRowIndexBasedSchemaHandler.java @@ -21,7 +21,7 @@ import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.HoodieReaderContext; -import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.VisibleForTesting; import org.apache.hudi.common.util.collection.Pair; @@ -46,9 +46,9 @@ public ParquetRowIndexBasedSchemaHandler(HoodieReaderContext readerContext, Schema dataSchema, Schema requestedSchema, Option internalSchemaOpt, - HoodieTableConfig hoodieTableConfig, - TypedProperties properties) { - super(readerContext, dataSchema, requestedSchema, internalSchemaOpt, hoodieTableConfig, properties); + TypedProperties properties, + HoodieTableMetaClient metaClient) { + super(readerContext, dataSchema, requestedSchema, internalSchemaOpt, properties, metaClient); if (!readerContext.getRecordContext().supportsParquetRowIndex()) { throw new IllegalStateException("Using " + this.getClass().getName() + " but context does not support parquet row index"); } diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java index bb79087e97e09..e84090a54182b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java @@ -29,7 +29,9 @@ import org.apache.avro.generic.IndexedRecord; import java.io.IOException; +import java.util.Collections; import java.util.List; +import java.util.Map; import static org.apache.hudi.common.util.TypeUtils.unsafeCast; @@ -48,7 +50,11 @@ protected ClosableIterator getIndexedRecordIterator(Schema reader return getIndexedRecordIterator(readerSchema, readerSchema); } - public abstract ClosableIterator getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException; + public ClosableIterator getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException { + return getIndexedRecordIterator(readerSchema, requestedSchema, Collections.emptyMap()); + } + + public abstract ClosableIterator getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema, Map renamedColumns) throws IOException; public abstract ClosableIterator getIndexedRecordsByKeysIterator(List keys, Schema readerSchema) diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java index a5e339b83da97..48aae71fd0c93 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java @@ -96,9 +96,10 @@ private HoodieNativeAvroHFileReader(HFileReaderFactory readerFactory, @Override public ClosableIterator getIndexedRecordIterator(Schema readerSchema, - Schema requestedSchema) + Schema requestedSchema, + Map renamedColumns) throws IOException { - if (!Objects.equals(readerSchema, requestedSchema)) { + if (!Objects.equals(readerSchema, requestedSchema) || !renamedColumns.isEmpty()) { throw new UnsupportedOperationException( "Schema projections are not supported in HFile reader"); } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index 2f0d76043de37..e8b009e3a1092 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -561,7 +561,8 @@ private ReusableFileGroupRecordBufferLoader buildReusableRecordBu readerContext.initRecordMerger(metadataConfig.getProps()); readerContext.setHasBootstrapBaseFile(false); readerContext.setHasLogFiles(fileSlice.hasLogFiles()); - readerContext.setSchemaHandler(new FileGroupReaderSchemaHandler<>(readerContext, SCHEMA, SCHEMA, Option.empty(), metadataMetaClient.getTableConfig(), metadataConfig.getProps())); + readerContext.setSchemaHandler(new FileGroupReaderSchemaHandler<>(readerContext, SCHEMA, SCHEMA, Option.empty(), + metadataConfig.getProps(), metadataMetaClient)); readerContext.setShouldMergeUseRecordPosition(false); readerContext.setLatestCommitTime(latestMetadataInstantTime); return FileGroupRecordBufferLoader.createReusable(readerContext); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index 4bfeee6d8704f..230eb954507b6 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -1044,7 +1044,7 @@ private static ClosableIterator> getLogRecords(List(readerContext, writerSchemaOpt.get(), writerSchemaOpt.get(), Option.empty(), tableConfig, properties)); + readerContext.setSchemaHandler(new FileGroupReaderSchemaHandler<>(readerContext, writerSchemaOpt.get(), writerSchemaOpt.get(), Option.empty(), properties, datasetMetaClient)); HoodieReadStats readStats = new HoodieReadStats(); KeyBasedFileGroupRecordBuffer recordBuffer = new KeyBasedFileGroupRecordBuffer<>(readerContext, datasetMetaClient, readerContext.getMergeMode(), PartialUpdateMode.NONE, properties, tableConfig.getPreCombineFields(), diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java index f7823cef0cdd7..cdb69431927dc 100644 --- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java @@ -1131,4 +1131,53 @@ public void testProjectSchemaWithNullableAndNestedFields(List projectedF assertEquals(expectedSchema, projectedSchema); 
assertTrue(AvroSchemaUtils.isSchemaCompatible(projectedSchema, expectedSchema, false)); } + + private static Stream recordNeedsRewriteForExtendedAvroTypePromotion() { + Schema decimal1 = LogicalTypes.decimal(12, 2).addToSchema(Schema.create(Schema.Type.BYTES)); + Schema decimal2 = LogicalTypes.decimal(10, 2).addToSchema(Schema.create(Schema.Type.BYTES)); + Schema dateSchema = LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT)); + Schema doubleSchema = Schema.create(Schema.Type.DOUBLE); + Schema intSchema = Schema.create(Schema.Type.INT); + Schema longSchema = Schema.create(Schema.Type.LONG); + Schema floatSchema = Schema.create(Schema.Type.FLOAT); + Schema stringSchema = Schema.create(Schema.Type.STRING); + + Schema recordSchema1 = Schema.createRecord("record1", null, "com.example", false, + Arrays.asList(new Schema.Field("decimalField", decimal1, null, null), + new Schema.Field("doubleField", doubleSchema, null, null))); + + Schema recordSchema2 = Schema.createRecord("record2", null, "com.example2", false, + Arrays.asList(new Schema.Field("decimalField", decimal1, null, null), + new Schema.Field("doubleField", doubleSchema, null, null))); + + return Stream.of( + Arguments.of(intSchema, longSchema, false), + Arguments.of(intSchema, floatSchema, false), + Arguments.of(longSchema, intSchema, true), + Arguments.of(longSchema, floatSchema, false), + Arguments.of(decimal1, decimal2, true), + Arguments.of(doubleSchema, decimal1, true), + Arguments.of(decimal1, doubleSchema, true), + Arguments.of(intSchema, stringSchema, true), + Arguments.of(longSchema, doubleSchema, false), + Arguments.of(intSchema, doubleSchema, false), + Arguments.of(longSchema, stringSchema, true), + Arguments.of(floatSchema, stringSchema, true), + Arguments.of(doubleSchema, stringSchema, true), + Arguments.of(decimal1, stringSchema, true), + Arguments.of(stringSchema, decimal2, true), + Arguments.of(stringSchema, intSchema, true), + Arguments.of(floatSchema, doubleSchema, true), + Arguments.of(doubleSchema, floatSchema, true), + Arguments.of(recordSchema1, recordSchema2, false), + Arguments.of(dateSchema, stringSchema, true) + ); + } + + @ParameterizedTest + @MethodSource + void recordNeedsRewriteForExtendedAvroTypePromotion(Schema writerSchema, Schema readerSchema, boolean expected) { + boolean result = HoodieAvroUtils.recordNeedsRewriteForExtendedAvroTypePromotion(writerSchema, readerSchema); + assertEquals(expected, result); + } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/SchemaHandlerTestBase.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/SchemaHandlerTestBase.java index 41d63434f5d76..44bca13be898c 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/SchemaHandlerTestBase.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/SchemaHandlerTestBase.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordMerger; import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableVersion; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.util.DefaultJavaTypeConverter; @@ -38,6 +39,7 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.provider.Arguments; import 
java.io.IOException; @@ -67,6 +69,13 @@ public abstract class SchemaHandlerTestBase { .map(Schema.Field::name).filter(f -> !f.equals("_hoodie_is_deleted")).toArray(String[]::new)); protected static final Schema DATA_COLS_ONLY_SCHEMA = generateProjectionSchema("begin_lat", "tip_history", "rider"); protected static final Schema META_COLS_ONLY_SCHEMA = generateProjectionSchema("_hoodie_commit_seqno", "_hoodie_record_key"); + protected final HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class); + protected final HoodieTableConfig hoodieTableConfig = mock(HoodieTableConfig.class); + + @BeforeEach + void setup() { + when(metaClient.getTableConfig()).thenReturn(hoodieTableConfig); + } static Stream testMorParams(boolean supportsParquetRowIndex) { Stream.Builder b = Stream.builder(); @@ -89,14 +98,13 @@ public void testMor(RecordMergeMode mergeMode, boolean supportsParquetRowIndex, boolean hasBuiltInDelete) throws IOException { Schema dataSchema = hasBuiltInDelete ? DATA_SCHEMA : DATA_SCHEMA_NO_DELETE; - HoodieTableConfig hoodieTableConfig = mock(HoodieTableConfig.class); setupMORTable(mergeMode, hasPrecombine, hoodieTableConfig); HoodieRecordMerger merger = mockRecordMerger(isProjectionCompatible, isProjectionCompatible ? new String[] {"begin_lat", "begin_lon", "_hoodie_record_key", "timestamp"} : new String[] {"begin_lat", "begin_lon", "timestamp"}); HoodieReaderContext readerContext = createReaderContext(hoodieTableConfig, supportsParquetRowIndex, true, false, mergeUseRecordPosition, merger); readerContext.setRecordMerger(Option.of(merger)); Schema requestedSchema = dataSchema; - FileGroupReaderSchemaHandler schemaHandler = createSchemaHandler(readerContext, dataSchema, requestedSchema, hoodieTableConfig, supportsParquetRowIndex); + FileGroupReaderSchemaHandler schemaHandler = createSchemaHandler(readerContext, dataSchema, requestedSchema, supportsParquetRowIndex); Schema expectedRequiredFullSchema = supportsParquetRowIndex && mergeUseRecordPosition ? ParquetRowIndexBasedSchemaHandler.addPositionalMergeCol(requestedSchema) : requestedSchema; @@ -105,7 +113,7 @@ public void testMor(RecordMergeMode mergeMode, //read subset of columns requestedSchema = DATA_COLS_ONLY_SCHEMA; - schemaHandler = createSchemaHandler(readerContext, dataSchema, requestedSchema, hoodieTableConfig, supportsParquetRowIndex); + schemaHandler = createSchemaHandler(readerContext, dataSchema, requestedSchema, supportsParquetRowIndex); Schema expectedRequiredSchema; if (mergeMode == EVENT_TIME_ORDERING && hasPrecombine) { expectedRequiredSchema = generateProjectionSchema(hasBuiltInDelete, "begin_lat", "tip_history", "rider", "_hoodie_record_key", "timestamp"); @@ -130,13 +138,12 @@ public void testMorBootstrap(RecordMergeMode mergeMode, boolean supportsParquetRowIndex, boolean hasBuiltInDelete) throws IOException { Schema dataSchema = hasBuiltInDelete ? 
DATA_SCHEMA : DATA_SCHEMA_NO_DELETE; - HoodieTableConfig hoodieTableConfig = mock(HoodieTableConfig.class); setupMORTable(mergeMode, hasPrecombine, hoodieTableConfig); HoodieRecordMerger merger = mockRecordMerger(isProjectionCompatible, new String[] {"begin_lat", "begin_lon", "timestamp"}); HoodieReaderContext readerContext = createReaderContext(hoodieTableConfig, supportsParquetRowIndex, true, true, mergeUseRecordPosition, merger); readerContext.setRecordMerger(Option.of(merger)); Schema requestedSchema = dataSchema; - FileGroupReaderSchemaHandler schemaHandler = createSchemaHandler(readerContext, dataSchema, requestedSchema, hoodieTableConfig, supportsParquetRowIndex); + FileGroupReaderSchemaHandler schemaHandler = createSchemaHandler(readerContext, dataSchema, requestedSchema, supportsParquetRowIndex); Schema expectedRequiredFullSchema = supportsParquetRowIndex && mergeUseRecordPosition ? ParquetRowIndexBasedSchemaHandler.addPositionalMergeCol(requestedSchema) : requestedSchema; @@ -153,7 +160,7 @@ public void testMorBootstrap(RecordMergeMode mergeMode, //read subset of columns requestedSchema = generateProjectionSchema("begin_lat", "tip_history", "_hoodie_record_key", "rider"); - schemaHandler = createSchemaHandler(readerContext, dataSchema, requestedSchema, hoodieTableConfig, supportsParquetRowIndex); + schemaHandler = createSchemaHandler(readerContext, dataSchema, requestedSchema, supportsParquetRowIndex); Schema expectedRequiredSchema; if (mergeMode == EVENT_TIME_ORDERING && hasPrecombine) { expectedRequiredSchema = generateProjectionSchema(hasBuiltInDelete, "_hoodie_record_key", "begin_lat", "tip_history", "rider", "timestamp"); @@ -180,7 +187,7 @@ public void testMorBootstrap(RecordMergeMode mergeMode, // request only data cols requestedSchema = DATA_COLS_ONLY_SCHEMA; - schemaHandler = createSchemaHandler(readerContext, dataSchema, requestedSchema, hoodieTableConfig, supportsParquetRowIndex); + schemaHandler = createSchemaHandler(readerContext, dataSchema, requestedSchema, supportsParquetRowIndex); if (mergeMode == EVENT_TIME_ORDERING && hasPrecombine) { expectedRequiredSchema = generateProjectionSchema(hasBuiltInDelete, "_hoodie_record_key", "begin_lat", "tip_history", "rider", "timestamp"); assertTrue(readerContext.getNeedsBootstrapMerge()); @@ -215,7 +222,7 @@ public void testMorBootstrap(RecordMergeMode mergeMode, // request only meta cols requestedSchema = META_COLS_ONLY_SCHEMA; - schemaHandler = createSchemaHandler(readerContext, dataSchema, requestedSchema, hoodieTableConfig, supportsParquetRowIndex); + schemaHandler = createSchemaHandler(readerContext, dataSchema, requestedSchema, supportsParquetRowIndex); if (mergeMode == EVENT_TIME_ORDERING && hasPrecombine) { expectedRequiredSchema = generateProjectionSchema(hasBuiltInDelete, "_hoodie_commit_seqno", "_hoodie_record_key", "timestamp"); assertTrue(readerContext.getNeedsBootstrapMerge()); @@ -283,8 +290,7 @@ static HoodieReaderContext createReaderContext(HoodieTableConfig hoodieT } abstract FileGroupReaderSchemaHandler createSchemaHandler(HoodieReaderContext readerContext, Schema dataSchema, - Schema requestedSchema, HoodieTableConfig hoodieTableConfig, - boolean supportsParquetRowIndex); + Schema requestedSchema, boolean supportsParquetRowIndex); static Schema generateProjectionSchema(String... 
fields) { return HoodieAvroUtils.generateProjectionSchema(DATA_SCHEMA, Arrays.asList(fields)); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupReaderSchemaHandler.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupReaderSchemaHandler.java index 39ff90cc05c5c..2da196905f92d 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupReaderSchemaHandler.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupReaderSchemaHandler.java @@ -22,17 +22,23 @@ import org.apache.hudi.common.config.RecordMergeMode; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.HoodieReaderContext; +import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.DefaultHoodieRecordPayload; +import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordMerger; import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; -import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableVersion; import org.apache.hudi.common.testutils.SchemaTestUtil; +import org.apache.hudi.common.util.InternalSchemaCache; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.internal.schema.InternalSchema; +import org.apache.hudi.internal.schema.Types; +import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter; +import org.apache.hudi.storage.StoragePath; import org.apache.avro.Schema; import org.junit.jupiter.api.Test; @@ -40,12 +46,17 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.MockedStatic; +import org.mockito.Mockito; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; import java.util.stream.Stream; import static org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY; @@ -60,29 +71,27 @@ public class TestFileGroupReaderSchemaHandler extends SchemaHandlerTestBase { @Test public void testCow() { - HoodieTableConfig hoodieTableConfig = mock(HoodieTableConfig.class); when(hoodieTableConfig.populateMetaFields()).thenReturn(true); HoodieReaderContext readerContext = createReaderContext(hoodieTableConfig, false, false, false, false, null); Schema requestedSchema = DATA_SCHEMA; - FileGroupReaderSchemaHandler schemaHandler = createSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, hoodieTableConfig, false); + FileGroupReaderSchemaHandler schemaHandler = createSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, false); assertEquals(requestedSchema, schemaHandler.getRequiredSchema()); //read subset of columns requestedSchema = generateProjectionSchema("begin_lat", "tip_history", "rider"); - schemaHandler = createSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, hoodieTableConfig, false); + schemaHandler = createSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, false); assertEquals(requestedSchema, schemaHandler.getRequiredSchema()); assertFalse(readerContext.getNeedsBootstrapMerge()); } 
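Context for the schema-evolution plumbing above: getRequiredSchemaForFileAndRenamedColumns merges the file's committed InternalSchema with the pruned query schema and returns, besides the per-file read schema, a map from the current column name to the name it carries in that file (the test below asserts {"timestamp" -> "ts"}). The ORC and Parquet readers feed that map into HoodieAvroUtils.rewriteRecordWithNewSchema so records deserialized under the older file schema come back under the current names. A hedged consumption sketch with an illustrative rename; only the three-argument rewrite call is taken from the patch:

import java.util.Collections;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.avro.HoodieAvroUtils;

final class RenamedColumnReadSketch {
  // fileRecord was deserialized with the older file schema, where the column is
  // still called "ts"; requiredSchema uses the current name "timestamp".
  static IndexedRecord project(IndexedRecord fileRecord, Schema requiredSchema) {
    Map<String, String> renamedColumns = Collections.singletonMap("timestamp", "ts");
    return HoodieAvroUtils.rewriteRecordWithNewSchema(fileRecord, requiredSchema, renamedColumns);
  }
}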
@Test public void testCowBootstrap() { - HoodieTableConfig hoodieTableConfig = mock(HoodieTableConfig.class); when(hoodieTableConfig.populateMetaFields()).thenReturn(true); HoodieReaderContext readerContext = createReaderContext(hoodieTableConfig, false, false, true, false, null); Schema requestedSchema = generateProjectionSchema("begin_lat", "tip_history", "_hoodie_record_key", "rider"); //meta cols must go first in the required schema - FileGroupReaderSchemaHandler schemaHandler = createSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, hoodieTableConfig, false); + FileGroupReaderSchemaHandler schemaHandler = createSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, false); assertTrue(readerContext.getNeedsBootstrapMerge()); Schema expectedRequiredSchema = generateProjectionSchema("_hoodie_record_key", "begin_lat", "tip_history", "rider"); assertEquals(expectedRequiredSchema, schemaHandler.getRequiredSchema()); @@ -91,6 +100,34 @@ public void testCowBootstrap() { assertEquals(Arrays.asList(getField("begin_lat"), getField("tip_history"), getField("rider")), bootstrapFields.getRight()); } + @Test + void testGetRequiredSchemaForFileAndRenameColumns() { + when(hoodieTableConfig.populateMetaFields()).thenReturn(true); + HoodieReaderContext readerContext = createReaderContext(hoodieTableConfig, false, false, true, false, null); + Schema requestedSchema = generateProjectionSchema("_hoodie_record_key", "timestamp", "rider"); + + InternalSchema internalSchema = AvroInternalSchemaConverter.convert(DATA_SCHEMA); + InternalSchema originalSchema = new InternalSchema(Types.RecordType.get(internalSchema.columns().stream().map(field -> { + if (field.name().equals("timestamp")) { + // rename timestamp to ts in file schema and change type to int, output schema names and types must match the requested schema + return Types.Field.get(field.fieldId(), "ts", Types.IntType.get()); + } + return field; + }).collect(Collectors.toList()))); + FileGroupReaderSchemaHandler schemaHandler = new FileGroupReaderSchemaHandler<>(readerContext, DATA_SCHEMA, requestedSchema, + Option.of(internalSchema), new TypedProperties(), metaClient); + + try (MockedStatic mockedStatic = Mockito.mockStatic(InternalSchemaCache.class)) { + String instantTime = "20231010101010"; + mockedStatic.when(() -> InternalSchemaCache.searchSchemaAndCache(Long.parseLong(instantTime), metaClient)) + .thenReturn(originalSchema); + StoragePath filePath = new StoragePath("/2023-01-01/" + FSUtils.makeBaseFileName(instantTime, "1-0-1", UUID.randomUUID().toString(), HoodieFileFormat.PARQUET.getFileExtension())); + Pair> requiredSchemaAndRenamedFields = schemaHandler.getRequiredSchemaForFileAndRenamedColumns(filePath); + assertEquals(Collections.singletonMap("timestamp", "ts"), requiredSchemaAndRenamedFields.getRight()); + assertEquals(requestedSchema, requiredSchemaAndRenamedFields.getLeft()); + } + } + private static Stream testMorParams() { return testMorParams(false); } @@ -118,10 +155,10 @@ public void testMorBootstrap(RecordMergeMode mergeMode, } @Override - FileGroupReaderSchemaHandler createSchemaHandler(HoodieReaderContext readerContext, Schema dataSchema, Schema requestedSchema, HoodieTableConfig hoodieTableConfig, + FileGroupReaderSchemaHandler createSchemaHandler(HoodieReaderContext readerContext, Schema dataSchema, Schema requestedSchema, boolean supportsParquetRowIndex) { return new FileGroupReaderSchemaHandler(readerContext, dataSchema, requestedSchema, - Option.empty(), hoodieTableConfig, new TypedProperties()); + 
Option.empty(), new TypedProperties(), metaClient); } @ParameterizedTest @@ -189,23 +226,22 @@ public void testSchemaForMandatoryFields(boolean setPrecombine, Schema dataSchema = SchemaTestUtil.getSchemaFromFields(dataSchemaFields); Schema requestedSchema = SchemaTestUtil.getSchemaFromFields(Arrays.asList(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD)); - HoodieTableConfig tableConfig = mock(HoodieTableConfig.class); - when(tableConfig.getRecordMergeMode()).thenReturn(mergeMode); - when(tableConfig.populateMetaFields()).thenReturn(true); - when(tableConfig.getPreCombineFieldsStr()).thenReturn(Option.of(setPrecombine ? preCombineField : StringUtils.EMPTY_STRING)); - when(tableConfig.getPreCombineFields()).thenReturn(setPrecombine ? Collections.singletonList(preCombineField) : Collections.emptyList()); - when(tableConfig.getTableVersion()).thenReturn(tableVersion); - if (tableConfig.getTableVersion() == HoodieTableVersion.SIX) { + when(hoodieTableConfig.getRecordMergeMode()).thenReturn(mergeMode); + when(hoodieTableConfig.populateMetaFields()).thenReturn(true); + when(hoodieTableConfig.getPreCombineFieldsStr()).thenReturn(Option.of(setPrecombine ? preCombineField : StringUtils.EMPTY_STRING)); + when(hoodieTableConfig.getPreCombineFields()).thenReturn(setPrecombine ? Collections.singletonList(preCombineField) : Collections.emptyList()); + when(hoodieTableConfig.getTableVersion()).thenReturn(tableVersion); + if (hoodieTableConfig.getTableVersion() == HoodieTableVersion.SIX) { if (mergeMode == RecordMergeMode.EVENT_TIME_ORDERING) { - when(tableConfig.getPayloadClass()).thenReturn(DefaultHoodieRecordPayload.class.getName()); + when(hoodieTableConfig.getPayloadClass()).thenReturn(DefaultHoodieRecordPayload.class.getName()); } else if (mergeMode == RecordMergeMode.COMMIT_TIME_ORDERING) { - when(tableConfig.getPayloadClass()).thenReturn(OverwriteWithLatestAvroPayload.class.getName()); + when(hoodieTableConfig.getPayloadClass()).thenReturn(OverwriteWithLatestAvroPayload.class.getName()); } else { - when(tableConfig.getPayloadClass()).thenReturn(OverwriteNonDefaultsWithLatestAvroPayload.class.getName()); + when(hoodieTableConfig.getPayloadClass()).thenReturn(OverwriteNonDefaultsWithLatestAvroPayload.class.getName()); } } if (mergeMode != null) { - when(tableConfig.getRecordMergeStrategyId()).thenReturn(mergeStrategyId); + when(hoodieTableConfig.getRecordMergeStrategyId()).thenReturn(mergeStrategyId); } TypedProperties props = new TypedProperties(); @@ -227,7 +263,7 @@ public void testSchemaForMandatoryFields(boolean setPrecombine, expectedFields.add(HoodieRecord.HOODIE_IS_DELETED_FIELD); } Schema expectedSchema = ((mergeMode == RecordMergeMode.CUSTOM) && !isProjectionCompatible) ? dataSchema : SchemaTestUtil.getSchemaFromFields(expectedFields); - when(recordMerger.getMandatoryFieldsForMerging(dataSchema, tableConfig, props)).thenReturn(expectedFields.toArray(new String[0])); + when(recordMerger.getMandatoryFieldsForMerging(dataSchema, hoodieTableConfig, props)).thenReturn(expectedFields.toArray(new String[0])); DeleteContext deleteContext = new DeleteContext(props, dataSchema); assertEquals(addHoodieIsDeleted, deleteContext.hasBuiltInDeleteField()); @@ -235,7 +271,7 @@ public void testSchemaForMandatoryFields(boolean setPrecombine, ? 
Option.of(Pair.of(customDeleteKey, customDeleteValue)) : Option.empty(), deleteContext.getCustomDeleteMarkerKeyValue()); FileGroupReaderSchemaHandler fileGroupReaderSchemaHandler = new FileGroupReaderSchemaHandler(readerContext, - dataSchema, requestedSchema, Option.empty(), tableConfig, props); + dataSchema, requestedSchema, Option.empty(), props, metaClient); Schema actualSchema = fileGroupReaderSchemaHandler.generateRequiredSchema(deleteContext); assertEquals(expectedSchema, actualSchema); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestParquetRowIndexBasedSchemaHandler.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestParquetRowIndexBasedSchemaHandler.java index 22b28c7acf403..865026f1a1603 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestParquetRowIndexBasedSchemaHandler.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestParquetRowIndexBasedSchemaHandler.java @@ -22,7 +22,6 @@ import org.apache.hudi.common.config.RecordMergeMode; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.HoodieReaderContext; -import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; @@ -41,18 +40,16 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class TestParquetRowIndexBasedSchemaHandler extends SchemaHandlerTestBase { @Test public void testCowBootstrapWithPositionMerge() { - HoodieTableConfig hoodieTableConfig = mock(HoodieTableConfig.class); when(hoodieTableConfig.populateMetaFields()).thenReturn(true); HoodieReaderContext readerContext = createReaderContext(hoodieTableConfig, true, false, true, false, null); Schema requestedSchema = generateProjectionSchema("begin_lat", "tip_history", "_hoodie_record_key", "rider"); - FileGroupReaderSchemaHandler schemaHandler = createSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, hoodieTableConfig, true); + FileGroupReaderSchemaHandler schemaHandler = createSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, true); assertTrue(readerContext.getNeedsBootstrapMerge()); //meta cols must go first in the required schema Schema expectedRequiredSchema = generateProjectionSchema("_hoodie_record_key", "begin_lat", "tip_history", "rider"); @@ -61,14 +58,14 @@ public void testCowBootstrapWithPositionMerge() { assertEquals(Arrays.asList(getField("_hoodie_record_key"), getPositionalMergeField()), bootstrapFields.getLeft()); assertEquals(Arrays.asList(getField("begin_lat"), getField("tip_history"), getField("rider"), getPositionalMergeField()), bootstrapFields.getRight()); - schemaHandler = createSchemaHandler(readerContext, DATA_SCHEMA, DATA_COLS_ONLY_SCHEMA, hoodieTableConfig, true); + schemaHandler = createSchemaHandler(readerContext, DATA_SCHEMA, DATA_COLS_ONLY_SCHEMA, true); assertFalse(readerContext.getNeedsBootstrapMerge()); assertEquals(DATA_COLS_ONLY_SCHEMA, schemaHandler.getRequiredSchema()); bootstrapFields = schemaHandler.getBootstrapRequiredFields(); assertTrue(bootstrapFields.getLeft().isEmpty()); assertEquals(Arrays.asList(getField("begin_lat"), getField("tip_history"), getField("rider")), bootstrapFields.getRight()); - schemaHandler = createSchemaHandler(readerContext, DATA_SCHEMA, 
META_COLS_ONLY_SCHEMA, hoodieTableConfig, true); + schemaHandler = createSchemaHandler(readerContext, DATA_SCHEMA, META_COLS_ONLY_SCHEMA, true); assertFalse(readerContext.getNeedsBootstrapMerge()); assertEquals(META_COLS_ONLY_SCHEMA, schemaHandler.getRequiredSchema()); bootstrapFields = schemaHandler.getBootstrapRequiredFields(); @@ -103,9 +100,9 @@ public void testMorBootstrap(RecordMergeMode mergeMode, } @Override - FileGroupReaderSchemaHandler createSchemaHandler(HoodieReaderContext readerContext, Schema dataSchema, Schema requestedSchema, HoodieTableConfig hoodieTableConfig, + FileGroupReaderSchemaHandler createSchemaHandler(HoodieReaderContext readerContext, Schema dataSchema, Schema requestedSchema, boolean supportsParquetRowIndex) { return new ParquetRowIndexBasedSchemaHandler(readerContext, dataSchema, requestedSchema, - Option.empty(), hoodieTableConfig, new TypedProperties()); + Option.empty(), new TypedProperties(), metaClient); } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestKeyBasedFileGroupRecordBuffer.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestKeyBasedFileGroupRecordBuffer.java index 50eaeb454df2a..b4011a7205a5f 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestKeyBasedFileGroupRecordBuffer.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestKeyBasedFileGroupRecordBuffer.java @@ -28,6 +28,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordMerger; import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableVersion; import org.apache.hudi.common.table.PartialUpdateMode; import org.apache.hudi.common.table.log.block.HoodieDataBlock; @@ -153,8 +154,8 @@ void readWithEventTimeOrderingWithRecords() throws IOException { readerContext.setHasLogFiles(false); readerContext.setHasBootstrapBaseFile(false); readerContext.initRecordMerger(properties); - FileGroupReaderSchemaHandler schemaHandler = new FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(), tableConfig, - properties); + FileGroupReaderSchemaHandler schemaHandler = new FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(), + properties, mock(HoodieTableMetaClient.class)); readerContext.setSchemaHandler(schemaHandler); List inputRecords = convertToHoodieRecordsList(Arrays.asList(testRecord1UpdateWithSameTime, testRecord2Update, testRecord3Update, testRecord4EarlierUpdate, testRecord7)); inputRecords.addAll(convertToHoodieRecordsListForDeletes(Arrays.asList(testRecord5DeleteByCustomMarker, testRecord6DeleteByCustomMarker), false)); @@ -220,8 +221,8 @@ void readWithCommitTimeOrderingWithRecords() throws IOException { readerContext.setHasLogFiles(false); readerContext.setHasBootstrapBaseFile(false); readerContext.initRecordMerger(properties); - FileGroupReaderSchemaHandler schemaHandler = new FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(), tableConfig, - properties); + FileGroupReaderSchemaHandler schemaHandler = new FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(), + properties, mock(HoodieTableMetaClient.class)); readerContext.setSchemaHandler(schemaHandler); List inputRecords = convertToHoodieRecordsList(Arrays.asList(testRecord1UpdateWithSameTime, testRecord2Update, testRecord3Update, testRecord4EarlierUpdate, testRecord7)); @@ -299,8 +300,8 
@@ void readWithCustomPayloadWithRecords() throws IOException { readerContext.setHasLogFiles(false); readerContext.setHasBootstrapBaseFile(false); readerContext.initRecordMerger(properties); - FileGroupReaderSchemaHandler schemaHandler = new FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(), tableConfig, - properties); + FileGroupReaderSchemaHandler schemaHandler = new FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(), + properties, mock(HoodieTableMetaClient.class)); readerContext.setSchemaHandler(schemaHandler); List inputRecords = convertToHoodieRecordsList(Arrays.asList(testRecord1UpdateWithSameTime, testRecord2Update, testRecord3Update, testRecord4EarlierUpdate)); inputRecords.addAll(convertToHoodieRecordsListForDeletes(Arrays.asList(testRecord5DeleteByCustomMarker, testRecord6DeleteByCustomMarker), true)); @@ -374,8 +375,8 @@ void readWithCustomMergerWithRecords() throws IOException { readerContext.setHasLogFiles(false); readerContext.setHasBootstrapBaseFile(false); readerContext.initRecordMerger(properties); - FileGroupReaderSchemaHandler schemaHandler = new FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(), tableConfig, - properties); + FileGroupReaderSchemaHandler schemaHandler = new FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(), + properties, mock(HoodieTableMetaClient.class)); readerContext.setSchemaHandler(schemaHandler); List inputRecords = convertToHoodieRecordsList(Arrays.asList(testRecord1UpdateWithSameTime, testRecord2Update, testRecord3Update, testRecord4EarlierUpdate)); inputRecords.addAll(convertToHoodieRecordsListForDeletes(Arrays.asList(testRecord5DeleteByCustomMarker, testRecord6DeleteByCustomMarker), true)); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java index 0cbf8c2017e51..347718a6db913 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java @@ -123,8 +123,8 @@ void readWithStreamingRecordBufferLoaderAndEventTimeOrdering() throws IOExceptio HoodieReaderContext readerContext = new HoodieAvroReaderContext(storageConfiguration, tableConfig, Option.empty(), Option.empty()); readerContext.setHasLogFiles(false); readerContext.setHasBootstrapBaseFile(false); - FileGroupReaderSchemaHandler schemaHandler = new FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(), tableConfig, - properties); + FileGroupReaderSchemaHandler schemaHandler = new FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(), + properties, mock(HoodieTableMetaClient.class)); readerContext.setSchemaHandler(schemaHandler); readerContext.initRecordMerger(properties); List inputRecords = diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestStreamingKeyBasedFileGroupRecordBuffer.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestStreamingKeyBasedFileGroupRecordBuffer.java index baf1140cfee2e..8c7e5d65a530f 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestStreamingKeyBasedFileGroupRecordBuffer.java +++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestStreamingKeyBasedFileGroupRecordBuffer.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordMerger; import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableVersion; import org.apache.hudi.common.table.PartialUpdateMode; import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler; @@ -83,8 +84,8 @@ void readWithEventTimeOrdering() throws IOException { readerContext.setHasLogFiles(false); readerContext.setHasBootstrapBaseFile(false); readerContext.initRecordMerger(properties); - FileGroupReaderSchemaHandler schemaHandler = new FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(), tableConfig, - properties); + FileGroupReaderSchemaHandler schemaHandler = new FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(), + properties, mock(HoodieTableMetaClient.class)); readerContext.setSchemaHandler(schemaHandler); List inputRecords = convertToHoodieRecordsList(Arrays.asList(testRecord1UpdateWithSameTime, testRecord2Update, testRecord3Update, testRecord4EarlierUpdate, testRecord7)); inputRecords.addAll(convertToHoodieRecordsListForDeletes(Arrays.asList(testRecord5DeleteByCustomMarker, testRecord6DeleteByCustomMarker), false)); @@ -120,8 +121,8 @@ void readWithCommitTimeOrdering() throws IOException { readerContext.setHasLogFiles(false); readerContext.setHasBootstrapBaseFile(false); readerContext.initRecordMerger(properties); - FileGroupReaderSchemaHandler schemaHandler = new FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(), tableConfig, - properties); + FileGroupReaderSchemaHandler schemaHandler = new FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(), + properties, mock(HoodieTableMetaClient.class)); readerContext.setSchemaHandler(schemaHandler); List inputRecords = convertToHoodieRecordsList(Arrays.asList(testRecord1UpdateWithSameTime, testRecord2Update, testRecord3Update, testRecord4EarlierUpdate, testRecord7)); inputRecords.addAll(convertToHoodieRecordsListForDeletes(Arrays.asList(testRecord5DeleteByCustomMarker, testRecord6DeleteByCustomMarker), true)); @@ -159,8 +160,8 @@ void readWithCustomPayload() throws IOException { readerContext.setHasLogFiles(false); readerContext.setHasBootstrapBaseFile(false); readerContext.initRecordMerger(properties); - FileGroupReaderSchemaHandler schemaHandler = new FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(), tableConfig, - properties); + FileGroupReaderSchemaHandler schemaHandler = new FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(), + properties, mock(HoodieTableMetaClient.class)); readerContext.setSchemaHandler(schemaHandler); List inputRecords = convertToHoodieRecordsList(Arrays.asList(testRecord1UpdateWithSameTime, testRecord2Update, testRecord3Update, testRecord4EarlierUpdate)); inputRecords.addAll(convertToHoodieRecordsListForDeletes(Arrays.asList(testRecord5DeleteByCustomMarker, testRecord6DeleteByCustomMarker), true)); @@ -197,8 +198,8 @@ void readWithCustomMergerWithRecords() throws IOException { readerContext.setHasLogFiles(false); readerContext.setHasBootstrapBaseFile(false); readerContext.initRecordMerger(properties); - FileGroupReaderSchemaHandler schemaHandler = new FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(), 
-        properties);
+    FileGroupReaderSchemaHandler schemaHandler = new FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(),
+        properties, mock(HoodieTableMetaClient.class));
     readerContext.setSchemaHandler(schemaHandler);
     List inputRecords = convertToHoodieRecordsList(Arrays.asList(testRecord1UpdateWithSameTime, testRecord2Update, testRecord3Update, testRecord4EarlierUpdate));
     inputRecords.addAll(convertToHoodieRecordsListForDeletes(Arrays.asList(testRecord5DeleteByCustomMarker, testRecord6DeleteByCustomMarker), true));
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java
index c7ddd16cb0713..e805e22e0bff4 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java
@@ -47,6 +47,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -82,7 +83,7 @@ public Set> filterRowKeys(Set candidateRowKeys) {
   }
 
   @Override
-  public ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) {
+  public ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema, Map<String, String> renamedColumns) {
     if (!Objects.equals(readerSchema, requestedSchema)) {
       throw new UnsupportedOperationException("Schema projections are not supported in HFile reader");
     }
@@ -98,10 +99,10 @@ public ClosableIterator getIndexedRecordIterator(Schema readerSch
       TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(prunedFileSchema);
       RecordReader recordReader = reader.rows(new Options(hadoopConf).schema(orcSchema));
       ClosableIterator<GenericRecord> recordIterator = new OrcReaderIterator<>(recordReader, prunedFileSchema, orcSchema);
-      if (readerSchema.equals(fileSchema)) {
+      if (renamedColumns.isEmpty() && readerSchema.equals(fileSchema)) {
         return recordIterator;
       } else {
-        return new CloseableMappingIterator<>(recordIterator, data -> HoodieAvroUtils.rewriteRecordWithNewSchema(data, requestedSchema));
+        return new CloseableMappingIterator<>(recordIterator, data -> HoodieAvroUtils.rewriteRecordWithNewSchema(data, requestedSchema, renamedColumns));
       }
     } catch (IOException io) {
       throw new HoodieIOException("Unable to create an ORC reader.", io);
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
index 88ff9273196c0..5881003373973 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
@@ -49,7 +49,9 @@
 import java.io.IOException;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
@@ -100,12 +102,17 @@ public Set> filterRowKeys(Set candidateRowKeys) {
 
   @Override
   protected ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema schema) throws IOException {
-    return getIndexedRecordIteratorInternal(schema);
+    return getIndexedRecordIteratorInternal(schema, Collections.emptyMap());
   }
 
   @Override
   public ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
-    return getIndexedRecordIteratorInternal(requestedSchema);
+    return getIndexedRecordIteratorInternal(requestedSchema, Collections.emptyMap());
+  }
+
+  @Override
+  public ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema, Map<String, String> renamedColumns) throws IOException {
+    return getIndexedRecordIteratorInternal(requestedSchema, renamedColumns);
   }
 
   @Override
@@ -166,13 +173,13 @@ private static StorageConfiguration tryOverrideDefaultConfigs(StorageConfigur
     return conf;
   }
 
-  private ClosableIterator<IndexedRecord> getIndexedRecordIteratorInternal(Schema schema) throws IOException {
+  private ClosableIterator<IndexedRecord> getIndexedRecordIteratorInternal(Schema schema, Map<String, String> renamedColumns) throws IOException {
     // NOTE: We have to set both Avro read-schema and projection schema to make
     //       sure that in case the file-schema is not equal to read-schema we'd still
     //       be able to read that file (in case projection is a proper one)
     Configuration hadoopConf = storage.getConf().unwrapCopyAs(Configuration.class);
     Option<Schema> promotedSchema = Option.empty();
-    if (HoodieAvroUtils.recordNeedsRewriteForExtendedAvroTypePromotion(getSchema(), schema)) {
+    if (!renamedColumns.isEmpty() || HoodieAvroUtils.recordNeedsRewriteForExtendedAvroTypePromotion(getSchema(), schema)) {
       AvroReadSupport.setAvroReadSchema(hadoopConf, getSchema());
       AvroReadSupport.setRequestedProjection(hadoopConf, getSchema());
       promotedSchema = Option.of(schema);
@@ -186,7 +193,7 @@ private ClosableIterator getIndexedRecordIteratorInternal(Schema
         .set(ParquetInputFormat.STRICT_TYPE_CHECKING, hadoopConf.get(ParquetInputFormat.STRICT_TYPE_CHECKING))
         .build();
     ParquetReaderIterator<IndexedRecord> parquetReaderIterator = promotedSchema.isPresent()
-        ? new HoodieAvroParquetReaderIterator(reader, promotedSchema.get())
+        ? new HoodieAvroParquetReaderIterator(reader, promotedSchema.get(), renamedColumns)
        : new ParquetReaderIterator<>(reader);
     readerIterators.add(parquetReaderIterator);
     return parquetReaderIterator;
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReaderIterator.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReaderIterator.java
index 2723f4d1900fc..c0eaf16d08b59 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReaderIterator.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReaderIterator.java
@@ -26,15 +26,20 @@
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.parquet.hadoop.ParquetReader;
 
+import java.util.Map;
+
 public class HoodieAvroParquetReaderIterator extends ParquetReaderIterator<IndexedRecord> {
   private final Schema promotedSchema;
-  public HoodieAvroParquetReaderIterator(ParquetReader<IndexedRecord> parquetReader, Schema promotedSchema) {
+  private final Map<String, String> renamedColumns;
+
+  public HoodieAvroParquetReaderIterator(ParquetReader<IndexedRecord> parquetReader, Schema promotedSchema, Map<String, String> renamedColumns) {
     super(parquetReader);
     this.promotedSchema = promotedSchema;
+    this.renamedColumns = renamedColumns;
   }
 
   @Override
   public IndexedRecord next() {
-    return HoodieAvroUtils.rewriteRecordWithNewSchema(super.next(), this.promotedSchema);
+    return HoodieAvroUtils.rewriteRecordWithNewSchema(super.next(), promotedSchema, renamedColumns);
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
index 18cfbf05fe9db..a9ce1a8163021 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
@@ -508,8 +508,7 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
     readerContext.setHasBootstrapBaseFile(false)
     readerContext.setHasLogFiles(true)
     readerContext.setSchemaHandler(
-      new FileGroupReaderSchemaHandler[InternalRow](readerContext, avroSchema, avroSchema,
-        Option.empty(), metaClient.getTableConfig, readerProperties))
+      new FileGroupReaderSchemaHandler[InternalRow](readerContext, avroSchema, avroSchema, Option.empty(), readerProperties, metaClient))
     val stats = new HoodieReadStats
     keyBasedFileGroupRecordBuffer.ifPresent(k => k.close())
     keyBasedFileGroupRecordBuffer = Option.of(new KeyBasedFileGroupRecordBuffer[InternalRow](readerContext, metaClient, readerContext.getMergeMode,
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
index 7b2d825b55db8..44c13eed32b99 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
@@ -139,8 +139,8 @@ private void prepareBuffer(RecordMergeMode mergeMode, String baseFileInstantTime
       ctx.setRecordMerger(Option.empty());
     }
     ctx.setSchemaHandler(HoodieSparkUtils.gteqSpark3_5()
-        ? new ParquetRowIndexBasedSchemaHandler<>(ctx, avroSchema, avroSchema, Option.empty(), metaClient.getTableConfig(), new TypedProperties())
-        : new FileGroupReaderSchemaHandler<>(ctx, avroSchema, avroSchema, Option.empty(), metaClient.getTableConfig(), new TypedProperties()));
+        ? new ParquetRowIndexBasedSchemaHandler<>(ctx, avroSchema, avroSchema, Option.empty(), new TypedProperties(), metaClient)
+        : new FileGroupReaderSchemaHandler<>(ctx, avroSchema, avroSchema, Option.empty(), new TypedProperties(), metaClient));
     TypedProperties props = new TypedProperties();
     props.put("hoodie.write.record.merge.mode", mergeMode.name());
     props.setProperty(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(),String.valueOf(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.defaultValue()));