Merged · changes from all 32 commits
---

```diff
@@ -139,8 +139,8 @@ public class HoodieWriteConfig extends HoodieConfig {
 
   public static final ConfigProperty<String> MERGER_STRATEGY = ConfigProperty
       .key("hoodie.datasource.write.merger.strategy")
-      .defaultValue(StringUtils.DEFAULT_MERGER_STRATEGY_UUID)
-      .withDocumentation("Id of merger strategy. Hudi will pick RecordMergers in hoodie.datasource.write.merger.impls which has the same merger strategy id");
+      .defaultValue(HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID)
+      .withDocumentation("Id of merger strategy. Hudi will pick HoodieRecordMerger implementations in hoodie.datasource.write.merger.impls which has the same merger strategy id");
 
   public static final ConfigProperty<String> KEYGENERATOR_CLASS_NAME = ConfigProperty
       .key("hoodie.datasource.write.keygenerator.class")
```
```diff
@@ -517,7 +517,6 @@ public class HoodieWriteConfig extends HoodieConfig {
   private HoodieCommonConfig commonConfig;
   private HoodieStorageConfig storageConfig;
   private EngineType engineType;
-  private HoodieRecordMerger recordMerger;
 
   /**
    * @deprecated Use {@link #TBL_NAME} and its methods instead
@@ -894,15 +893,13 @@ protected HoodieWriteConfig() {
     super();
     this.engineType = EngineType.SPARK;
     this.clientSpecifiedViewStorageConfig = null;
-    applyMergerClass();
   }
 
   protected HoodieWriteConfig(EngineType engineType, Properties props) {
     super(props);
     Properties newProps = new Properties();
     newProps.putAll(props);
     this.engineType = engineType;
-    applyMergerClass();
     this.consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().fromProperties(newProps).build();
     this.fileSystemRetryConfig = FileSystemRetryConfig.newBuilder().fromProperties(newProps).build();
     this.clientSpecifiedViewStorageConfig = FileSystemViewStorageConfig.newBuilder().fromProperties(newProps).build();
@@ -914,15 +911,6 @@ protected HoodieWriteConfig(EngineType engineType, Properties props) {
     this.storageConfig = HoodieStorageConfig.newBuilder().fromProperties(props).build();
   }
 
-  private void applyMergerClass() {
-    List<String> mergers = getSplitStringsOrDefault(MERGER_IMPLS).stream()
-        .map(String::trim)
-        .distinct()
-        .collect(Collectors.toList());
-    String mergerStrategy = getString(MERGER_STRATEGY);
-    this.recordMerger = HoodieRecordUtils.generateRecordMerger(getString(BASE_PATH), engineType, mergers, mergerStrategy);
-  }
-
   public static HoodieWriteConfig.Builder newBuilder() {
     return new Builder();
   }
@@ -935,7 +923,12 @@ public String getBasePath() {
   }
 
   public HoodieRecordMerger getRecordMerger() {
-    return recordMerger;
+    List<String> mergers = getSplitStringsOrDefault(MERGER_IMPLS).stream()
+        .map(String::trim)
+        .distinct()
+        .collect(Collectors.toList());
+    String mergerStrategy = getString(MERGER_STRATEGY);
+    return HoodieRecordUtils.createRecordMerger(getString(BASE_PATH), engineType, mergers, mergerStrategy);
   }
 
   public String getSchema() {
```
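The net effect of these hunks is that `HoodieWriteConfig` no longer caches a `HoodieRecordMerger` field at construction time; `getRecordMerger()` now resolves one on demand from the configured implementation list and strategy id. `HoodieRecordUtils.createRecordMerger` itself is not shown in this diff, so the sketch below is only a plausible reading of that selection step, with an assumed `getMergingStrategy()` accessor on the merger interface:

```java
import java.util.List;

import org.apache.hudi.common.model.HoodieRecordMerger;

public class MergerSelectionSketch {
  // Hypothetical stand-in for HoodieRecordUtils.createRecordMerger: instantiate each
  // configured class and return the first whose strategy id matches the requested one.
  static HoodieRecordMerger pickMerger(List<String> mergerClasses, String strategyId) throws Exception {
    for (String className : mergerClasses) {
      HoodieRecordMerger candidate = (HoodieRecordMerger)
          Class.forName(className).getDeclaredConstructor().newInstance();
      if (strategyId.equals(candidate.getMergingStrategy())) { // accessor name assumed
        return candidate;
      }
    }
    throw new IllegalArgumentException("No merger registered for strategy " + strategyId);
  }
}
```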
---

```diff
@@ -28,7 +28,6 @@
 import org.apache.hudi.common.model.BaseFile;
 import org.apache.hudi.common.model.DeleteRecord;
 import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
 import org.apache.hudi.common.model.HoodieDeltaWriteStat;
 import org.apache.hudi.common.model.HoodieLogFile;
@@ -39,6 +38,7 @@
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
 import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.model.MetadataValues;
 import org.apache.hudi.common.table.log.AppendResult;
 import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
@@ -253,21 +253,21 @@ private Option<HoodieRecord> prepareRecord(HoodieRecord<T> hoodieRecord) {
   }
 
   private HoodieRecord populateMetadataFields(HoodieRecord<T> hoodieRecord, Schema schema, Properties prop) throws IOException {
-    Map<String, String> metadataValues = new HashMap<>();
-    String seqId =
-        HoodieRecord.generateSequenceId(instantTime, getPartitionId(), RECORD_COUNTER.getAndIncrement());
+    MetadataValues metadataValues = new MetadataValues();
     if (config.populateMetaFields()) {
-      metadataValues.put(HoodieRecord.HoodieMetadataField.FILENAME_METADATA_FIELD.getFieldName(), fileId);
-      metadataValues.put(HoodieRecord.HoodieMetadataField.PARTITION_PATH_METADATA_FIELD.getFieldName(), partitionPath);
-      metadataValues.put(HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName(), hoodieRecord.getRecordKey());
-      metadataValues.put(HoodieRecord.HoodieMetadataField.COMMIT_TIME_METADATA_FIELD.getFieldName(), instantTime);
-      metadataValues.put(HoodieRecord.HoodieMetadataField.COMMIT_SEQNO_METADATA_FIELD.getFieldName(), seqId);
+      String seqId =
+          HoodieRecord.generateSequenceId(instantTime, getPartitionId(), RECORD_COUNTER.getAndIncrement());
+      metadataValues.setFileName(fileId);
+      metadataValues.setPartitionPath(partitionPath);
+      metadataValues.setRecordKey(hoodieRecord.getRecordKey());
+      metadataValues.setCommitTime(instantTime);
+      metadataValues.setCommitSeqno(seqId);
     }
     if (config.allowOperationMetadataField()) {
-      metadataValues.put(HoodieRecord.HoodieMetadataField.OPERATION_METADATA_FIELD.getFieldName(), hoodieRecord.getOperation().getName());
+      metadataValues.setOperation(hoodieRecord.getOperation().getName());
     }
 
-    return hoodieRecord.updateValues(schema, prop, metadataValues);
+    return hoodieRecord.updateMetadataValues(schema, prop, metadataValues);
   }
 
   private void initNewStatus() {
```

Review thread on the `updateMetadataValues` call:

> **Contributor:** Why do we need to update meta values if we're not populating them?
>
> **Author:** If `config.populateMetaFields` is false, then `metadataValues` stays empty, so `hoodieRecord.updateMetadataValues` will do nothing.
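The author's point can be made concrete with a small sketch. The setter and `updateMetadataValues` signatures are taken from the diff above; treating an all-unset `MetadataValues` as a no-op is the claimed behavior, not verified here:

```java
import java.io.IOException;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.MetadataValues;

final class EmptyMetadataValuesSketch {
  // An empty MetadataValues has no fields set, so the unconditional
  // updateMetadataValues call is expected to leave the record untouched.
  static HoodieRecord noOpUpdate(HoodieRecord record, Schema schema, Properties props) throws IOException {
    MetadataValues empty = new MetadataValues(); // no setters invoked
    return record.updateMetadataValues(schema, props, empty); // expected no-op
  }
}
```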
```diff
@@ -380,7 +380,7 @@ private void processAppendResult(AppendResult result, List<HoodieRecord> recordList) {
 
     List<IndexedRecord> indexedRecords = new LinkedList<>();
     for (HoodieRecord hoodieRecord : recordList) {
-      indexedRecords.add(((HoodieAvroIndexedRecord) hoodieRecord.toIndexedRecord(tableSchema, config.getProps()).get()).getData());
+      indexedRecords.add(hoodieRecord.toIndexedRecord(tableSchema, config.getProps()).get().getData());
     }
 
     Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangesMetadataMap =
@@ -511,7 +511,7 @@ private void writeToBuffer(HoodieRecord<T> record) {
       record.seal();
     }
     // fetch the ordering val first in case the record was deflated.
-    final Comparable<?> orderingVal = record.getOrderingValue(config.getProps());
+    final Comparable<?> orderingVal = record.getOrderingValue(tableSchema, config.getProps());
     Option<HoodieRecord> indexedRecord = prepareRecord(record);
     if (indexedRecord.isPresent()) {
       // Skip the ignored record.
```
---

```diff
@@ -93,11 +93,11 @@ public HoodieConcatHandle(HoodieWriteConfig config, String instantTime, HoodieTable
    */
   @Override
   public void write(HoodieRecord oldRecord) {
-    String key = oldRecord.getRecordKey(keyGeneratorOpt);
-    Schema schema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema;
+    Schema oldSchema = config.populateMetaFields() ? tableSchemaWithMetaFields : tableSchema;
+    String key = oldRecord.getRecordKey(oldSchema, keyGeneratorOpt);
     try {
       // NOTE: We're enforcing preservation of the record metadata to keep existing semantic
-      writeToFile(new HoodieKey(key, partitionPath), oldRecord, schema, config.getPayloadConfig().getProps(), true);
+      writeToFile(new HoodieKey(key, partitionPath), oldRecord, oldSchema, config.getPayloadConfig().getProps(), true);
     } catch (IOException | RuntimeException e) {
       String errMsg = String.format("Failed to write old record into new file for key %s from old file %s to new file %s with writerSchema %s",
           key, getOldFilePath(), newFilePath, writeSchemaWithMetaFields.toString(true));
```

Review thread on the schema-selection change:

> **Contributor:** Why does this condition change?
>
> **Contributor:** @wzx140 would need to clarify this one.
>
> **Author:** Previously the signature was `write(GenericRecord oldRecord)` and this line only needed to fetch a schema; the old condition was wrong. `oldRecord` is always read from the table, so its schema is `config.populateMetaFields() ? tableSchemaWithMetaFields : tableSchema`. A new record from `keyToNewRecords` must be incoming data or a compaction input, so its schema is `useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema`.
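The author's rule can be restated as a tiny helper (identifiers follow the diff; this is a paraphrase for illustration, not code from the PR):

```java
import org.apache.avro.Schema;

final class SchemaChoiceSketch {
  // Old records come from the table, so meta fields are present iff the table
  // populates them; new (incoming/compaction) records follow the writer-side flag.
  static Schema schemaFor(boolean readFromTable, boolean populateMetaFields,
                          boolean useWriterSchemaForCompaction,
                          Schema tableSchemaWithMetaFields, Schema tableSchema) {
    boolean withMeta = readFromTable ? populateMetaFields : useWriterSchemaForCompaction;
    return withMeta ? tableSchemaWithMetaFields : tableSchema;
  }
}
```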
```diff
@@ -109,14 +109,15 @@ public void write(HoodieRecord oldRecord) {
 
   @Override
   protected void writeIncomingRecords() throws IOException {
+    Schema schema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema;
     while (recordItr.hasNext()) {
       HoodieRecord<T> record = recordItr.next();
       if (needsUpdateLocation()) {
         record.unseal();
         record.setNewLocation(new HoodieRecordLocation(instantTime, fileId));
         record.seal();
       }
-      writeInsertRecord(record);
+      writeInsertRecord(record, schema);
     }
   }
 }
```
---

```diff
@@ -27,11 +27,11 @@
 import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
 import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.model.MetadataValues;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieInsertException;
@@ -143,7 +143,9 @@ protected void doWrite(HoodieRecord record, Schema schema, TypedProperties props) {
     } else {
       rewriteRecord = record.rewriteRecord(schema, config.getProps(), writeSchemaWithMetaFields);
     }
-    rewriteRecord = rewriteRecord.updateValues(writeSchemaWithMetaFields, config.getProps(), Collections.singletonMap(HoodieMetadataField.FILENAME_METADATA_FIELD.getFieldName(), path.getName()));
+    MetadataValues metadataValues = new MetadataValues();
+    metadataValues.setFileName(path.getName());
+    rewriteRecord = rewriteRecord.updateMetadataValues(writeSchemaWithMetaFields, config.getProps(), metadataValues);
     if (preserveMetadata) {
       fileWriter.write(record.getRecordKey(), rewriteRecord, writeSchemaWithMetaFields);
     } else {
@@ -185,11 +187,7 @@ public void write() {
     while (keyIterator.hasNext()) {
       final String key = keyIterator.next();
       HoodieRecord<T> record = recordMap.get(key);
-      if (useWriterSchema) {
-        write(record, tableSchemaWithMetaFields, config.getProps());
-      } else {
-        write(record, useWriterSchema ? tableSchemaWithMetaFields : tableSchema, config.getProps());
-      }
+      write(record, useWriterSchema ? tableSchemaWithMetaFields : tableSchema, config.getProps());
     }
   }
```