@@ -178,7 +178,9 @@ public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Opti
HoodieTable table = createTable(config, hadoopConf);

HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
HoodieCommitMetadata metadata = CommitUtils.buildMetadata(stats, partitionToReplaceFileIds, extraMetadata, operationType, config.getSchema(), commitActionType);
HoodieCommitMetadata metadata = CommitUtils.buildMetadata(stats, partitionToReplaceFileIds,
extraMetadata, operationType, table.getTableSchema(config, false).toString(),
commitActionType);
// Finalize write
finalizeWrite(table, instantTime, stats);

@@ -181,7 +181,7 @@ private void init(HoodieRecord record) {
private Option<IndexedRecord> getIndexedRecord(HoodieRecord<T> hoodieRecord) {
Option<Map<String, String>> recordMetadata = hoodieRecord.getData().getMetadata();
try {
Option<IndexedRecord> avroRecord = hoodieRecord.getData().getInsertValue(writerSchema);
Option<IndexedRecord> avroRecord = hoodieRecord.getData().getInsertValue(inputSchema);
if (avroRecord.isPresent()) {
// Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
avroRecord = Option.of(rewriteRecord((GenericRecord) avroRecord.get()));
@@ -327,7 +327,7 @@ public void doAppend() {
private void appendDataAndDeleteBlocks(Map<HeaderMetadataType, String> header) {
try {
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, instantTime);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writerSchemaWithMetafields.toString());
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, inputSchemaWithMetaFields.toString());
List<HoodieLogBlock> blocks = new ArrayList<>(2);
if (recordList.size() > 0) {
blocks.add(HoodieDataBlock.getBlock(hoodieTable.getLogDataBlockFormat(), recordList, header));
@@ -22,7 +22,7 @@
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;

@@ -38,8 +38,7 @@ public class HoodieBootstrapHandle<T extends HoodieRecordPayload, I, K, O> exten
public HoodieBootstrapHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) {
super(config, commitTime, hoodieTable, partitionPath, fileId,
Pair.of(HoodieAvroUtils.RECORD_KEY_SCHEMA,
HoodieAvroUtils.addMetadataFields(HoodieAvroUtils.RECORD_KEY_SCHEMA)), taskContextSupplier);
Contributor:
@bvaradar: can you confirm that this change looks good?

Option.of(HoodieAvroUtils.RECORD_KEY_SCHEMA), taskContextSupplier);
}

@Override
@@ -30,7 +30,6 @@
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.io.storage.HoodieFileWriter;
@@ -59,18 +58,13 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload, I, K, O> extends
private long insertRecordsWritten = 0;
private long recordsDeleted = 0;
private Map<String, HoodieRecord<T>> recordMap;
private boolean useWriterSchema = false;
private boolean isCompactor = false;

public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) {
this(config, instantTime, hoodieTable, partitionPath, fileId, getWriterSchemaIncludingAndExcludingMetadataPair(config),
taskContextSupplier);
}

public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, Pair<Schema, Schema> writerSchemaIncludingAndExcludingMetadataPair,
TaskContextSupplier taskContextSupplier) {
super(config, instantTime, partitionPath, fileId, hoodieTable, writerSchemaIncludingAndExcludingMetadataPair,
protected HoodieCreateHandle(HoodieWriteConfig config, String instantTime,
HoodieTable<T, I, K, O> hoodieTable, String partitionPath,
String fileId, Option<Schema> schemaOption,
TaskContextSupplier taskContextSupplier) {
super(config, instantTime, partitionPath, fileId, hoodieTable, schemaOption,
taskContextSupplier);
writeStatus.setFileId(fileId);
writeStatus.setPartitionPath(partitionPath);
@@ -82,13 +76,21 @@ public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTa
new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath));
partitionMetadata.trySave(getPartitionId());
createMarkerFile(partitionPath, FSUtils.makeDataFileName(this.instantTime, this.writeToken, this.fileId, hoodieTable.getBaseFileExtension()));
this.fileWriter = HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable, config, writerSchemaWithMetafields, this.taskContextSupplier);
this.fileWriter = HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable, config,
tableSchemaWithMetaFields, this.taskContextSupplier);
} catch (IOException e) {
throw new HoodieInsertException("Failed to initialize HoodieStorageWriter for path " + path, e);
}
LOG.info("New CreateHandle for partition :" + partitionPath + " with fileId " + fileId);
}

public HoodieCreateHandle(HoodieWriteConfig config, String instantTime,
HoodieTable<T, I, K, O> hoodieTable, String partitionPath,
String fileId, TaskContextSupplier taskContextSupplier) {
this(config, instantTime, hoodieTable, partitionPath, fileId, Option.empty(),
taskContextSupplier);
}

/**
* Called by the compactor code path.
*/
@@ -97,7 +99,7 @@ public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTa
TaskContextSupplier taskContextSupplier) {
this(config, instantTime, hoodieTable, partitionPath, fileId, taskContextSupplier);
this.recordMap = recordMap;
this.useWriterSchema = true;
this.isCompactor = true;
}

@Override
@@ -141,6 +143,7 @@ public void write(HoodieRecord record, Option<IndexedRecord> avroRecord) {
/**
* Writes all records passed.
*/
@SuppressWarnings("unchecked")
public void write() {
Iterator<String> keyIterator;
if (hoodieTable.requireSortedRecords()) {
@@ -153,10 +156,10 @@ public void write() {
while (keyIterator.hasNext()) {
final String key = keyIterator.next();
HoodieRecord<T> record = recordMap.get(key);
if (useWriterSchema) {
write(record, record.getData().getInsertValue(writerSchemaWithMetafields));
if (isCompactor) {
write(record, record.getData().getInsertValue(inputSchemaWithMetaFields));
Contributor:
Minor, and not exactly related to this patch, but can we rename "useWriterSchema" to something like "isCompactor" so that it is explicit and comprehensible?

Author:
Nice suggestion! isCompactor will be easier to understand.

Member:
I actually disagree completely :) Leaking such a call hierarchy into a lower-level class will lead to more confusion if, say, one more code path starts using this code.
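As a side note for readers of this thread: whatever the flag ends up being called, it only selects which schema the payload is materialized with. A minimal sketch of that branch, assuming the usual reason for the flag (compactor-supplied records were read back from existing files and therefore already carry the Hudi meta fields); the helper class and method names are hypothetical:

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;

// Sketch only; not part of this patch.
final class PayloadSchemaSketch {
  private PayloadSchemaSketch() {}

  // Compactor-supplied records are assumed to already contain the Hudi meta fields,
  // so they are deserialized with the meta-field schema; freshly ingested records
  // only match the plain input schema.
  @SuppressWarnings("unchecked")
  static <T extends HoodieRecordPayload> Option<IndexedRecord> toAvro(
      HoodieRecord<T> record, boolean fromCompactor,
      Schema inputSchema, Schema inputSchemaWithMetaFields) throws IOException {
    Schema payloadSchema = fromCompactor ? inputSchemaWithMetaFields : inputSchema;
    return record.getData().getInsertValue(payloadSchema);
  }
}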

} else {
write(record, record.getData().getInsertValue(writerSchema));
write(record, record.getData().getInsertValue(inputSchema));
}
}
} catch (IOException io) {
@@ -95,18 +95,14 @@ public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTab
init(fileId, this.partitionPath, dataFileToBeMerged);
}

@Override
public Schema getWriterSchemaWithMetafields() {
return writerSchemaWithMetafields;
}

public Schema getWriterSchema() {
return writerSchema;
public Schema getReaderSchema() {
return tableSchema;
Contributor:
Feel we could rename this method to getReaderSchema(), since it is used in only one place:

bootstrapReadSchema = mergeHandle.getWriterSchema();

And we assign it to a variable that stores the read schema anyway.

Author:
That seems reasonable!
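A minimal sketch of the caller side after this rename; the wrapper class is hypothetical, and the only known call site is the line quoted above:

import org.apache.avro.Schema;
import org.apache.hudi.io.HoodieMergeHandle;

// Sketch only; not part of this patch.
final class BootstrapReadSchemaSketch {
  private BootstrapReadSchemaSketch() {}

  // Before: bootstrapReadSchema = mergeHandle.getWriterSchema();
  // After the rename the intent is explicit at the call site.
  static Schema bootstrapReadSchema(HoodieMergeHandle<?, ?, ?, ?> mergeHandle) {
    return mergeHandle.getReaderSchema();
  }
}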

}

/**
* Extract old file path, initialize StorageWriter and WriteStatus.
*/
@SuppressWarnings("unchecked")
private void init(String fileId, String partitionPath, HoodieBaseFile baseFileToMerge) {
LOG.info("partitionPath:" + partitionPath + ", fileId to be merged:" + fileId);
this.baseFileToMerge = baseFileToMerge;
@@ -139,7 +135,8 @@ private void init(String fileId, String partitionPath, HoodieBaseFileTo
createMarkerFile(partitionPath, newFileName);

// Create the writer for writing the new version file
fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config, writerSchemaWithMetafields, taskContextSupplier);
fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config,
tableSchemaWithMetaFields, taskContextSupplier);
} catch (IOException io) {
LOG.error("Error in update task at commit " + instantTime, io);
writeStatus.setGlobalError(io);
@@ -157,7 +154,7 @@ private void init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {
long memoryForMerge = IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config.getProps());
LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge);
this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, config.getSpillableMapBasePath(),
new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(writerSchema));
new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(inputSchema));
} catch (IOException io) {
throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io);
}
@@ -218,6 +215,7 @@ protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord
/**
* Go through an old record. Here if we detect a newer version shows up, we write the new one to the file.
*/
@SuppressWarnings("unchecked")
public void write(GenericRecord oldRecord) {
String key = oldRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
boolean copyOldRecord = true;
@@ -227,8 +225,8 @@ public void write(GenericRecord oldRecord) {
HoodieRecord<T> hoodieRecord = new HoodieRecord<>(keyToNewRecords.get(key));
try {
Option<IndexedRecord> combinedAvroRecord =
hoodieRecord.getData().combineAndGetUpdateValue(oldRecord, useWriterSchema ? writerSchemaWithMetafields : writerSchema,
config.getPayloadConfig().getProps());
hoodieRecord.getData().combineAndGetUpdateValue(oldRecord, useWriterSchema
? inputSchemaWithMetaFields : inputSchema, config.getPayloadConfig().getProps());
if (writeUpdateRecord(hoodieRecord, combinedAvroRecord)) {
/*
* ONLY WHEN 1) we have an update for this key AND 2) We are able to successfully write the combined new
@@ -251,7 +249,7 @@ public void write(GenericRecord oldRecord) {
fileWriter.writeAvro(key, oldRecord);
} catch (IOException | RuntimeException e) {
String errMsg = String.format("Failed to merge old record into new file for key %s from old file %s to new file %s with writerSchema %s",
key, getOldFilePath(), newFilePath, writerSchemaWithMetafields.toString(true));
key, getOldFilePath(), newFilePath, inputSchemaWithMetaFields.toString(true));
LOG.debug("Old record is " + oldRecord);
throw new HoodieUpsertException(errMsg, e);
}
@@ -269,9 +267,9 @@ public List<WriteStatus> close() {
HoodieRecord<T> hoodieRecord = newRecordsItr.next();
if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {
if (useWriterSchema) {
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(writerSchemaWithMetafields));
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(inputSchemaWithMetaFields));
} else {
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(writerSchema));
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(inputSchema));
}
insertRecordsWritten++;
}
@@ -87,9 +87,9 @@ public void write(GenericRecord oldRecord) {
}
try {
if (useWriterSchema) {
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(writerSchemaWithMetafields));
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(inputSchemaWithMetaFields));
} else {
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(writerSchema));
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(inputSchema));
}
insertRecordsWritten++;
writtenRecordKeys.add(keyToPreWrite);
@@ -109,9 +109,9 @@ public List<WriteStatus> close() {
HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key);
if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {
if (useWriterSchema) {
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(writerSchemaWithMetafields));
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(inputSchemaWithMetaFields));
} else {
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(writerSchema));
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(inputSchema));
}
insertRecordsWritten++;
}
@@ -28,7 +28,6 @@
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.HoodieFileWriter;
@@ -55,47 +54,70 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O>

private static final Logger LOG = LogManager.getLogger(HoodieWriteHandle.class);

protected final Schema writerSchema;
protected final Schema writerSchemaWithMetafields;
/**
* The table schema is the schema of the table, which is used to read records from and write records to the table.
*/
protected final Schema tableSchema;
Contributor:
Thanks for this change. It definitely looks more readable now and avoids any confusion.

/**
* The table schema with meta fields.
*/
protected final Schema tableSchemaWithMetaFields;
/**
* The input schema is the schema of the incoming data, which is used to parse the incoming records.
*/
protected final Schema inputSchema;
/**
* The input schema with meta fields.
*/
protected final Schema inputSchemaWithMetaFields;

protected HoodieTimer timer;
protected WriteStatus writeStatus;
protected final String partitionPath;
protected final String fileId;
protected final String writeToken;
protected final TaskContextSupplier taskContextSupplier;

public HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String partitionPath,
String fileId, HoodieTable<T, I, K, O> hoodieTable, TaskContextSupplier taskContextSupplier) {
this(config, instantTime, partitionPath, fileId, hoodieTable,
getWriterSchemaIncludingAndExcludingMetadataPair(config), taskContextSupplier);
}
/**
* @param config the write config
* @param instantTime the instant time
* @param partitionPath the partition path
* @param fileId the file id
* @param hoodieTable the hoodie table
* @param schemaOption the optional schema, specified only by HoodieBootstrapHandle
* @param taskContextSupplier the task context supplier
*/
protected HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String partitionPath,
String fileId, HoodieTable<T, I, K, O> hoodieTable,
Option<Schema> schemaOption,
TaskContextSupplier taskContextSupplier) {

protected HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String partitionPath, String fileId,
HoodieTable<T, I, K, O> hoodieTable, Pair<Schema, Schema> writerSchemaIncludingAndExcludingMetadataPair,
TaskContextSupplier taskContextSupplier) {
super(config, instantTime, hoodieTable);
this.partitionPath = partitionPath;
this.fileId = fileId;
this.writerSchema = writerSchemaIncludingAndExcludingMetadataPair.getKey();
this.writerSchemaWithMetafields = writerSchemaIncludingAndExcludingMetadataPair.getValue();
// If the schemaOption is specified, use it; otherwise load the schema from the table.
// The schemaOption will be specified only for HoodieBootstrapHandle.
this.tableSchema = schemaOption.orElseGet(() -> hoodieTable.getTableSchema(config, false));
Contributor:
Trying to confirm my understanding: only for the bootstrap code path do we need the table schema to be initialized with the incoming schema, because we can't rely on hoodieTable.getTableSchema(). In every other call path we should not be looking at schemaOption to set tableSchema.

Author:
Yes @nsivabalan, the schemaOption is only used for HoodieBootstrapHandle to pass its own schema.
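To make the resolution order discussed here concrete, a small sketch; the class and method names are hypothetical, while orElseGet and addMetadataFields are the calls used in this diff:

import org.apache.avro.Schema;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.util.Option;

// Sketch only; not part of this patch.
final class SchemaResolutionSketch {
  private SchemaResolutionSketch() {}

  // An explicitly supplied schema wins; otherwise fall back to the schema loaded from
  // the table (for tableSchema) or from the write config (for inputSchema). Only
  // HoodieBootstrapHandle is expected to pass a non-empty option.
  static Schema resolveSchema(Option<Schema> schemaOption, Schema fallbackSchema) {
    return schemaOption.orElseGet(() -> fallbackSchema);
  }

  // Both *WithMetaFields variants are derived by appending the Hudi meta fields.
  static Schema withMetaFields(Schema schema) {
    return HoodieAvroUtils.addMetadataFields(schema);
  }
}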

this.tableSchemaWithMetaFields = HoodieAvroUtils.addMetadataFields(tableSchema);
this.inputSchema = schemaOption.orElseGet(() -> getInputSchema(config));
this.inputSchemaWithMetaFields = HoodieAvroUtils.addMetadataFields(inputSchema);
this.timer = new HoodieTimer().startTimer();
this.writeStatus = (WriteStatus) ReflectionUtils.loadClass(config.getWriteStatusClassName(),
!hoodieTable.getIndex().isImplicitWithStorage(), config.getWriteStatusFailureFraction());
!hoodieTable.getIndex().isImplicitWithStorage(), config.getWriteStatusFailureFraction());
this.taskContextSupplier = taskContextSupplier;
this.writeToken = makeWriteToken();
}

/**
* Returns writer schema pairs containing
* (a) Writer Schema from client
* (b) (a) with hoodie metadata fields.
* @param config Write Config
* @return
*/
protected static Pair<Schema, Schema> getWriterSchemaIncludingAndExcludingMetadataPair(HoodieWriteConfig config) {
Schema originalSchema = new Schema.Parser().parse(config.getSchema());
Schema hoodieSchema = HoodieAvroUtils.addMetadataFields(originalSchema);
return Pair.of(originalSchema, hoodieSchema);
public HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String partitionPath,
String fileId, HoodieTable<T, I, K, O> hoodieTable,
TaskContextSupplier taskContextSupplier) {
this(config, instantTime, partitionPath, fileId, hoodieTable,
Option.empty(), taskContextSupplier);
}

private static Schema getInputSchema(HoodieWriteConfig config) {
return new Schema.Parser().parse(config.getSchema());
}

/**
@@ -127,8 +149,12 @@ protected void createMarkerFile(String partitionPath, String dataFileName) {
markerFiles.create(partitionPath, dataFileName, getIOType());
}

public Schema getWriterSchemaWithMetafields() {
return writerSchemaWithMetafields;
public Schema getTableSchemaWithMetaFields() {
return tableSchemaWithMetaFields;
}

public Schema getInputSchemaWithMetaFields() {
return inputSchemaWithMetaFields;
}

/**
@@ -166,7 +192,7 @@ public void write(HoodieRecord record, Option<IndexedRecord> avroRecord, Option<
* Rewrite the GenericRecord with the Schema containing the Hoodie Metadata fields.
*/
protected GenericRecord rewriteRecord(GenericRecord record) {
return HoodieAvroUtils.rewriteRecord(record, writerSchemaWithMetafields);
return HoodieAvroUtils.rewriteRecord(record, inputSchemaWithMetaFields);
}

public abstract List<WriteStatus> close();