@@ -59,7 +59,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload, I, K, O> extends
protected long recordsDeleted = 0;
private Map<String, HoodieRecord<T>> recordMap;
private boolean useWriterSchema = false;
private boolean preserveHoodieMetadata = false;
private final boolean preserveMetadata;

public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) {
@@ -69,9 +69,9 @@ public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTa

public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, TaskContextSupplier taskContextSupplier,
boolean preserveHoodieMetadata) {
boolean preserveMetadata) {
this(config, instantTime, hoodieTable, partitionPath, fileId, Option.empty(),
taskContextSupplier, preserveHoodieMetadata);
taskContextSupplier, preserveMetadata);
}

public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
@@ -82,10 +82,10 @@ public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTa

public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, Option<Schema> overriddenSchema,
TaskContextSupplier taskContextSupplier, boolean preserveHoodieMetadata) {
TaskContextSupplier taskContextSupplier, boolean preserveMetadata) {
super(config, instantTime, partitionPath, fileId, hoodieTable, overriddenSchema,
taskContextSupplier);
this.preserveHoodieMetadata = preserveHoodieMetadata;
this.preserveMetadata = preserveMetadata;
writeStatus.setFileId(fileId);
writeStatus.setPartitionPath(partitionPath);
writeStatus.setStat(new HoodieWriteStat());
@@ -111,7 +111,7 @@ public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTa
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, Map<String, HoodieRecord<T>> recordMap,
TaskContextSupplier taskContextSupplier) {
this(config, instantTime, hoodieTable, partitionPath, fileId, taskContextSupplier);
this(config, instantTime, hoodieTable, partitionPath, fileId, taskContextSupplier, config.isPreserveHoodieCommitMetadataForCompaction());
this.recordMap = recordMap;
this.useWriterSchema = true;
}
@@ -137,13 +137,11 @@ public void write(HoodieRecord record, Option<IndexedRecord> avroRecord) {
return;
}
// Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) avroRecord.get());
if (preserveHoodieMetadata) {
// do not preserve FILENAME_METADATA_FIELD
recordWithMetadataInSchema.put(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.FILENAME_METADATA_FIELD), path.getName());
fileWriter.writeAvro(record.getRecordKey(), recordWithMetadataInSchema);
if (preserveMetadata) {
fileWriter.writeAvro(record.getRecordKey(),
rewriteRecordWithMetadata((GenericRecord) avroRecord.get(), path.getName()));
} else {
fileWriter.writeAvroWithMetadata(recordWithMetadataInSchema, record);
fileWriter.writeAvroWithMetadata(rewriteRecord((GenericRecord) avroRecord.get()), record);
}
// update the new location of record, so we know where to find it next
record.unseal();
@@ -61,8 +61,6 @@
import java.util.Map;
import java.util.Set;

import static org.apache.hudi.common.model.HoodieRecord.FILENAME_METADATA_FIELD_POS;

@SuppressWarnings("Duplicates")
/**
* Handle to merge incoming records to those in storage.
@@ -264,7 +262,7 @@ private boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, GenericRecord ol
isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation());
}
}
return writeRecord(hoodieRecord, indexedRecord, isDelete, oldRecord);
return writeRecord(hoodieRecord, indexedRecord, isDelete);
}

protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOException {
@@ -274,16 +272,16 @@ protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOExceptio
if (insertRecord.isPresent() && insertRecord.get().equals(IGNORE_RECORD)) {
return;
}
if (writeRecord(hoodieRecord, insertRecord, HoodieOperation.isDelete(hoodieRecord.getOperation()), null)) {
if (writeRecord(hoodieRecord, insertRecord, HoodieOperation.isDelete(hoodieRecord.getOperation()))) {
insertRecordsWritten++;
}
}

protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord) {
return writeRecord(hoodieRecord, indexedRecord, false, null);
return writeRecord(hoodieRecord, indexedRecord, false);
}

protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord, boolean isDelete, GenericRecord oldRecord) {
protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord, boolean isDelete) {
Option recordMetadata = hoodieRecord.getData().getMetadata();
if (!partitionPath.equals(hoodieRecord.getPartitionPath())) {
HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: "
@@ -294,13 +292,11 @@ protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord
try {
if (indexedRecord.isPresent() && !isDelete) {
// Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) indexedRecord.get(), preserveMetadata, oldRecord);
if (preserveMetadata && useWriterSchema) { // useWriteSchema will be true only incase of compaction.
// do not preserve FILENAME_METADATA_FIELD
recordWithMetadataInSchema.put(FILENAME_METADATA_FIELD_POS, newFilePath.getName());
fileWriter.writeAvro(hoodieRecord.getRecordKey(), recordWithMetadataInSchema);
if (preserveMetadata && useWriterSchema) { // useWriteSchema will be true only in case of compaction.
Contributor commented:
The reason why I had to rewriteRecord is that indexedRecord did not have the meta fields in the update path from the caller. Hence I passed in the oldRecord, from which we can deduce the meta fields.

For example, the return value from combineAndGetUpdateValue(...) does not have the meta fields.

Not sure if we can remove it.

Contributor commented:
Specifically, I am talking about this code snippet in HoodieMergeHandle. combinedAvroRecord below does not contain any meta fields. So, if you remove rewriting with the meta columns, I am not sure how that would pan out:

  public void write(GenericRecord oldRecord) {
    String key = KeyGenUtils.getRecordKeyFromGenericRecord(oldRecord, keyGeneratorOpt);
    boolean copyOldRecord = true;
    if (keyToNewRecords.containsKey(key)) {
      // If we have duplicate records that we are updating, then the hoodie record will be deflated after
      // writing the first record. So make a copy of the record to be merged
      HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key).newInstance();
      try {
        Option<IndexedRecord> combinedAvroRecord =
            hoodieRecord.getData().combineAndGetUpdateValue(oldRecord,
              useWriterSchema ? tableSchemaWithMetaFields : tableSchema,
                config.getPayloadConfig().getProps());
    ...

Contributor (author) commented:
Currently we read the old records from the base file with the metadata fields in the schema.

And for new records in the incoming dataset, the schema also includes the metadata fields.
The combineAndGetUpdateValue method also handles records with the write schema (the schema with metadata fields).

fileWriter.writeAvro(hoodieRecord.getRecordKey(),
rewriteRecordWithMetadata((GenericRecord) indexedRecord.get(), newFilePath.getName()));
} else {
fileWriter.writeAvroWithMetadata(recordWithMetadataInSchema, hoodieRecord);
fileWriter.writeAvroWithMetadata(rewriteRecord((GenericRecord) indexedRecord.get()), hoodieRecord);
}
recordsWritten++;
} else {
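A short illustrative sketch, not code from this PR, of the contract the author relies on in the thread above: when the merge handle passes tableSchemaWithMetaFields during compaction, a typical payload deserializes and rewrites its value against that schema, so the record handed to writeRecord(...) already carries the _hoodie_* columns and no fallback oldRecord is needed. The class below is hypothetical; it assumes the standard OverwriteWithLatestAvroPayload and HoodieAvroUtils APIs.

import java.io.IOException;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.Option;

// Hypothetical payload, for illustration only (not part of this PR).
public class MetaPreservingPayload extends OverwriteWithLatestAvroPayload {

  public MetaPreservingPayload(GenericRecord record, Comparable orderingVal) {
    super(record, orderingVal);
  }

  @Override
  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties props)
      throws IOException {
    // During compaction the caller passes tableSchemaWithMetaFields, and the stored value was
    // written with the meta columns as well, so deserializing against that schema keeps
    // _hoodie_commit_time, _hoodie_record_key, etc. populated.
    Option<IndexedRecord> incoming = getInsertValue(schema);
    if (!incoming.isPresent()) {
      return Option.empty();
    }
    // Rewrite against the caller's schema; fields absent from the value fall back to defaults.
    return Option.of(HoodieAvroUtils.rewriteRecord((GenericRecord) incoming.get(), schema));
  }
}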
@@ -227,8 +227,8 @@ protected GenericRecord rewriteRecord(GenericRecord record) {
return HoodieAvroUtils.rewriteRecord(record, writeSchemaWithMetaFields);
}

protected GenericRecord rewriteRecord(GenericRecord record, boolean copyOverMetaFields, GenericRecord fallbackRecord) {
return HoodieAvroUtils.rewriteRecord(record, writeSchemaWithMetaFields, copyOverMetaFields, fallbackRecord);
protected GenericRecord rewriteRecordWithMetadata(GenericRecord record, String fileName) {
return HoodieAvroUtils.rewriteRecordWithMetadata(record, writeSchemaWithMetaFields, fileName);
}

public abstract List<WriteStatus> close();
@@ -259,7 +259,11 @@ private HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig writeConfi
.withInlineCompaction(false)
.withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax())
// we will trigger archive manually, to ensure only regular writer invokes it
.withAutoArchive(false).build())
.withAutoArchive(false)
// by default, the HFile does not keep the metadata fields, set up as false
// to always use the metadata of the new record.
.withPreserveCommitMetadata(false)
.build())
.withParallelism(parallelism, parallelism)
.withDeleteParallelism(parallelism)
.withRollbackParallelism(parallelism)
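For regular writers (unlike the metadata table writer above, which pins this to false), the behaviour is controlled through the compaction config. A hedged sketch of toggling it, using the withPreserveCommitMetadata builder method added in this PR and the PRESERVE_COMMIT_METADATA constant exercised in the test change below; the table path and the rest of the builder chain are illustrative only.

import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;

public class PreserveCommitMetadataConfigSketch {
  public static void main(String[] args) {
    // Keep the existing _hoodie_* columns when compaction rewrites records into new base files.
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi-table") // illustrative path
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withPreserveCommitMetadata(true) // builder method added in this PR
            .build())
        .build();

    // Equivalent string-key form, matching the test change below.
    writeConfig.setValue(HoodieCompactionConfig.PRESERVE_COMMIT_METADATA, "true");
  }
}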
@@ -29,6 +29,7 @@
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.io.HoodieCreateHandle;
@@ -77,6 +78,7 @@ public void tearDown() throws IOException {
private WriteStatus prepareFirstRecordCommit(List<String> recordsStrs) throws IOException {
// Create a bunch of records with an old version of schema
final HoodieWriteConfig config = makeHoodieClientConfig("/exampleSchema.avsc");
config.setValue(HoodieCompactionConfig.PRESERVE_COMMIT_METADATA, "false");
final HoodieSparkTable table = HoodieSparkTable.create(config, context);
final List<WriteStatus> statuses = jsc.parallelize(Arrays.asList(1)).map(x -> {
List<HoodieRecord> insertRecords = new ArrayList<>();
@@ -382,23 +382,13 @@ public static GenericRecord rewriteRecord(GenericRecord oldRecord, Schema newSch
return newRecord;
}

public static GenericRecord rewriteRecord(GenericRecord genericRecord, Schema newSchema, boolean copyOverMetaFields, GenericRecord fallbackRecord) {
public static GenericRecord rewriteRecordWithMetadata(GenericRecord genericRecord, Schema newSchema, String fileName) {
GenericRecord newRecord = new GenericData.Record(newSchema);
boolean isSpecificRecord = genericRecord instanceof SpecificRecordBase;
for (Schema.Field f : newSchema.getFields()) {
if (!(isSpecificRecord && isMetadataField(f.name()))) {
copyOldValueOrSetDefault(genericRecord, newRecord, f);
}
if (isMetadataField(f.name()) && copyOverMetaFields) {
// if meta field exists in primary generic record, copy over.
if (genericRecord.getSchema().getField(f.name()) != null) {
copyOldValueOrSetDefault(genericRecord, newRecord, f);
} else if (fallbackRecord != null && fallbackRecord.getSchema().getField(f.name()) != null) {
// if not, try to copy from the fallback record.
copyOldValueOrSetDefault(fallbackRecord, newRecord, f);
}
}
copyOldValueOrSetDefault(genericRecord, newRecord, f);
}
// do not preserve FILENAME_METADATA_FIELD
newRecord.put(HoodieRecord.FILENAME_METADATA_FIELD_POS, fileName);
if (!GenericData.get().validate(newSchema, newRecord)) {
throw new SchemaCompatibilityException(
"Unable to validate the rewritten record " + genericRecord + " against schema " + newSchema);
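A hedged, self-contained sketch of the new rewriteRecordWithMetadata helper in action: existing meta columns are carried over unchanged and only the file-name column is re-stamped for the new base file. The toy schema and file names are made up; HoodieAvroUtils.addMetadataFields and the HoodieRecord field constants are assumed to be available as in current Hudi.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieRecord;

public class RewriteRecordWithMetadataSketch {
  public static void main(String[] args) {
    // A toy data schema, then the same schema with Hudi's meta columns prepended.
    Schema dataSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"trip\",\"fields\":["
            + "{\"name\":\"uuid\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"}]}");
    Schema schemaWithMeta = HoodieAvroUtils.addMetadataFields(dataSchema);

    // Pretend this record came out of the existing base file: meta columns already populated.
    GenericRecord oldRecord = new GenericData.Record(schemaWithMeta);
    oldRecord.put(HoodieRecord.COMMIT_TIME_METADATA_FIELD, "20211001120000");
    oldRecord.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, "uuid-1");
    oldRecord.put(HoodieRecord.PARTITION_PATH_METADATA_FIELD, "2021/10/01");
    oldRecord.put(HoodieRecord.FILENAME_METADATA_FIELD, "old-base-file.parquet");
    oldRecord.put("uuid", "uuid-1");
    oldRecord.put("fare", 42.0);

    // Compaction keeps the existing commit metadata and only re-stamps the file name.
    GenericRecord rewritten = HoodieAvroUtils.rewriteRecordWithMetadata(
        oldRecord, schemaWithMeta, "new-base-file.parquet");
    System.out.println(rewritten.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)); // 20211001120000
    System.out.println(rewritten.get(HoodieRecord.FILENAME_METADATA_FIELD));    // new-base-file.parquet
  }
}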