diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java index 3e7e0b16e2cf8..0bc349125043f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java @@ -59,7 +59,7 @@ public class HoodieCreateHandle extends protected long recordsDeleted = 0; private Map> recordMap; private boolean useWriterSchema = false; - private boolean preserveHoodieMetadata = false; + private final boolean preserveMetadata; public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) { @@ -69,9 +69,9 @@ public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTa public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, String partitionPath, String fileId, TaskContextSupplier taskContextSupplier, - boolean preserveHoodieMetadata) { + boolean preserveMetadata) { this(config, instantTime, hoodieTable, partitionPath, fileId, Option.empty(), - taskContextSupplier, preserveHoodieMetadata); + taskContextSupplier, preserveMetadata); } public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, @@ -82,10 +82,10 @@ public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTa public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, String partitionPath, String fileId, Option overriddenSchema, - TaskContextSupplier taskContextSupplier, boolean preserveHoodieMetadata) { + TaskContextSupplier taskContextSupplier, boolean preserveMetadata) { super(config, instantTime, partitionPath, fileId, hoodieTable, overriddenSchema, taskContextSupplier); - this.preserveHoodieMetadata = preserveHoodieMetadata; + this.preserveMetadata = preserveMetadata; writeStatus.setFileId(fileId); writeStatus.setPartitionPath(partitionPath); writeStatus.setStat(new HoodieWriteStat()); @@ -111,7 +111,7 @@ public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTa public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, String partitionPath, String fileId, Map> recordMap, TaskContextSupplier taskContextSupplier) { - this(config, instantTime, hoodieTable, partitionPath, fileId, taskContextSupplier); + this(config, instantTime, hoodieTable, partitionPath, fileId, taskContextSupplier, config.isPreserveHoodieCommitMetadataForCompaction()); this.recordMap = recordMap; this.useWriterSchema = true; } @@ -137,13 +137,11 @@ public void write(HoodieRecord record, Option avroRecord) { return; } // Convert GenericRecord to GenericRecord with hoodie commit metadata in schema - IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) avroRecord.get()); - if (preserveHoodieMetadata) { - // do not preserve FILENAME_METADATA_FIELD - recordWithMetadataInSchema.put(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.FILENAME_METADATA_FIELD), path.getName()); - fileWriter.writeAvro(record.getRecordKey(), recordWithMetadataInSchema); + if (preserveMetadata) { + fileWriter.writeAvro(record.getRecordKey(), + rewriteRecordWithMetadata((GenericRecord) avroRecord.get(), path.getName())); } else { - fileWriter.writeAvroWithMetadata(recordWithMetadataInSchema, record); + fileWriter.writeAvroWithMetadata(rewriteRecord((GenericRecord) avroRecord.get()), record); } // update the new location of record, so we know where to find it next record.unseal(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index d38f66a86f912..cbcf382390da0 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -61,8 +61,6 @@ import java.util.Map; import java.util.Set; -import static org.apache.hudi.common.model.HoodieRecord.FILENAME_METADATA_FIELD_POS; - @SuppressWarnings("Duplicates") /** * Handle to merge incoming records to those in storage. @@ -264,7 +262,7 @@ private boolean writeUpdateRecord(HoodieRecord hoodieRecord, GenericRecord ol isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation()); } } - return writeRecord(hoodieRecord, indexedRecord, isDelete, oldRecord); + return writeRecord(hoodieRecord, indexedRecord, isDelete); } protected void writeInsertRecord(HoodieRecord hoodieRecord) throws IOException { @@ -274,16 +272,16 @@ protected void writeInsertRecord(HoodieRecord hoodieRecord) throws IOExceptio if (insertRecord.isPresent() && insertRecord.get().equals(IGNORE_RECORD)) { return; } - if (writeRecord(hoodieRecord, insertRecord, HoodieOperation.isDelete(hoodieRecord.getOperation()), null)) { + if (writeRecord(hoodieRecord, insertRecord, HoodieOperation.isDelete(hoodieRecord.getOperation()))) { insertRecordsWritten++; } } protected boolean writeRecord(HoodieRecord hoodieRecord, Option indexedRecord) { - return writeRecord(hoodieRecord, indexedRecord, false, null); + return writeRecord(hoodieRecord, indexedRecord, false); } - protected boolean writeRecord(HoodieRecord hoodieRecord, Option indexedRecord, boolean isDelete, GenericRecord oldRecord) { + protected boolean writeRecord(HoodieRecord hoodieRecord, Option indexedRecord, boolean isDelete) { Option recordMetadata = hoodieRecord.getData().getMetadata(); if (!partitionPath.equals(hoodieRecord.getPartitionPath())) { HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: " @@ -294,13 +292,11 @@ protected boolean writeRecord(HoodieRecord hoodieRecord, Option close(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 2f4bca81b18dc..ce167f7c2c750 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -259,7 +259,11 @@ private HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig writeConfi .withInlineCompaction(false) .withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax()) // we will trigger archive manually, to ensure only regular writer invokes it - .withAutoArchive(false).build()) + .withAutoArchive(false) + // by default, the HFile does not keep the metadata fields, set up as false + // to always use the metadata of the new record. + .withPreserveCommitMetadata(false) + .build()) .withParallelism(parallelism, parallelism) .withDeleteParallelism(parallelism) .withRollbackParallelism(parallelism) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java index 70f5e9f3bfd1d..a5926196ea396 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java @@ -29,6 +29,7 @@ import org.apache.hudi.common.testutils.RawTripTestPayload; import org.apache.hudi.common.util.BaseFileUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.io.HoodieCreateHandle; @@ -77,6 +78,7 @@ public void tearDown() throws IOException { private WriteStatus prepareFirstRecordCommit(List recordsStrs) throws IOException { // Create a bunch of records with an old version of schema final HoodieWriteConfig config = makeHoodieClientConfig("/exampleSchema.avsc"); + config.setValue(HoodieCompactionConfig.PRESERVE_COMMIT_METADATA, "false"); final HoodieSparkTable table = HoodieSparkTable.create(config, context); final List statuses = jsc.parallelize(Arrays.asList(1)).map(x -> { List insertRecords = new ArrayList<>(); diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index 5cb18dc8d1509..e427422c24d8f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -382,23 +382,13 @@ public static GenericRecord rewriteRecord(GenericRecord oldRecord, Schema newSch return newRecord; } - public static GenericRecord rewriteRecord(GenericRecord genericRecord, Schema newSchema, boolean copyOverMetaFields, GenericRecord fallbackRecord) { + public static GenericRecord rewriteRecordWithMetadata(GenericRecord genericRecord, Schema newSchema, String fileName) { GenericRecord newRecord = new GenericData.Record(newSchema); - boolean isSpecificRecord = genericRecord instanceof SpecificRecordBase; for (Schema.Field f : newSchema.getFields()) { - if (!(isSpecificRecord && isMetadataField(f.name()))) { - copyOldValueOrSetDefault(genericRecord, newRecord, f); - } - if (isMetadataField(f.name()) && copyOverMetaFields) { - // if meta field exists in primary generic record, copy over. - if (genericRecord.getSchema().getField(f.name()) != null) { - copyOldValueOrSetDefault(genericRecord, newRecord, f); - } else if (fallbackRecord != null && fallbackRecord.getSchema().getField(f.name()) != null) { - // if not, try to copy from the fallback record. - copyOldValueOrSetDefault(fallbackRecord, newRecord, f); - } - } + copyOldValueOrSetDefault(genericRecord, newRecord, f); } + // do not preserve FILENAME_METADATA_FIELD + newRecord.put(HoodieRecord.FILENAME_METADATA_FIELD_POS, fileName); if (!GenericData.get().validate(newSchema, newRecord)) { throw new SchemaCompatibilityException( "Unable to validate the rewritten record " + genericRecord + " against schema " + newSchema);