diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 7f805410ea962..032d790d636c5 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -40,6 +40,7 @@ import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.model.WriteConcurrencyMode; import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode; import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.table.marker.MarkerType; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; @@ -2160,6 +2161,14 @@ public boolean isMetastoreEnabled() { return metastoreConfig.enableMetastore(); } + /** + * CDC supplemental logging mode. + */ + public HoodieCDCSupplementalLoggingMode getCDCSupplementalLoggingMode() { + return HoodieCDCSupplementalLoggingMode.parse( + getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE)); + } + public static class Builder { protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java index e4f1e14252afb..f57b195c763eb 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java @@ -18,14 +18,6 @@ package org.apache.hudi.io; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieAvroPayload; @@ -48,6 +40,13 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieUpsertException; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + import java.io.Closeable; import java.io.IOException; import java.util.Collections; @@ -67,13 +66,9 @@ public class HoodieCDCLogger implements Closeable { private final Schema dataSchema; - private final boolean populateMetaFields; - // writer for cdc data private final HoodieLogFormat.Writer cdcWriter; - private final boolean cdcEnabled; - private final HoodieCDCSupplementalLoggingMode cdcSupplementalLoggingMode; private final Schema cdcSchema; @@ -83,6 +78,9 @@ public class HoodieCDCLogger implements Closeable { // the cdc data private final Map cdcData; + // the cdc record transformer + private final CDCTransformer transformer; + public HoodieCDCLogger( String commitTime, HoodieWriteConfig config, @@ -93,15 +91,11 @@ public HoodieCDCLogger( try { this.commitTime = commitTime; this.dataSchema = HoodieAvroUtils.removeMetadataFields(schema); - this.populateMetaFields = config.populateMetaFields(); - this.keyField = populateMetaFields ? 
HoodieRecord.RECORD_KEY_METADATA_FIELD + this.keyField = config.populateMetaFields() + ? HoodieRecord.RECORD_KEY_METADATA_FIELD : tableConfig.getRecordKeyFieldProp(); this.cdcWriter = cdcWriter; - - this.cdcEnabled = config.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED); - this.cdcSupplementalLoggingMode = HoodieCDCSupplementalLoggingMode.parse( - config.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE)); - + this.cdcSupplementalLoggingMode = config.getCDCSupplementalLoggingMode(); this.cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode( cdcSupplementalLoggingMode, dataSchema @@ -114,8 +108,8 @@ public HoodieCDCLogger( new DefaultSizeEstimator<>(), new DefaultSizeEstimator<>(), config.getCommonConfig().getSpillableDiskMapType(), - config.getCommonConfig().isBitCaskDiskMapCompressionEnabled() - ); + config.getCommonConfig().isBitCaskDiskMapCompressionEnabled()); + this.transformer = getTransformer(); } catch (IOException e) { throw new HoodieUpsertException("Failed to initialize HoodieCDCLogger", e); } @@ -124,55 +118,25 @@ public HoodieCDCLogger( public void put(HoodieRecord hoodieRecord, GenericRecord oldRecord, Option newRecord) { - if (cdcEnabled) { - String recordKey = hoodieRecord.getRecordKey(); - GenericData.Record cdcRecord; - if (newRecord.isPresent()) { - GenericRecord record = (GenericRecord) newRecord.get(); - if (oldRecord == null) { - // inserted cdc record - cdcRecord = createCDCRecord(HoodieCDCOperation.INSERT, recordKey, - null, record); - } else { - // updated cdc record - cdcRecord = createCDCRecord(HoodieCDCOperation.UPDATE, recordKey, - oldRecord, record); - } + String recordKey = hoodieRecord.getRecordKey(); + GenericData.Record cdcRecord; + if (newRecord.isPresent()) { + GenericRecord record = (GenericRecord) newRecord.get(); + if (oldRecord == null) { + // INSERT cdc record + cdcRecord = this.transformer.transform(HoodieCDCOperation.INSERT, recordKey, + null, record); } else { - // deleted cdc record - cdcRecord = createCDCRecord(HoodieCDCOperation.DELETE, recordKey, - oldRecord, null); + // UPDATE cdc record + cdcRecord = this.transformer.transform(HoodieCDCOperation.UPDATE, recordKey, + oldRecord, record); } - cdcData.put(recordKey, new HoodieAvroPayload(Option.of(cdcRecord))); - } - } - - private GenericData.Record createCDCRecord(HoodieCDCOperation operation, - String recordKey, - GenericRecord oldRecord, - GenericRecord newRecord) { - GenericData.Record record; - if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER)) { - record = HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), commitTime, - removeCommitMetadata(oldRecord), newRecord); - } else if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE)) { - record = HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), recordKey, - removeCommitMetadata(oldRecord)); } else { - record = HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), recordKey); - } - return record; - } - - private GenericRecord removeCommitMetadata(GenericRecord record) { - if (record == null) { - return null; + // DELETE cdc record + cdcRecord = this.transformer.transform(HoodieCDCOperation.DELETE, recordKey, + oldRecord, null); } - return HoodieAvroUtils.rewriteRecordWithNewSchema(record, dataSchema, new HashMap<>()); - } - - public boolean isEmpty() { - return !this.cdcEnabled || this.cdcData.isEmpty(); + cdcData.put(recordKey, new HoodieAvroPayload(Option.of(cdcRecord))); } public Option writeCDCData() { @@ -219,6 +183,43 @@ 
public void close() {
     }
   }
 
+  // -------------------------------------------------------------------------
+  //  Utilities
+  // -------------------------------------------------------------------------
+
+  private CDCTransformer getTransformer() {
+    if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER)) {
+      return (operation, recordKey, oldRecord, newRecord) ->
+          HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), commitTime, removeCommitMetadata(oldRecord), newRecord);
+    } else if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE)) {
+      return (operation, recordKey, oldRecord, newRecord) ->
+          HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), recordKey, removeCommitMetadata(oldRecord));
+    } else {
+      return (operation, recordKey, oldRecord, newRecord) ->
+          HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), recordKey);
+    }
+  }
+
+  private GenericRecord removeCommitMetadata(GenericRecord record) {
+    return record == null ? null : HoodieAvroUtils.rewriteRecordWithNewSchema(record, dataSchema, Collections.emptyMap());
+  }
+
+  public boolean isEmpty() {
+    return this.cdcData.isEmpty();
+  }
+
+  public static Option<AppendResult> writeCDCDataIfNeeded(HoodieCDCLogger cdcLogger,
+                                                          long recordsWritten,
+                                                          long insertRecordsWritten) {
+    if (cdcLogger == null || recordsWritten == 0L || (recordsWritten == insertRecordsWritten)) {
+      // the following are the cases where we do not need to write out the cdc file:
+      // case 1: all the data from the previous file slice are deleted, and no new data is inserted;
+      // case 2: all the incoming data is INSERT.
+      return Option.empty();
+    }
+    return cdcLogger.writeCDCData();
+  }
+
   public static void setCDCStatIfNeeded(HoodieWriteStat stat,
                                         Option<AppendResult> cdcResult,
                                         String partitionPath,
@@ -236,4 +237,19 @@ public static void setCDCStatIfNeeded(HoodieWriteStat stat,
       throw new HoodieUpsertException("Failed to set cdc write stat", e);
     }
   }
+
+  // -------------------------------------------------------------------------
+  //  Inner Class
+  // -------------------------------------------------------------------------
+
+  /**
+   * A transformer that transforms normal data records into cdc records.
+ */ + private interface CDCTransformer { + GenericData.Record transform(HoodieCDCOperation operation, + String recordKey, + GenericRecord oldRecord, + GenericRecord newRecord); + + } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index 442256ade348d..5515c2552e1ea 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -18,8 +18,6 @@ package org.apache.hudi.io; -import org.apache.hadoop.fs.Path; - import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.fs.FSUtils; @@ -33,9 +31,6 @@ import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats; import org.apache.hudi.common.model.IOType; -import org.apache.hudi.common.table.HoodieTableConfig; -import org.apache.hudi.common.table.cdc.HoodieCDCUtils; -import org.apache.hudi.common.table.log.AppendResult; import org.apache.hudi.common.util.DefaultSizeEstimator; import org.apache.hudi.common.util.HoodieRecordSizeEstimator; import org.apache.hudi.common.util.Option; @@ -55,7 +50,7 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; - +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -66,8 +61,8 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; -import java.util.NoSuchElementException; import java.util.Map; +import java.util.NoSuchElementException; import java.util.Set; @SuppressWarnings("Duplicates") @@ -107,8 +102,6 @@ public class HoodieMergeHandle extends H protected Map> keyToNewRecords; protected Set writtenRecordKeys; protected HoodieFileWriter fileWriter; - protected boolean cdcEnabled = false; - protected HoodieCDCLogger cdcLogger; private boolean preserveMetadata = false; protected Path newFilePath; @@ -210,18 +203,6 @@ private void init(String fileId, String partitionPath, HoodieBaseFile baseFileTo // Create the writer for writing the new version file fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config, writeSchemaWithMetaFields, taskContextSupplier); - - // init the cdc logger - this.cdcEnabled = config.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED); - if (cdcEnabled) { - this.cdcLogger = new HoodieCDCLogger( - instantTime, - config, - hoodieTable.getMetaClient().getTableConfig(), - tableSchema, - createLogWriter(Option.empty(), instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX), - IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config)); - } } catch (IOException io) { LOG.error("Error in update task at commit " + instantTime, io); writeStatus.setGlobalError(io); @@ -287,7 +268,7 @@ protected void init(String fileId, Iterator> newRecordsItr) { + ((ExternalSpillableMap) keyToNewRecords).getSizeOfFileOnDiskInBytes()); } - private boolean writeUpdateRecord(HoodieRecord hoodieRecord, GenericRecord oldRecord, Option indexedRecord) { + protected boolean writeUpdateRecord(HoodieRecord hoodieRecord, GenericRecord oldRecord, Option indexedRecord) { boolean isDelete = false; if (indexedRecord.isPresent()) { updatedRecordsWritten++; @@ -300,11 +281,7 @@ private boolean writeUpdateRecord(HoodieRecord hoodieRecord, GenericRecord ol return false; 
} } - boolean result = writeRecord(hoodieRecord, indexedRecord, isDelete); - if (result && cdcEnabled) { - cdcLogger.put(hoodieRecord, oldRecord, indexedRecord); - } - return result; + return writeRecord(hoodieRecord, indexedRecord, isDelete); } protected void writeInsertRecord(HoodieRecord hoodieRecord) throws IOException { @@ -314,10 +291,11 @@ protected void writeInsertRecord(HoodieRecord hoodieRecord) throws IOExceptio if (insertRecord.isPresent() && insertRecord.get().equals(IGNORE_RECORD)) { return; } + writeInsertRecord(hoodieRecord, insertRecord); + } + + protected void writeInsertRecord(HoodieRecord hoodieRecord, Option insertRecord) { if (writeRecord(hoodieRecord, insertRecord, HoodieOperation.isDelete(hoodieRecord.getOperation()))) { - if (cdcEnabled) { - cdcLogger.put(hoodieRecord, null, insertRecord); - } insertRecordsWritten++; } } @@ -425,21 +403,9 @@ protected void writeIncomingRecords() throws IOException { } } - private Option writeCDCDataIfNeeded() { - if (cdcLogger == null || recordsWritten == 0L || (recordsWritten == insertRecordsWritten)) { - // the following cases where we do not need to write out the cdc file: - // case 1: all the data from the previous file slice are deleted. and no new data is inserted; - // case 2: all the data are new-coming, - return Option.empty(); - } - return cdcLogger.writeCDCData(); - } - @Override public List close() { try { - HoodieWriteStat stat = writeStatus.getStat(); - writeIncomingRecords(); if (keyToNewRecords instanceof ExternalSpillableMap) { @@ -454,11 +420,9 @@ public List close() { fileWriter = null; } - // if there are cdc data written, set the CDC-related information. - Option cdcResult = writeCDCDataIfNeeded(); - HoodieCDCLogger.setCDCStatIfNeeded(stat, cdcResult, partitionPath, fs); - long fileSizeInBytes = FSUtils.getFileSize(fs, newFilePath); + HoodieWriteStat stat = writeStatus.getStat(); + stat.setTotalWriteBytes(fileSizeInBytes); stat.setFileSizeInBytes(fileSizeInBytes); stat.setNumWrites(recordsWritten); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java new file mode 100644 index 0000000000000..436eff5dac54d --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.io; + +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.keygen.BaseKeyGenerator; +import org.apache.hudi.table.HoodieTable; + +import java.util.Iterator; +import java.util.Map; + +/** + * Factory class for hoodie merge handle. + */ +public class HoodieMergeHandleFactory { + /** + * Creates a merge handle for normal write path. + */ + public static HoodieMergeHandle create( + WriteOperationType operationType, + HoodieWriteConfig writeConfig, + String instantTime, + HoodieTable table, + Iterator> recordItr, + String partitionPath, + String fileId, + TaskContextSupplier taskContextSupplier, + Option keyGeneratorOpt) { + if (table.requireSortedRecords()) { + if (table.getMetaClient().getTableConfig().isCDCEnabled()) { + return new HoodieSortedMergeHandleWithChangeLog<>(writeConfig, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier, + keyGeneratorOpt); + } else { + return new HoodieSortedMergeHandle<>(writeConfig, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier, + keyGeneratorOpt); + } + } else if (!WriteOperationType.isChangingRecords(operationType) && writeConfig.allowDuplicateInserts()) { + return new HoodieConcatHandle<>(writeConfig, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier, keyGeneratorOpt); + } else { + if (table.getMetaClient().getTableConfig().isCDCEnabled()) { + return new HoodieMergeHandleWithChangeLog<>(writeConfig, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier, keyGeneratorOpt); + } else { + return new HoodieMergeHandle<>(writeConfig, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier, keyGeneratorOpt); + } + } + } + + /** + * Creates a merge handle for compaction path. 
+ */ + public static HoodieMergeHandle create( + HoodieWriteConfig writeConfig, + String instantTime, + HoodieTable table, + Map> keyToNewRecords, + String partitionPath, + String fileId, + HoodieBaseFile dataFileToBeMerged, + TaskContextSupplier taskContextSupplier, + Option keyGeneratorOpt) { + if (table.requireSortedRecords()) { + if (table.getMetaClient().getTableConfig().isCDCEnabled()) { + return new HoodieSortedMergeHandleWithChangeLog<>(writeConfig, instantTime, table, keyToNewRecords, partitionPath, fileId, + dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt); + } else { + return new HoodieSortedMergeHandle<>(writeConfig, instantTime, table, keyToNewRecords, partitionPath, fileId, + dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt); + } + } else { + if (table.getMetaClient().getTableConfig().isCDCEnabled()) { + return new HoodieMergeHandleWithChangeLog<>(writeConfig, instantTime, table, keyToNewRecords, partitionPath, fileId, + dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt); + } else { + return new HoodieMergeHandle<>(writeConfig, instantTime, table, keyToNewRecords, partitionPath, fileId, + dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt); + } + } + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java new file mode 100644 index 0000000000000..12e48ffbb4568 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.cdc.HoodieCDCUtils; +import org.apache.hudi.common.table.log.AppendResult; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.keygen.BaseKeyGenerator; +import org.apache.hudi.table.HoodieTable; + +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * A merge handle that supports logging change logs. 
+ */ +public class HoodieMergeHandleWithChangeLog extends HoodieMergeHandle { + protected final HoodieCDCLogger cdcLogger; + + public HoodieMergeHandleWithChangeLog(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, + Iterator> recordItr, String partitionPath, String fileId, + TaskContextSupplier taskContextSupplier, Option keyGeneratorOpt) { + super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier, keyGeneratorOpt); + this.cdcLogger = new HoodieCDCLogger( + instantTime, + config, + hoodieTable.getMetaClient().getTableConfig(), + tableSchema, + createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX), + IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config)); + } + + /** + * Called by compactor code path. + */ + public HoodieMergeHandleWithChangeLog(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, + Map> keyToNewRecords, String partitionPath, String fileId, + HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier, Option keyGeneratorOpt) { + super(config, instantTime, hoodieTable, keyToNewRecords, partitionPath, fileId, dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt); + this.cdcLogger = new HoodieCDCLogger( + instantTime, + config, + hoodieTable.getMetaClient().getTableConfig(), + tableSchema, + createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX), + IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config)); + } + + protected boolean writeUpdateRecord(HoodieRecord hoodieRecord, GenericRecord oldRecord, Option indexedRecord) { + final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord, indexedRecord); + if (result) { + cdcLogger.put(hoodieRecord, oldRecord, indexedRecord); + } + return result; + } + + protected void writeInsertRecord(HoodieRecord hoodieRecord, Option insertRecord) { + super.writeInsertRecord(hoodieRecord, insertRecord); + cdcLogger.put(hoodieRecord, null, insertRecord); + } + + @Override + public List close() { + List writeStatuses = super.close(); + // if there are cdc data written, set the CDC-related information. 
+ Option cdcResult = + HoodieCDCLogger.writeCDCDataIfNeeded(cdcLogger, recordsWritten, insertRecordsWritten); + HoodieCDCLogger.setCDCStatIfNeeded(writeStatuses.get(0).getStat(), cdcResult, partitionPath, fs); + return writeStatuses; + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java index 8240de66d59d4..7dce31a4c349b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java @@ -18,9 +18,6 @@ package org.apache.hudi.io; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; - import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.model.HoodieBaseFile; @@ -33,6 +30,8 @@ import org.apache.hudi.keygen.KeyGenUtils; import org.apache.hudi.table.HoodieTable; +import org.apache.avro.generic.GenericRecord; + import javax.annotation.concurrent.NotThreadSafe; import java.io.IOException; @@ -94,18 +93,13 @@ public void write(GenericRecord oldRecord) { throw new HoodieUpsertException("Insert/Update not in sorted order"); } try { - Option insertRecord; if (useWriterSchemaForCompaction) { - insertRecord = hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps()); + writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps())); } else { - insertRecord = hoodieRecord.getData().getInsertValue(tableSchema, config.getProps()); + writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchema, config.getProps())); } - writeRecord(hoodieRecord, insertRecord); insertRecordsWritten++; writtenRecordKeys.add(keyToPreWrite); - if (cdcEnabled) { - cdcLogger.put(hoodieRecord, null, insertRecord); - } } catch (IOException e) { throw new HoodieUpsertException("Failed to write records", e); } @@ -122,17 +116,12 @@ public List close() { String key = newRecordKeysSorted.poll(); HoodieRecord hoodieRecord = keyToNewRecords.get(key); if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) { - Option insertRecord; if (useWriterSchemaForCompaction) { - insertRecord = hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps()); + writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps())); } else { - insertRecord = hoodieRecord.getData().getInsertValue(tableSchema, config.getProps()); + writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchema, config.getProps())); } - writeRecord(hoodieRecord, insertRecord); insertRecordsWritten++; - if (cdcEnabled) { - cdcLogger.put(hoodieRecord, null, insertRecord); - } } } catch (IOException e) { throw new HoodieUpsertException("Failed to close UpdateHandle", e); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandleWithChangeLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandleWithChangeLog.java new file mode 100644 index 0000000000000..8d317b709a4f2 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandleWithChangeLog.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io; + +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.keygen.BaseKeyGenerator; +import org.apache.hudi.table.HoodieTable; + +import org.apache.avro.generic.IndexedRecord; + +import java.util.Iterator; +import java.util.Map; + +/** + * A sorted merge handle that supports logging change logs. + */ +public class HoodieSortedMergeHandleWithChangeLog extends HoodieMergeHandleWithChangeLog { + public HoodieSortedMergeHandleWithChangeLog(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, + Iterator> recordItr, String partitionPath, String fileId, + TaskContextSupplier taskContextSupplier, Option keyGeneratorOpt) { + super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier, keyGeneratorOpt); + } + + /** + * Called by compactor code path. 
+ */
+  public HoodieSortedMergeHandleWithChangeLog(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable,
+                                              Map<String, HoodieRecord<T>> keyToNewRecords, String partitionPath, String fileId,
+                                              HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
+    super(config, instantTime, hoodieTable, keyToNewRecords, partitionPath, fileId, dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
+  }
+
+  protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> insertRecord) {
+    final boolean result = super.writeRecord(hoodieRecord, insertRecord);
+    this.cdcLogger.put(hoodieRecord, null, insertRecord);
+    return result;
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index 3864c31ce9aa1..abf5c0face155 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -32,6 +32,7 @@
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.storage.HoodieFileWriter;
 import org.apache.hudi.io.storage.HoodieFileWriterFactory;
@@ -309,6 +310,16 @@ protected HoodieLogFormat.Writer createLogWriter(
         .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
   }
 
+  protected HoodieLogFormat.Writer createLogWriter(String baseCommitTime, String fileSuffix) {
+    try {
+      return createLogWriter(Option.empty(), baseCommitTime, fileSuffix);
+    } catch (IOException e) {
+      throw new HoodieException("Failed to create the log writer with fileId: " + fileId + ", "
+          + "base commit time: " + baseCommitTime + ", "
+          + "file suffix: " + fileSuffix, e);
+    }
+  }
+
   private static class IgnoreRecord implements GenericRecord {
 
     @Override
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 5ed53f7ae0472..e5d90b5e9d37d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -369,22 +369,12 @@ private Map>> getFilesToCleanKeepingLa
           deletePaths.add(new CleanFileInfo(hoodieDataFile.getBootstrapBaseFile().get().getPath(), true));
         }
       });
-      if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
-        // If merge on read, then clean the log files for the commits as well
-        Predicate<HoodieLogFile> notCDCLogFile =
-            hoodieLogFile -> !hoodieLogFile.getFileName().endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX);
-        deletePaths.addAll(
-            aSlice.getLogFiles().filter(notCDCLogFile).map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
-                .collect(Collectors.toList()));
-      }
-      if (hoodieTable.getMetaClient().getTableConfig().isCDCEnabled()) {
-        // The cdc log files will be written out in cdc scenario, no matter the table type is mor or cow.
-        // Here we need to clean uo these cdc log files.
-        Predicate<HoodieLogFile> isCDCLogFile =
-            hoodieLogFile -> hoodieLogFile.getFileName().endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX);
-        deletePaths.addAll(
-            aSlice.getLogFiles().filter(isCDCLogFile).map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
-                .collect(Collectors.toList()));
+      if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ
+          || hoodieTable.getMetaClient().getTableConfig().isCDCEnabled()) {
+        // 1. If merge on read, then clean the log files for the commits as well;
+        // 2. If change log capture is enabled, clean the log files no matter whether the table type is MOR or COW.
+        deletePaths.addAll(aSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
+            .collect(Collectors.toList()));
       }
     }
   }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 2c8a3c4e49c3a..7d2be6cb9323e 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -46,7 +46,7 @@
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.io.HoodieCreateHandle;
 import org.apache.hudi.io.HoodieMergeHandle;
-import org.apache.hudi.io.HoodieSortedMergeHandle;
+import org.apache.hudi.io.HoodieMergeHandleFactory;
 import org.apache.hudi.io.HoodieWriteHandle;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
@@ -401,13 +401,8 @@ protected HoodieMergeHandle getUpdateHandle(String instantTime, String partition
           + "columns are disabled. Please choose the right key generator if you wish to disable meta fields.", e);
       }
     }
-    if (requireSortedRecords()) {
-      return new HoodieSortedMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
-          dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
-    } else {
-      return new HoodieMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
-          dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
-    }
+    return HoodieMergeHandleFactory.create(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
+        dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
   }
 
   @Override
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
index 88921334980ed..8e72682725c3b 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
@@ -45,7 +45,7 @@
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.io.HoodieCreateHandle;
 import org.apache.hudi.io.HoodieMergeHandle;
-import org.apache.hudi.io.HoodieSortedMergeHandle;
+import org.apache.hudi.io.HoodieMergeHandleFactory;
 import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
@@ -299,13 +299,8 @@ protected Iterator> handleUpdateInternal(HoodieMergeHandle> keyToNewRecords,
       HoodieBaseFile dataFileToBeMerged) {
-    if (requireSortedRecords()) {
-      return new HoodieSortedMergeHandle<>(config,
instantTime, this, keyToNewRecords, partitionPath, fileId, - dataFileToBeMerged, taskContextSupplier, Option.empty()); - } else { - return new HoodieMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId, - dataFileToBeMerged, taskContextSupplier, Option.empty()); - } + return HoodieMergeHandleFactory.create(config, instantTime, this, keyToNewRecords, partitionPath, fileId, + dataFileToBeMerged, taskContextSupplier, Option.empty()); } @Override diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java index 22c90eb8bb445..7762fd5ea3f44 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java @@ -21,7 +21,6 @@ import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.data.HoodieListData; import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; @@ -41,8 +40,7 @@ import org.apache.hudi.execution.JavaLazyInsertIterable; import org.apache.hudi.io.CreateHandleFactory; import org.apache.hudi.io.HoodieMergeHandle; -import org.apache.hudi.io.HoodieSortedMergeHandle; -import org.apache.hudi.io.HoodieConcatHandle; +import org.apache.hudi.io.HoodieMergeHandleFactory; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.WorkloadProfile; import org.apache.hudi.table.WorkloadStat; @@ -291,20 +289,8 @@ protected Iterator> handleUpdateInternal(HoodieMergeHandle> recordItr) { - if (table.requireSortedRecords()) { - return new HoodieSortedMergeHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier, Option.empty()); - } else if (!WriteOperationType.isChangingRecords(operationType) && config.allowDuplicateInserts()) { - return new HoodieConcatHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier, Option.empty()); - } else { - return new HoodieMergeHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier, Option.empty()); - } - } - - protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId, - Map> keyToNewRecords, - HoodieBaseFile dataFileToBeMerged) { - return new HoodieMergeHandle<>(config, instantTime, table, keyToNewRecords, - partitionPath, fileId, dataFileToBeMerged, taskContextSupplier, Option.empty()); + return HoodieMergeHandleFactory.create(operationType, config, instantTime, table, recordItr, partitionPath, fileId, + taskContextSupplier, Option.empty()); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java index a88ca65c35a94..115aea06f2a2f 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java @@ -48,7 +48,7 @@ import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.io.HoodieCreateHandle; import 
org.apache.hudi.io.HoodieMergeHandle; -import org.apache.hudi.io.HoodieSortedMergeHandle; +import org.apache.hudi.io.HoodieMergeHandleFactory; import org.apache.hudi.keygen.BaseKeyGenerator; import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory; import org.apache.hudi.metadata.MetadataPartitionType; @@ -250,13 +250,8 @@ protected HoodieMergeHandle getUpdateHandle(String instantTime, String partition + "columns are disabled. Please choose the right key generator if you wish to disable meta fields.", e); } } - if (requireSortedRecords()) { - return new HoodieSortedMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId, - dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt); - } else { - return new HoodieMergeHandle(config, instantTime, this, keyToNewRecords, partitionPath, fileId, - dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt); - } + return HoodieMergeHandleFactory.create(config, instantTime, this, keyToNewRecords, partitionPath, fileId, + dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java index af72c14efc99c..8c7d9e41ea5f5 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java @@ -45,12 +45,10 @@ import org.apache.hudi.execution.SparkLazyInsertIterable; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.io.CreateHandleFactory; -import org.apache.hudi.io.HoodieConcatHandle; import org.apache.hudi.io.HoodieMergeHandle; -import org.apache.hudi.io.HoodieSortedMergeHandle; +import org.apache.hudi.io.HoodieMergeHandleFactory; import org.apache.hudi.keygen.BaseKeyGenerator; import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory; -import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.WorkloadProfile; import org.apache.hudi.table.WorkloadStat; @@ -385,14 +383,8 @@ protected Iterator> handleUpdateInternal(HoodieMergeHandle> recordItr) { - if (table.requireSortedRecords()) { - return new HoodieSortedMergeHandle<>(config, instantTime, (HoodieSparkTable) table, recordItr, partitionPath, fileId, taskContextSupplier, - keyGeneratorOpt); - } else if (!WriteOperationType.isChangingRecords(operationType) && config.allowDuplicateInserts()) { - return new HoodieConcatHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier, keyGeneratorOpt); - } else { - return new HoodieMergeHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier, keyGeneratorOpt); - } + return HoodieMergeHandleFactory.create(operationType, config, instantTime, table, recordItr, partitionPath, fileId, + taskContextSupplier, keyGeneratorOpt); } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCOperation.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCOperation.java index edd63f3569414..90540bc05a69b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCOperation.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCOperation.java @@ -20,6 +20,9 @@ import 
org.apache.hudi.exception.HoodieNotSupportedException;
 
+/**
+ * Enumeration of change log operations.
+ */
 public enum HoodieCDCOperation {
   INSERT("i"),
   UPDATE("u"),
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCSupplementalLoggingMode.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCSupplementalLoggingMode.java
index b1e92dd2737aa..35a232206f3f9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCSupplementalLoggingMode.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCSupplementalLoggingMode.java
@@ -20,6 +20,18 @@
 import org.apache.hudi.exception.HoodieNotSupportedException;
 
+/**
+ * Change log capture supplemental logging mode. The supplemental log is used for
+ * accelerating the generation of change log details.
+ *
+ * <p>Three modes are supported:</p>
+ *
+ * <ul>
+ *   <li>OP_KEY: record keys, the reader needs to figure out the update before image and after image;</li>
+ *   <li>WITH_BEFORE: before images, the reader needs to figure out the update after images;</li>
+ *   <li>WITH_BEFORE_AFTER: before and after images, the reader can generate the details directly from the log.</li>
+ * </ul>
+ */ public enum HoodieCDCSupplementalLoggingMode { OP_KEY("cdc_op_key"), WITH_BEFORE("cdc_data_before"), diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCUtils.java index a741181d4d516..042e95cfd66bf 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCUtils.java @@ -18,17 +18,20 @@ package org.apache.hudi.common.table.cdc; +import org.apache.hudi.avro.AvroSchemaUtils; +import org.apache.hudi.exception.HoodieException; + import org.apache.avro.JsonProperties; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; -import org.apache.hudi.avro.AvroSchemaUtils; -import org.apache.hudi.exception.HoodieException; - import java.util.Arrays; import java.util.List; +/** + * Utilities for change log capture. + */ public class HoodieCDCUtils { public static final String CDC_LOGFILE_SUFFIX = "-cdc"; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java index 1c0f6e4b6c3be..cc5663262c1ce 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java @@ -29,6 +29,9 @@ import java.util.List; import java.util.Map; +/** + * Change log supplemental log data block. + */ public class HoodieCDCDataBlock extends HoodieAvroDataBlock { public HoodieCDCDataBlock( @@ -53,5 +56,4 @@ public HoodieCDCDataBlock(List records, public HoodieLogBlockType getBlockType() { return HoodieLogBlockType.CDC_DATA_BLOCK; } - }
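
To make the three supplemental logging modes concrete, here is a minimal sketch of the record each mode persists for an UPDATE. It mirrors the CDCTransformer lambdas installed by HoodieCDCLogger.getTransformer() above; the wrapper class and variable names are illustrative only and not part of this patch.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

import org.apache.hudi.common.table.cdc.HoodieCDCOperation;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;

class CDCRecordShapeSketch {
  // Returns the CDC record a HoodieCDCLogger would buffer for an UPDATE under the given mode.
  static GenericData.Record updateRecord(HoodieCDCSupplementalLoggingMode mode, Schema dataSchema,
                                         String commitTime, String recordKey,
                                         GenericRecord before, GenericRecord after) {
    // The CDC schema widens with the mode (see HoodieCDCUtils.schemaBySupplementalLoggingMode).
    Schema cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(mode, dataSchema);
    String op = HoodieCDCOperation.UPDATE.getValue();
    switch (mode) {
      case WITH_BEFORE_AFTER:
        // op + commit time + both images: readers can emit change details straight from the log.
        return HoodieCDCUtils.cdcRecord(cdcSchema, op, commitTime, before, after);
      case WITH_BEFORE:
        // op + record key + before image: readers must still resolve the after image.
        return HoodieCDCUtils.cdcRecord(cdcSchema, op, recordKey, before);
      default:
        // OP_KEY: op + record key only, the smallest log with the most reader-side work.
        return HoodieCDCUtils.cdcRecord(cdcSchema, op, recordKey);
    }
  }
}

Note the patch resolves one of these shapes once per logger instance instead of re-branching on the mode for every record, which is what the new CDCTransformer interface buys.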
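HoodieMergeHandleFactory now owns the handle-selection matrix that the Spark, Java, and Flink paths previously duplicated. Below is a hedged sketch of an upsert-path call site; the generic signature of the factory and the surrounding parameter names are assumptions drawn from the executor context, not spelled out in this diff.

import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.HoodieMergeHandleFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.table.HoodieTable;

import java.util.Iterator;

class MergeHandleDispatchSketch {
  static <T extends HoodieRecordPayload, I, K, O> HoodieMergeHandle<T, I, K, O> open(
      WriteOperationType operationType, HoodieWriteConfig config, String instantTime,
      HoodieTable<T, I, K, O> table, Iterator<HoodieRecord<T>> recordItr, String partitionPath,
      String fileId, TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
    // Selection matrix implemented inside the factory:
    //   requireSortedRecords() && isCDCEnabled()    -> HoodieSortedMergeHandleWithChangeLog
    //   requireSortedRecords()                      -> HoodieSortedMergeHandle
    //   non-changing op && allowDuplicateInserts()  -> HoodieConcatHandle
    //   isCDCEnabled()                              -> HoodieMergeHandleWithChangeLog
    //   otherwise                                   -> HoodieMergeHandle
    return HoodieMergeHandleFactory.create(operationType, config, instantTime, table, recordItr,
        partitionPath, fileId, taskContextSupplier, keyGeneratorOpt);
  }
}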
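Finally, the CDC flush decision that used to live in HoodieMergeHandle.close() is now the static HoodieCDCLogger.writeCDCDataIfNeeded, so every handle variant shares it. A small sketch of the contract, with an illustrative wrapper class:

import org.apache.hudi.common.table.log.AppendResult;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.io.HoodieCDCLogger;

class CDCFlushSketch {
  static Option<AppendResult> flush(HoodieCDCLogger cdcLogger, long recordsWritten, long insertRecordsWritten) {
    // Returns Option.empty() when no CDC log file is needed:
    //   cdcLogger == null                       -> CDC is not enabled on this handle;
    //   recordsWritten == 0                     -> the whole previous file slice was deleted;
    //   recordsWritten == insertRecordsWritten  -> a pure-INSERT commit, nothing changed in place.
    return HoodieCDCLogger.writeCDCDataIfNeeded(cdcLogger, recordsWritten, insertRecordsWritten);
  }
}

HoodieMergeHandleWithChangeLog.close() feeds the returned Option<AppendResult> into HoodieCDCLogger.setCDCStatIfNeeded, which attaches the CDC file information to the first write status's HoodieWriteStat.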