@@ -41,7 +41,6 @@
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.log.AppendResult;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
@@ -465,23 +464,6 @@ public List<WriteStatus> writeStatuses() {
return statuses;
}

private Writer createLogWriter(Option<FileSlice> fileSlice, String baseCommitTime)
throws IOException {
Option<HoodieLogFile> latestLogFile = fileSlice.get().getLatestLogFile();

return HoodieLogFormat.newWriterBuilder()
.onParentPath(FSUtils.getPartitionPath(hoodieTable.getMetaClient().getBasePath(), partitionPath))
.withFileId(fileId)
.overBaseCommit(baseCommitTime)
.withLogVersion(latestLogFile.map(HoodieLogFile::getLogVersion).orElse(HoodieLogFile.LOGFILE_BASE_VERSION))
.withFileSize(latestLogFile.map(HoodieLogFile::getFileSize).orElse(0L))
.withSizeThreshold(config.getLogFileMaxSize())
.withFs(fs)
.withRolloverLogWriteToken(writeToken)
.withLogWriteToken(latestLogFile.map(x -> FSUtils.getWriteTokenFromLogPath(x.getPath())).orElse(writeToken))
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
}

/**
* Whether there is need to update the record location.
*/
@@ -18,6 +18,10 @@

package org.apache.hudi.io;

import org.apache.avro.generic.GenericData;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.SerializableRecord;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
@@ -30,14 +34,23 @@
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.cdc.CDCUtils;
import org.apache.hudi.common.table.cdc.CDCOperationEnum;
import org.apache.hudi.common.table.log.AppendResult;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieCDCDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCorruptedDataException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.io.storage.HoodieFileReader;
@@ -58,12 +71,15 @@

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

@SuppressWarnings("Duplicates")
/**
@@ -102,6 +118,15 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
protected Map<String, HoodieRecord<T>> keyToNewRecords;
protected Set<String> writtenRecordKeys;
protected HoodieFileWriter<IndexedRecord> fileWriter;
// a flag that indicates whether the change data should be written out to a cdc log file.
protected boolean cdcEnabled = false;
Contributor

Create a sub-class of HoodieAppendHandle - HoodieChangeTrackingAppendHandle - and move all the code related to persisting row-level change-tracking metadata to the subclass. I prefer naming all methods/parameters as changeTracking instead of CDC: CDC is a feature, change tracking is the action you perform during the write.

Contributor Author

I think you mean HoodieChangeTrackingMergeHandle?

Contributor Author

Hi @prasannarajaperumal, I tried creating sub-classes HoodieChangeTrackingMergeHandle, HoodieChangeTrackingSortedMergeHandle, and HoodieChangeTrackingConcatHandle, and adding the logic to decide whether a HoodieChangeTrackingXXXHandle should be created at every place where HoodieMergeHandle and the other classes are created. I think the result is maybe less clear.

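A minimal, self-contained sketch of the sub-classing idea discussed in this thread; every class and method name below is hypothetical and chosen only for illustration, not Hudi code or the code in this PR. The base handle exposes a no-op hook that runs after each merged record, and only the change-tracking subclass overrides it, keeping the CDC bookkeeping out of the base merge path.

```java
import java.util.HashMap;
import java.util.Map;

// Toy base handle: writes the merged record and then calls a hook.
class SimpleMergeHandle {
  public void mergeRecord(String key, String oldImage, String newImage) {
    // ... write newImage to the new base file (omitted) ...
    onRecordMerged(key, oldImage, newImage); // no-op unless a subclass tracks changes
  }

  protected void onRecordMerged(String key, String oldImage, String newImage) {
    // the base class does no change tracking
  }
}

// Toy change-tracking subclass: buffers before/after images per record key,
// analogous to what writeUpdateRecord()/writeCDCData() do in this PR.
class ChangeTrackingMergeHandle extends SimpleMergeHandle {
  private final Map<String, String[]> changeBuffer = new HashMap<>();

  @Override
  protected void onRecordMerged(String key, String oldImage, String newImage) {
    changeBuffer.put(key, new String[] {oldImage, newImage});
  }

  public Map<String, String[]> changes() {
    // a real handle would flush these to a cdc log block on close()
    return changeBuffer;
  }
}
```

The trade-off, as the author notes, is that every call site constructing a merge handle would then have to pick the right subclass, whereas the flag-guarded approach in this PR leaves construction unchanged.
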
protected boolean cdcSupplementalLogging = true;
// writer for cdc data
protected HoodieLogFormat.Writer cdcWriter;
// the cdc data, keyed by record key
protected Map<String, SerializableRecord> cdcData;
// number of records written to the new base file; used to generate sequence ids for cdc records
private final AtomicLong writtenRecordCount = new AtomicLong(-1);
private boolean preserveMetadata = false;

protected Path newFilePath;
@@ -203,6 +228,18 @@ private void init(String fileId, String partitionPath, HoodieBaseFile baseFileTo
// Create the writer for writing the new version file
fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config,
writeSchemaWithMetaFields, taskContextSupplier);

// initialize the cdc flags, the cdc log writer, and the spillable map that buffers the cdc data
cdcEnabled = config.getBoolean(HoodieTableConfig.CDC_ENABLED);
cdcSupplementalLogging = config.getBoolean(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_ENABLED);
if (cdcEnabled) {
cdcWriter = createLogWriter(Option.empty(), instantTime);
long memoryForMerge = IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config);
cdcData = new ExternalSpillableMap<>(memoryForMerge, config.getSpillableMapBasePath(),
new DefaultSizeEstimator(), new DefaultSizeEstimator(),
config.getCommonConfig().getSpillableDiskMapType(),
config.getCommonConfig().isBitCaskDiskMapCompressionEnabled());
}
} catch (IOException io) {
LOG.error("Error in update task at commit " + instantTime, io);
writeStatus.setGlobalError(io);
@@ -281,7 +318,17 @@ private boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, GenericRecord ol
return false;
}
}
return writeRecord(hoodieRecord, indexedRecord, isDelete);
boolean result = writeRecord(hoodieRecord, indexedRecord, isDelete);
if (cdcEnabled) {
String recordKey = oldRecord.get(keyField).toString();
if (indexedRecord.isPresent()) {
GenericRecord record = (GenericRecord) indexedRecord.get();
cdcData.put(recordKey, cdcRecord(CDCOperationEnum.UPDATE, recordKey, hoodieRecord.getPartitionPath(), oldRecord, record));
} else {
cdcData.put(recordKey, cdcRecord(CDCOperationEnum.DELETE, recordKey, partitionPath, oldRecord, null));
}
}
return result;
}

protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOException {
Expand All @@ -292,6 +339,10 @@ protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOExceptio
return;
}
if (writeRecord(hoodieRecord, insertRecord, HoodieOperation.isDelete(hoodieRecord.getOperation()))) {
if (cdcEnabled && insertRecord.isPresent()) {
cdcData.put(hoodieRecord.getRecordKey(), cdcRecord(CDCOperationEnum.INSERT,
hoodieRecord.getRecordKey(), partitionPath, null, (GenericRecord) insertRecord.get()));
}
insertRecordsWritten++;
}
}
@@ -385,6 +436,7 @@ protected void writeToFile(HoodieKey key, GenericRecord avroRecord, boolean shou
} else {
fileWriter.writeAvroWithMetadata(key, rewriteRecord(avroRecord));
}
writtenRecordCount.getAndIncrement();
}

protected void writeIncomingRecords() throws IOException {
@@ -399,9 +451,61 @@ protected void writeIncomingRecords() throws IOException {
}
}

protected SerializableRecord cdcRecord(CDCOperationEnum operation, String recordKey, String partitionPath,
GenericRecord oldRecord, GenericRecord newRecord) {
GenericData.Record record;
if (cdcSupplementalLogging) {
record = CDCUtils.cdcRecord(operation.getValue(), instantTime,
oldRecord, addCommitMetadata(newRecord, recordKey, partitionPath));
} else {
record = CDCUtils.cdcRecord(operation.getValue(), recordKey);
}
return new SerializableRecord(record);
}

protected GenericRecord addCommitMetadata(GenericRecord record, String recordKey, String partitionPath) {
if (record != null && config.populateMetaFields()) {
GenericRecord rewriteRecord = rewriteRecord(record);
String seqId = HoodieRecord.generateSequenceId(instantTime, getPartitionId(), writtenRecordCount.get());
HoodieAvroUtils.addCommitMetadataToRecord(rewriteRecord, instantTime, seqId);
HoodieAvroUtils.addHoodieKeyToRecord(rewriteRecord, recordKey, partitionPath, newFilePath.getName());
return rewriteRecord;
}
return record;
}

protected AppendResult writeCDCData() {
if (!cdcEnabled || cdcData.isEmpty() || recordsWritten == 0L || (recordsWritten == insertRecordsWritten)) {
// cases where we do not need to write out a cdc file:
// case 1: all the data from the previous file slice is deleted and no new data is inserted;
// case 2: all the data is newly inserted.
return null;
}
try {
String keyField = config.populateMetaFields()
? HoodieRecord.RECORD_KEY_METADATA_FIELD
: hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, instantTime);
if (cdcSupplementalLogging) {
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, CDCUtils.CDC_SCHEMA_STRING);
} else {
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, CDCUtils.CDC_SCHEMA_ONLY_OP_AND_RECORDKEY_STRING);
}
List<IndexedRecord> records = cdcData.values().stream()
.map(SerializableRecord::getRecord).collect(Collectors.toList());
HoodieLogBlock block = new HoodieCDCDataBlock(records, header, keyField);
return cdcWriter.appendBlocks(Collections.singletonList(block));
} catch (Exception e) {
throw new HoodieException("Failed to write the cdc data to " + cdcWriter.getLogFile().getPath(), e);
}
}

@Override
public List<WriteStatus> close() {
try {
HoodieWriteStat stat = writeStatus.getStat();

writeIncomingRecords();

if (keyToNewRecords instanceof ExternalSpillableMap) {
@@ -416,9 +520,21 @@ public List<WriteStatus> close() {
fileWriter = null;
}

long fileSizeInBytes = FSUtils.getFileSize(fs, newFilePath);
HoodieWriteStat stat = writeStatus.getStat();
AppendResult result = writeCDCData();
if (cdcWriter != null) {
cdcWriter.close();
cdcWriter = null;
cdcData.clear();
}
if (result != null) {
String cdcFileName = result.logFile().getPath().getName();
String cdcPath = StringUtils.isNullOrEmpty(partitionPath) ? cdcFileName : partitionPath + "/" + cdcFileName;
long cdcFileSizeInBytes = FSUtils.getFileSize(fs, result.logFile().getPath());
stat.setCdcPath(cdcPath);
stat.setCdcWriteBytes(cdcFileSizeInBytes);
}

long fileSizeInBytes = FSUtils.getFileSize(fs, newFilePath);
stat.setTotalWriteBytes(fileSizeInBytes);
stat.setFileSizeInBytes(fileSizeInBytes);
stat.setNumWrites(recordsWritten);
@@ -18,11 +18,14 @@

package org.apache.hudi.io;

import org.apache.avro.generic.IndexedRecord;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.cdc.CDCOperationEnum;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpsertException;
@@ -93,13 +96,19 @@ public void write(GenericRecord oldRecord) {
throw new HoodieUpsertException("Insert/Update not in sorted order");
}
try {
Option<IndexedRecord> insertRecord;
if (useWriterSchemaForCompaction) {
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps()));
insertRecord = hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps());
} else {
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchema, config.getProps()));
insertRecord = hoodieRecord.getData().getInsertValue(tableSchema, config.getProps());
}
writeRecord(hoodieRecord, insertRecord);
insertRecordsWritten++;
writtenRecordKeys.add(keyToPreWrite);
if (cdcEnabled) {
cdcData.put(hoodieRecord.getRecordKey(), cdcRecord(CDCOperationEnum.INSERT,
hoodieRecord.getRecordKey(), partitionPath, null, (GenericRecord) insertRecord.get()));
}
} catch (IOException e) {
throw new HoodieUpsertException("Failed to write records", e);
}
@@ -116,12 +125,18 @@ public List<WriteStatus> close() {
String key = newRecordKeysSorted.poll();
HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key);
if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {
Option<IndexedRecord> insertRecord;
if (useWriterSchemaForCompaction) {
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps()));
insertRecord = hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps());
} else {
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchema, config.getProps()));
insertRecord = hoodieRecord.getData().getInsertValue(tableSchema, config.getProps());
}
writeRecord(hoodieRecord, insertRecord);
insertRecordsWritten++;
if (cdcEnabled) {
cdcData.put(hoodieRecord.getRecordKey(), cdcRecord(CDCOperationEnum.INSERT,
hoodieRecord.getRecordKey(), partitionPath, null, (GenericRecord) insertRecord.get()));
}
}
} catch (IOException e) {
throw new HoodieUpsertException("Failed to close UpdateHandle", e);
@@ -22,9 +22,12 @@
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
@@ -94,6 +97,7 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O>
*/
protected final Schema writeSchema;
protected final Schema writeSchemaWithMetaFields;
protected final String keyField;

protected HoodieTimer timer;
protected WriteStatus writeStatus;
@@ -114,6 +118,8 @@ protected HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String
HoodieTable<T, I, K, O> hoodieTable, Option<Schema> overriddenSchema,
TaskContextSupplier taskContextSupplier) {
super(config, Option.of(instantTime), hoodieTable);
this.keyField = config.populateMetaFields() ? HoodieRecord.RECORD_KEY_METADATA_FIELD
: hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
this.partitionPath = partitionPath;
this.fileId = fileId;
this.tableSchema = overriddenSchema.orElseGet(() -> getSpecifiedTableSchema(config));
@@ -273,6 +279,33 @@ protected HoodieFileWriter createNewFileWriter(String instantTime, Path path, Ho
return HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable, config, schema, taskContextSupplier);
}

protected HoodieLogFormat.Writer createLogWriter(
Option<FileSlice> fileSlice, String baseCommitTime) throws IOException {
int logVersion = HoodieLogFile.LOGFILE_BASE_VERSION;
long logFileSize = 0L;
String logWriteToken = writeToken;
if (fileSlice.isPresent()) {
Option<HoodieLogFile> latestLogFileOpt = fileSlice.get().getLatestLogFile();
if (latestLogFileOpt.isPresent()) {
HoodieLogFile latestLogFile = latestLogFileOpt.get();
logVersion = latestLogFile.getLogVersion();
logFileSize = latestLogFile.getFileSize();
logWriteToken = FSUtils.getWriteTokenFromLogPath(latestLogFile.getPath());
}
}
return HoodieLogFormat.newWriterBuilder()
.onParentPath(FSUtils.getPartitionPath(hoodieTable.getMetaClient().getBasePath(), partitionPath))
.withFileId(fileId)
.overBaseCommit(baseCommitTime)
.withLogVersion(logVersion)
.withFileSize(logFileSize)
.withSizeThreshold(config.getLogFileMaxSize())
.withFs(fs)
.withRolloverLogWriteToken(writeToken)
.withLogWriteToken(logWriteToken)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
}

private static class IgnoreRecord implements GenericRecord {

@Override