Merged
HoodieWriteConfig.java
@@ -40,6 +40,7 @@
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
@@ -2160,6 +2161,14 @@ public boolean isMetastoreEnabled() {
return metastoreConfig.enableMetastore();
}

/**
* CDC supplemental logging mode.
*/
public HoodieCDCSupplementalLoggingMode getCDCSupplementalLoggingMode() {
return HoodieCDCSupplementalLoggingMode.parse(
getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE));
}

public static class Builder {

protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig();
HoodieCDCLogger.java
@@ -18,14 +18,6 @@

package org.apache.hudi.io;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroPayload;
@@ -48,6 +40,13 @@
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieUpsertException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
@@ -67,13 +66,9 @@ public class HoodieCDCLogger implements Closeable {

private final Schema dataSchema;

private final boolean populateMetaFields;

// writer for cdc data
private final HoodieLogFormat.Writer cdcWriter;

private final boolean cdcEnabled;

private final HoodieCDCSupplementalLoggingMode cdcSupplementalLoggingMode;

private final Schema cdcSchema;
@@ -83,6 +78,9 @@ public class HoodieCDCLogger implements Closeable {
// the cdc data
private final Map<String, HoodieAvroPayload> cdcData;

// the cdc record transformer
private final CDCTransformer transformer;

public HoodieCDCLogger(
String commitTime,
HoodieWriteConfig config,
@@ -93,15 +91,11 @@ public HoodieCDCLogger(
try {
this.commitTime = commitTime;
this.dataSchema = HoodieAvroUtils.removeMetadataFields(schema);
this.populateMetaFields = config.populateMetaFields();
this.keyField = populateMetaFields ? HoodieRecord.RECORD_KEY_METADATA_FIELD
this.keyField = config.populateMetaFields()
? HoodieRecord.RECORD_KEY_METADATA_FIELD
: tableConfig.getRecordKeyFieldProp();
this.cdcWriter = cdcWriter;

this.cdcEnabled = config.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED);
this.cdcSupplementalLoggingMode = HoodieCDCSupplementalLoggingMode.parse(
config.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE));

this.cdcSupplementalLoggingMode = config.getCDCSupplementalLoggingMode();
this.cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(
cdcSupplementalLoggingMode,
dataSchema
@@ -114,8 +108,8 @@ public HoodieCDCLogger(
new DefaultSizeEstimator<>(),
new DefaultSizeEstimator<>(),
config.getCommonConfig().getSpillableDiskMapType(),
config.getCommonConfig().isBitCaskDiskMapCompressionEnabled()
);
config.getCommonConfig().isBitCaskDiskMapCompressionEnabled());
this.transformer = getTransformer();
} catch (IOException e) {
throw new HoodieUpsertException("Failed to initialize HoodieCDCLogger", e);
}
@@ -124,55 +118,25 @@ public HoodieCDCLogger(
public void put(HoodieRecord hoodieRecord,
GenericRecord oldRecord,
Option<IndexedRecord> newRecord) {
if (cdcEnabled) {
String recordKey = hoodieRecord.getRecordKey();
GenericData.Record cdcRecord;
if (newRecord.isPresent()) {
GenericRecord record = (GenericRecord) newRecord.get();
if (oldRecord == null) {
// inserted cdc record
cdcRecord = createCDCRecord(HoodieCDCOperation.INSERT, recordKey,
null, record);
} else {
// updated cdc record
cdcRecord = createCDCRecord(HoodieCDCOperation.UPDATE, recordKey,
oldRecord, record);
}
String recordKey = hoodieRecord.getRecordKey();
GenericData.Record cdcRecord;
if (newRecord.isPresent()) {
GenericRecord record = (GenericRecord) newRecord.get();
if (oldRecord == null) {
// INSERT cdc record
cdcRecord = this.transformer.transform(HoodieCDCOperation.INSERT, recordKey,
null, record);
} else {
// deleted cdc record
cdcRecord = createCDCRecord(HoodieCDCOperation.DELETE, recordKey,
oldRecord, null);
// UPDATE cdc record
cdcRecord = this.transformer.transform(HoodieCDCOperation.UPDATE, recordKey,
oldRecord, record);
}
cdcData.put(recordKey, new HoodieAvroPayload(Option.of(cdcRecord)));
}
}

private GenericData.Record createCDCRecord(HoodieCDCOperation operation,
String recordKey,
GenericRecord oldRecord,
GenericRecord newRecord) {
GenericData.Record record;
if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER)) {
record = HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), commitTime,
removeCommitMetadata(oldRecord), newRecord);
} else if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE)) {
record = HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), recordKey,
removeCommitMetadata(oldRecord));
} else {
record = HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), recordKey);
}
return record;
}

private GenericRecord removeCommitMetadata(GenericRecord record) {
if (record == null) {
return null;
// DELETE cdc record
cdcRecord = this.transformer.transform(HoodieCDCOperation.DELETE, recordKey,
oldRecord, null);
}
return HoodieAvroUtils.rewriteRecordWithNewSchema(record, dataSchema, new HashMap<>());
}

public boolean isEmpty() {
return !this.cdcEnabled || this.cdcData.isEmpty();
cdcData.put(recordKey, new HoodieAvroPayload(Option.of(cdcRecord)));
}

public Option<AppendResult> writeCDCData() {
@@ -219,6 +183,43 @@ public void close() {
}
}

// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------

private CDCTransformer getTransformer() {
if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER)) {
return (operation, recordKey, oldRecord, newRecord) ->
HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), commitTime, removeCommitMetadata(oldRecord), newRecord);
} else if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE)) {
return (operation, recordKey, oldRecord, newRecord) ->
HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), recordKey, removeCommitMetadata(oldRecord));
} else {
return (operation, recordKey, oldRecord, newRecord) ->
HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), recordKey);
}
}

private GenericRecord removeCommitMetadata(GenericRecord record) {
return record == null ? null : HoodieAvroUtils.rewriteRecordWithNewSchema(record, dataSchema, Collections.emptyMap());
}

public boolean isEmpty() {
return this.cdcData.isEmpty();
}

public static Option<AppendResult> writeCDCDataIfNeeded(HoodieCDCLogger cdcLogger,
long recordsWritten,
long insertRecordsWritten) {
if (cdcLogger == null || recordsWritten == 0L || (recordsWritten == insertRecordsWritten)) {
// The following are the cases where we do not need to write out the cdc file:
// case 1: all the data from the previous file slice is deleted and no new data is inserted;
// case 2: all the incoming data is INSERT.
return Option.empty();
}
return cdcLogger.writeCDCData();
}

public static void setCDCStatIfNeeded(HoodieWriteStat stat,
Option<AppendResult> cdcResult,
String partitionPath,
@@ -236,4 +237,19 @@ public static void setCDCStatIfNeeded(HoodieWriteStat stat,
throw new HoodieUpsertException("Failed to set cdc write stat", e);
}
}

// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------

/**
* A transformer that converts ordinary data records into CDC records.
*/
private interface CDCTransformer {
GenericData.Record transform(HoodieCDCOperation operation,
String recordKey,
GenericRecord oldRecord,
GenericRecord newRecord);

}
}
HoodieMergeHandle.java
@@ -18,8 +18,6 @@

package org.apache.hudi.io;

import org.apache.hadoop.fs.Path;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
@@ -33,9 +31,6 @@
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.table.log.AppendResult;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.Option;
@@ -55,7 +50,7 @@
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;

import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -66,8 +61,8 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;

@SuppressWarnings("Duplicates")
@@ -107,8 +102,6 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
protected Map<String, HoodieRecord<T>> keyToNewRecords;
protected Set<String> writtenRecordKeys;
protected HoodieFileWriter<IndexedRecord> fileWriter;
protected boolean cdcEnabled = false;
protected HoodieCDCLogger cdcLogger;
private boolean preserveMetadata = false;

protected Path newFilePath;
@@ -210,18 +203,6 @@ private void init(String fileId, String partitionPath, HoodieBaseFile baseFileTo
// Create the writer for writing the new version file
fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config,
writeSchemaWithMetaFields, taskContextSupplier);

// init the cdc logger
this.cdcEnabled = config.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED);
Contributor:

@danny0405 I appreciate your viewpoint regarding separation of concerns around CDC logging, but I don't think this change makes sense to me:

  • Instead of having CDC logging centralized in one place, we now have it spread across every class that extends HoodieMergeHandle (HoodieMergeHandleWithChangelog, HoodieSortedMergeHandleWithChangelog). As I called out on the original PR, this is not a scalable approach -- we can't introduce sub-classes for every feature we add, and we have to balance that against other factors, for example how many features we need to support (with N features, subclassing can require up to 2^N subclasses). Not long ago I had to inline the sub-classes for the WriteHandle impls for exactly that reason -- without that, this new feature would essentially double the number of classes.

  • Previously, CDC logic was implemented within a single class; this implementation now introduces 5 new classes to achieve the same thing.

Contributor Author:

Putting everything into one class makes it hard to extend a project with a huge code base. And I don't like your refactoring of SparkRDDRelation and HoodieWriteClient for the same reason. I have fixed so many bugs/regressions on the Flink side after your refactoring that it makes me feel bad and makes it hard to keep going with this project.

Do you think doing per-record logic switching on the non-CDC write path is reasonable? Sorry, I don't think so.

So ignored.
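To make the trade-off concrete, below is a minimal sketch of the subclassing approach being debated, pieced together from the hooks this PR exposes (the now-protected writeUpdateRecord/writeInsertRecord and the static writeCDCDataIfNeeded/setCDCStatIfNeeded helpers on HoodieCDCLogger). It is only an illustration, not the actual HoodieMergeHandleWithChangelog source; the constructor wiring, the visibility of the parent's fields, and the createLogWriter call are assumptions.

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.log.AppendResult;
import org.apache.hudi.common.util.Option;

import java.util.List;

// Illustrative sketch only -- not the class shipped in this PR. It assumes the base handle's
// fields (writeStatus, partitionPath, fs, recordsWritten, insertRecordsWritten) are visible
// to subclasses.
public class HoodieMergeHandleWithChangelog<T extends HoodieRecordPayload, I, K, O>
    extends HoodieMergeHandle<T, I, K, O> {

  private HoodieCDCLogger cdcLogger;

  // Constructor omitted: it would mirror HoodieMergeHandle's constructor and then build the
  // logger roughly the way the removed init() block below used to, e.g.
  //   new HoodieCDCLogger(instantTime, config, hoodieTable.getMetaClient().getTableConfig(),
  //       tableSchema, createLogWriter(Option.empty(), instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX),
  //       IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));

  @Override
  protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord,
                                      GenericRecord oldRecord,
                                      Option<IndexedRecord> indexedRecord) {
    // Write the merged record first; only log a CDC entry if the write succeeded.
    boolean written = super.writeUpdateRecord(hoodieRecord, oldRecord, indexedRecord);
    if (written) {
      cdcLogger.put(hoodieRecord, oldRecord, indexedRecord);
    }
    return written;
  }

  @Override
  protected void writeInsertRecord(HoodieRecord<T> hoodieRecord,
                                   Option<IndexedRecord> insertRecord) {
    super.writeInsertRecord(hoodieRecord, insertRecord);
    // Inserts have no "before" image; for simplicity this sketch logs them unconditionally.
    cdcLogger.put(hoodieRecord, null, insertRecord);
  }

  @Override
  public List<WriteStatus> close() {
    List<WriteStatus> statuses = super.close();
    // Flush the CDC log blocks and attach the CDC stats, using the static helpers this PR
    // moves onto HoodieCDCLogger.
    Option<AppendResult> cdcResult =
        HoodieCDCLogger.writeCDCDataIfNeeded(cdcLogger, recordsWritten, insertRecordsWritten);
    HoodieCDCLogger.setCDCStatIfNeeded(writeStatus.getStat(), cdcResult, partitionPath, fs);
    return statuses;
  }
}

With this shape the base merge handle carries no per-record CDC branching, at the cost of one dedicated subclass per handle variant; with the sorted and changelog dimensions combined, that is already four concrete handle classes (2^2), which is the combinatorial growth the comment above objects to.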

if (cdcEnabled) {
this.cdcLogger = new HoodieCDCLogger(
instantTime,
config,
hoodieTable.getMetaClient().getTableConfig(),
tableSchema,
createLogWriter(Option.empty(), instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX),
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
}
} catch (IOException io) {
LOG.error("Error in update task at commit " + instantTime, io);
writeStatus.setGlobalError(io);
@@ -287,7 +268,7 @@ protected void init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {
+ ((ExternalSpillableMap) keyToNewRecords).getSizeOfFileOnDiskInBytes());
}

private boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, GenericRecord oldRecord, Option<IndexedRecord> indexedRecord) {
protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, GenericRecord oldRecord, Option<IndexedRecord> indexedRecord) {
boolean isDelete = false;
if (indexedRecord.isPresent()) {
updatedRecordsWritten++;
Expand All @@ -300,11 +281,7 @@ private boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, GenericRecord ol
return false;
}
}
boolean result = writeRecord(hoodieRecord, indexedRecord, isDelete);
if (result && cdcEnabled) {
cdcLogger.put(hoodieRecord, oldRecord, indexedRecord);
}
return result;
return writeRecord(hoodieRecord, indexedRecord, isDelete);
}

protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOException {
Expand All @@ -314,10 +291,11 @@ protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOExceptio
if (insertRecord.isPresent() && insertRecord.get().equals(IGNORE_RECORD)) {
return;
}
writeInsertRecord(hoodieRecord, insertRecord);
}

protected void writeInsertRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> insertRecord) {
if (writeRecord(hoodieRecord, insertRecord, HoodieOperation.isDelete(hoodieRecord.getOperation()))) {
if (cdcEnabled) {
cdcLogger.put(hoodieRecord, null, insertRecord);
}
insertRecordsWritten++;
}
}
@@ -425,21 +403,9 @@ protected void writeIncomingRecords() throws IOException {
}
}

private Option<AppendResult> writeCDCDataIfNeeded() {
if (cdcLogger == null || recordsWritten == 0L || (recordsWritten == insertRecordsWritten)) {
// the following cases where we do not need to write out the cdc file:
// case 1: all the data from the previous file slice are deleted. and no new data is inserted;
// case 2: all the data are new-coming,
return Option.empty();
}
return cdcLogger.writeCDCData();
}

@Override
public List<WriteStatus> close() {
try {
HoodieWriteStat stat = writeStatus.getStat();

writeIncomingRecords();

if (keyToNewRecords instanceof ExternalSpillableMap) {
Expand All @@ -454,11 +420,9 @@ public List<WriteStatus> close() {
fileWriter = null;
}

// if there are cdc data written, set the CDC-related information.
Option<AppendResult> cdcResult = writeCDCDataIfNeeded();
HoodieCDCLogger.setCDCStatIfNeeded(stat, cdcResult, partitionPath, fs);

long fileSizeInBytes = FSUtils.getFileSize(fs, newFilePath);
HoodieWriteStat stat = writeStatus.getStat();

stat.setTotalWriteBytes(fileSizeInBytes);
stat.setFileSizeInBytes(fileSizeInBytes);
stat.setNumWrites(recordsWritten);