diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index 44a0bcc33e719..3cb149427aa52 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -41,7 +41,6 @@ import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats; import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.table.log.AppendResult; -import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.HoodieLogFormat.Writer; import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; import org.apache.hudi.common.table.log.block.HoodieDeleteBlock; @@ -471,23 +470,6 @@ public List writeStatuses() { return statuses; } - private Writer createLogWriter(Option fileSlice, String baseCommitTime) - throws IOException { - Option latestLogFile = fileSlice.get().getLatestLogFile(); - - return HoodieLogFormat.newWriterBuilder() - .onParentPath(FSUtils.getPartitionPath(hoodieTable.getMetaClient().getBasePath(), partitionPath)) - .withFileId(fileId) - .overBaseCommit(baseCommitTime) - .withLogVersion(latestLogFile.map(HoodieLogFile::getLogVersion).orElse(HoodieLogFile.LOGFILE_BASE_VERSION)) - .withFileSize(latestLogFile.map(HoodieLogFile::getFileSize).orElse(0L)) - .withSizeThreshold(config.getLogFileMaxSize()) - .withFs(fs) - .withRolloverLogWriteToken(writeToken) - .withLogWriteToken(latestLogFile.map(x -> FSUtils.getWriteTokenFromLogPath(x.getPath())).orElse(writeToken)) - .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build(); - } - /** * Whether there is need to update the record location. */ diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java new file mode 100644 index 0000000000000..c93489d890966 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.io; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieAvroPayload; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.cdc.HoodieCDCOperation; +import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode; +import org.apache.hudi.common.table.cdc.HoodieCDCUtils; +import org.apache.hudi.common.table.log.AppendResult; +import org.apache.hudi.common.table.log.HoodieLogFormat; +import org.apache.hudi.common.table.log.block.HoodieCDCDataBlock; +import org.apache.hudi.common.table.log.block.HoodieLogBlock; +import org.apache.hudi.common.util.DefaultSizeEstimator; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.collection.ExternalSpillableMap; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.HoodieUpsertException; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * This class encapsulates all the cdc-writing functions. + */ +public class HoodieCDCLogger implements Closeable { + + private final String commitTime; + + private final String keyField; + + private final Schema dataSchema; + + private final boolean populateMetaFields; + + // writer for cdc data + private final HoodieLogFormat.Writer cdcWriter; + + private final boolean cdcEnabled; + + private final HoodieCDCSupplementalLoggingMode cdcSupplementalLoggingMode; + + private final Schema cdcSchema; + + private final String cdcSchemaString; + + // the cdc data + private final Map cdcData; + + public HoodieCDCLogger( + String commitTime, + HoodieWriteConfig config, + HoodieTableConfig tableConfig, + Schema schema, + HoodieLogFormat.Writer cdcWriter, + long maxInMemorySizeInBytes) { + try { + this.commitTime = commitTime; + this.dataSchema = HoodieAvroUtils.removeMetadataFields(schema); + this.populateMetaFields = config.populateMetaFields(); + this.keyField = populateMetaFields ? 
HoodieRecord.RECORD_KEY_METADATA_FIELD + : tableConfig.getRecordKeyFieldProp(); + this.cdcWriter = cdcWriter; + + this.cdcEnabled = config.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED); + this.cdcSupplementalLoggingMode = HoodieCDCSupplementalLoggingMode.parse( + config.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE)); + + if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER)) { + this.cdcSchema = HoodieCDCUtils.CDC_SCHEMA; + this.cdcSchemaString = HoodieCDCUtils.CDC_SCHEMA_STRING; + } else if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE)) { + this.cdcSchema = HoodieCDCUtils.CDC_SCHEMA_OP_RECORDKEY_BEFORE; + this.cdcSchemaString = HoodieCDCUtils.CDC_SCHEMA_OP_RECORDKEY_BEFORE_STRING; + } else { + this.cdcSchema = HoodieCDCUtils.CDC_SCHEMA_OP_AND_RECORDKEY; + this.cdcSchemaString = HoodieCDCUtils.CDC_SCHEMA_OP_AND_RECORDKEY_STRING; + } + + this.cdcData = new ExternalSpillableMap<>( + maxInMemorySizeInBytes, + config.getSpillableMapBasePath(), + new DefaultSizeEstimator<>(), + new DefaultSizeEstimator<>(), + config.getCommonConfig().getSpillableDiskMapType(), + config.getCommonConfig().isBitCaskDiskMapCompressionEnabled() + ); + } catch (IOException e) { + throw new HoodieUpsertException("Failed to initialize HoodieCDCLogger", e); + } + } + + public void put(HoodieRecord hoodieRecord, + GenericRecord oldRecord, + Option newRecord) { + if (cdcEnabled) { + String recordKey = hoodieRecord.getRecordKey(); + GenericData.Record cdcRecord; + if (newRecord.isPresent()) { + GenericRecord record = (GenericRecord) newRecord.get(); + if (oldRecord == null) { + // inserted cdc record + cdcRecord = createCDCRecord(HoodieCDCOperation.INSERT, recordKey, + null, record); + } else { + // updated cdc record + cdcRecord = createCDCRecord(HoodieCDCOperation.UPDATE, recordKey, + oldRecord, record); + } + } else { + // deleted cdc record + cdcRecord = createCDCRecord(HoodieCDCOperation.DELETE, recordKey, + oldRecord, null); + } + cdcData.put(recordKey, new HoodieAvroPayload(Option.of(cdcRecord))); + } + } + + private GenericData.Record createCDCRecord(HoodieCDCOperation operation, + String recordKey, + GenericRecord oldRecord, + GenericRecord newRecord) { + GenericData.Record record; + if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER)) { + record = HoodieCDCUtils.cdcRecord(operation.getValue(), commitTime, + removeCommitMetadata(oldRecord), newRecord); + } else if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE)) { + record = HoodieCDCUtils.cdcRecord(operation.getValue(), recordKey, + removeCommitMetadata(oldRecord)); + } else { + record = HoodieCDCUtils.cdcRecord(operation.getValue(), recordKey); + } + return record; + } + + private GenericRecord removeCommitMetadata(GenericRecord record) { + return HoodieAvroUtils.rewriteRecordWithNewSchema(record, dataSchema, new HashMap<>()); + } + + public boolean isEmpty() { + return !this.cdcEnabled || this.cdcData.isEmpty(); + } + + public Option writeCDCData() { + if (isEmpty()) { + return Option.empty(); + } + + try { + List records = cdcData.values().stream() + .map(record -> { + try { + return record.getInsertValue(cdcSchema).get(); + } catch (IOException e) { + throw new HoodieIOException("Failed to get cdc record", e); + } + }).collect(Collectors.toList()); + + Map header = new HashMap<>(); + header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, commitTime); + 
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, cdcSchemaString); + + HoodieLogBlock block = new HoodieCDCDataBlock(records, header, keyField); + AppendResult result = cdcWriter.appendBlocks(Collections.singletonList(block)); + + // call close to trigger the data flush. + this.close(); + + return Option.of(result); + } catch (Exception e) { + throw new HoodieException("Failed to write the cdc data to " + cdcWriter.getLogFile().getPath(), e); + } + } + + @Override + public void close() { + try { + if (cdcWriter != null) { + cdcWriter.close(); + } + } catch (IOException e) { + throw new HoodieIOException("Failed to close HoodieCDCLogger", e); + } finally { + cdcData.clear(); + } + } + + public static Option writeCDCDataIfNeeded(HoodieCDCLogger cdcLogger, + long recordsWritten, + long insertRecordsWritten) { + if (cdcLogger == null || recordsWritten == 0L || (recordsWritten == insertRecordsWritten)) { + // the cases where we do not need to write out the cdc file: + // case 1: all the data from the previous file slice are deleted, and no new data is inserted; + // case 2: all the data are newly inserted. + return Option.empty(); + } + return cdcLogger.writeCDCData(); + } + + public static void setCDCStatIfNeeded(HoodieWriteStat stat, + Option cdcResult, + String partitionPath, + FileSystem fs) { + try { + if (cdcResult.isPresent()) { + Path cdcLogFile = cdcResult.get().logFile().getPath(); + String cdcFileName = cdcLogFile.getName(); + String cdcPath = StringUtils.isNullOrEmpty(partitionPath) ? cdcFileName : partitionPath + "/" + cdcFileName; + long cdcFileSizeInBytes = FSUtils.getFileSize(fs, cdcLogFile); + stat.setCdcPath(cdcPath); + stat.setCdcWriteBytes(cdcFileSizeInBytes); + } + } catch (IOException e) { + throw new HoodieUpsertException("Failed to set cdc write stat", e); + } + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index 82c6de576149f..da6b1c6071b89 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -18,6 +18,8 @@ package org.apache.hudi.io; +import org.apache.hadoop.fs.Path; + import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.fs.FSUtils; @@ -31,6 +33,9 @@ import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats; import org.apache.hudi.common.model.IOType; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.cdc.HoodieCDCUtils; +import org.apache.hudi.common.table.log.AppendResult; import org.apache.hudi.common.util.DefaultSizeEstimator; import org.apache.hudi.common.util.HoodieRecordSizeEstimator; import org.apache.hudi.common.util.Option; @@ -50,7 +55,7 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.fs.Path; + import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -102,6 +107,8 @@ public class HoodieMergeHandle extends H protected Map> keyToNewRecords; protected Set writtenRecordKeys; protected HoodieFileWriter fileWriter; + protected boolean cdcEnabled = false; + protected HoodieCDCLogger cdcLogger; private boolean preserveMetadata = false; protected Path newFilePath; @@ -203,6
+210,18 @@ private void init(String fileId, String partitionPath, HoodieBaseFile baseFileTo // Create the writer for writing the new version file fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config, writeSchemaWithMetaFields, taskContextSupplier); + + // init the cdc logger + this.cdcEnabled = config.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED); + if (cdcEnabled) { + this.cdcLogger = new HoodieCDCLogger( + instantTime, + config, + hoodieTable.getMetaClient().getTableConfig(), + tableSchema, + createLogWriter(Option.empty(), instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX), + IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config)); + } } catch (IOException io) { LOG.error("Error in update task at commit " + instantTime, io); writeStatus.setGlobalError(io); @@ -281,7 +300,11 @@ private boolean writeUpdateRecord(HoodieRecord hoodieRecord, GenericRecord ol return false; } } - return writeRecord(hoodieRecord, indexedRecord, isDelete); + boolean result = writeRecord(hoodieRecord, indexedRecord, isDelete); + if (result && cdcEnabled) { + cdcLogger.put(hoodieRecord, oldRecord, indexedRecord); + } + return result; } protected void writeInsertRecord(HoodieRecord hoodieRecord) throws IOException { @@ -292,6 +315,9 @@ protected void writeInsertRecord(HoodieRecord hoodieRecord) throws IOExceptio return; } if (writeRecord(hoodieRecord, insertRecord, HoodieOperation.isDelete(hoodieRecord.getOperation()))) { + if (cdcEnabled) { + cdcLogger.put(hoodieRecord, null, insertRecord); + } insertRecordsWritten++; } } @@ -402,6 +428,8 @@ protected void writeIncomingRecords() throws IOException { @Override public List close() { try { + HoodieWriteStat stat = writeStatus.getStat(); + writeIncomingRecords(); if (keyToNewRecords instanceof ExternalSpillableMap) { @@ -416,9 +444,12 @@ public List close() { fileWriter = null; } - long fileSizeInBytes = FSUtils.getFileSize(fs, newFilePath); - HoodieWriteStat stat = writeStatus.getStat(); + // if there are cdc data written, set the CDC-related information. 
+ Option cdcResult = + HoodieCDCLogger.writeCDCDataIfNeeded(cdcLogger, recordsWritten, insertRecordsWritten); + HoodieCDCLogger.setCDCStatIfNeeded(stat, cdcResult, partitionPath, fs); + long fileSizeInBytes = FSUtils.getFileSize(fs, newFilePath); stat.setTotalWriteBytes(fileSizeInBytes); stat.setFileSizeInBytes(fileSizeInBytes); stat.setNumWrites(recordsWritten); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java index 7dce31a4c349b..8240de66d59d4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java @@ -18,6 +18,9 @@ package org.apache.hudi.io; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; + import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.model.HoodieBaseFile; @@ -30,8 +33,6 @@ import org.apache.hudi.keygen.KeyGenUtils; import org.apache.hudi.table.HoodieTable; -import org.apache.avro.generic.GenericRecord; - import javax.annotation.concurrent.NotThreadSafe; import java.io.IOException; @@ -93,13 +94,18 @@ public void write(GenericRecord oldRecord) { throw new HoodieUpsertException("Insert/Update not in sorted order"); } try { + Option insertRecord; if (useWriterSchemaForCompaction) { - writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps())); + insertRecord = hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps()); } else { - writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchema, config.getProps())); + insertRecord = hoodieRecord.getData().getInsertValue(tableSchema, config.getProps()); } + writeRecord(hoodieRecord, insertRecord); insertRecordsWritten++; writtenRecordKeys.add(keyToPreWrite); + if (cdcEnabled) { + cdcLogger.put(hoodieRecord, null, insertRecord); + } } catch (IOException e) { throw new HoodieUpsertException("Failed to write records", e); } @@ -116,12 +122,17 @@ public List close() { String key = newRecordKeysSorted.poll(); HoodieRecord hoodieRecord = keyToNewRecords.get(key); if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) { + Option insertRecord; if (useWriterSchemaForCompaction) { - writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps())); + insertRecord = hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps()); } else { - writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchema, config.getProps())); + insertRecord = hoodieRecord.getData().getInsertValue(tableSchema, config.getProps()); } + writeRecord(hoodieRecord, insertRecord); insertRecordsWritten++; + if (cdcEnabled) { + cdcLogger.put(hoodieRecord, null, insertRecord); + } } } catch (IOException e) { throw new HoodieUpsertException("Failed to close UpdateHandle", e); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java index b7fdbecfd56d1..3864c31ce9aa1 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java @@ 
-22,9 +22,12 @@ import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.IOType; +import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; @@ -273,6 +276,39 @@ protected HoodieFileWriter createNewFileWriter(String instantTime, Path path, Ho return HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable, config, schema, taskContextSupplier); } + protected HoodieLogFormat.Writer createLogWriter( + Option fileSlice, String baseCommitTime) throws IOException { + return createLogWriter(fileSlice, baseCommitTime, ""); + } + + protected HoodieLogFormat.Writer createLogWriter( + Option fileSlice, String baseCommitTime, String fileSuffix) throws IOException { + int logVersion = HoodieLogFile.LOGFILE_BASE_VERSION; + long logFileSize = 0L; + String logWriteToken = writeToken + fileSuffix; + String rolloverLogWriteToken = writeToken + fileSuffix; + if (fileSlice.isPresent()) { + Option latestLogFileOpt = fileSlice.get().getLatestLogFile(); + if (latestLogFileOpt.isPresent()) { + HoodieLogFile latestLogFile = latestLogFileOpt.get(); + logVersion = latestLogFile.getLogVersion(); + logFileSize = latestLogFile.getFileSize(); + logWriteToken = FSUtils.getWriteTokenFromLogPath(latestLogFile.getPath()); + } + } + return HoodieLogFormat.newWriterBuilder() + .onParentPath(FSUtils.getPartitionPath(hoodieTable.getMetaClient().getBasePath(), partitionPath)) + .withFileId(fileId) + .overBaseCommit(baseCommitTime) + .withLogVersion(logVersion) + .withFileSize(logFileSize) + .withSizeThreshold(config.getLogFileMaxSize()) + .withFs(fs) + .withRolloverLogWriteToken(rolloverLogWriteToken) + .withLogWriteToken(logWriteToken) + .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build(); + } + private static class IgnoreRecord implements GenericRecord { @Override diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java index 529fa914fcd6c..671a522cab42d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java @@ -29,9 +29,11 @@ import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFileGroup; import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.cdc.HoodieCDCUtils; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -61,6 +63,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.function.Predicate; import java.util.stream.Collectors; import 
java.util.stream.Stream; @@ -360,8 +363,20 @@ private Pair> getFilesToCleanKeepingLatestCommits(S }); if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) { // If merge on read, then clean the log files for the commits as well - deletePaths.addAll(aSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false)) - .collect(Collectors.toList())); + Predicate notCDCLogFile = + hoodieLogFile -> !hoodieLogFile.getFileName().endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX); + deletePaths.addAll( + aSlice.getLogFiles().filter(notCDCLogFile).map(lf -> new CleanFileInfo(lf.getPath().toString(), false)) + .collect(Collectors.toList())); + } + if (hoodieTable.getMetaClient().getTableConfig().isCDCEnabled()) { + // The cdc log files will be written out in the cdc scenario, no matter whether the table type is mor or cow. + // Here we need to clean up these cdc log files. + Predicate isCDCLogFile = + hoodieLogFile -> hoodieLogFile.getFileName().endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX); + deletePaths.addAll( + aSlice.getLogFiles().filter(isCDCLogFile).map(lf -> new CleanFileInfo(lf.getPath().toString(), false)) + .collect(Collectors.toList())); } } } @@ -427,8 +442,20 @@ private List getCleanFileInfoForSlice(FileSlice nextSlice) { } if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) { // If merge on read, then clean the log files for the commits as well - cleanPaths.addAll(nextSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false)) - .collect(Collectors.toList())); + Predicate notCDCLogFile = + hoodieLogFile -> !hoodieLogFile.getFileName().endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX); + cleanPaths.addAll( + nextSlice.getLogFiles().filter(notCDCLogFile).map(lf -> new CleanFileInfo(lf.getPath().toString(), false)) + .collect(Collectors.toList())); + } + if (hoodieTable.getMetaClient().getTableConfig().isCDCEnabled()) { + // The cdc log files will be written out in the cdc scenario, no matter whether the table type is mor or cow. + // Here we need to clean up these cdc log files.
+ Predicate isCDCLogFile = + hoodieLogFile -> hoodieLogFile.getFileName().endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX); + cleanPaths.addAll( + nextSlice.getLogFiles().filter(isCDCLogFile).map(lf -> new CleanFileInfo(lf.getPath().toString(), false)) + .collect(Collectors.toList())); } return cleanPaths; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java index 474d60f8d5d80..d025fc44247bf 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java @@ -79,7 +79,7 @@ public class FSUtils { private static final Logger LOG = LogManager.getLogger(FSUtils.class); // Log files are of this pattern - .b5068208-e1a4-11e6-bf01-fe55135034f3_20170101134598.log.1 private static final Pattern LOG_FILE_PATTERN = - Pattern.compile("\\.(.*)_(.*)\\.(.*)\\.([0-9]*)(_(([0-9]*)-([0-9]*)-([0-9]*)))?"); + Pattern.compile("\\.(.*)_(.*)\\.(.*)\\.([0-9]*)(_(([0-9]*)-([0-9]*)-([0-9]*)(-cdc)?))?"); private static final String LOG_FILE_PREFIX = "."; private static final int MAX_ATTEMPTS_RECOVER_LEASE = 10; private static final long MIN_CLEAN_TO_KEEP = 10; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java index ba43526f92dc8..45f7ecf541f22 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java @@ -44,6 +44,11 @@ public class HoodieWriteStat implements Serializable { */ private String path; + /** + * Relative CDC file path that stores the CDC data. + */ + private String cdcPath; + /** * The previous version of the file. (null if this is the first version. i.e insert) */ @@ -70,6 +75,11 @@ public class HoodieWriteStat implements Serializable { */ private long numInserts; + /** + * Total number of cdc bytes written. + */ + private long cdcWriteBytes; + /** * Total number of bytes written.
*/ @@ -195,10 +205,18 @@ public long getTotalWriteBytes() { return totalWriteBytes; } + public long getCdcWriteBytes() { + return cdcWriteBytes; + } + public void setTotalWriteBytes(long totalWriteBytes) { this.totalWriteBytes = totalWriteBytes; } + public void setCdcWriteBytes(long cdcWriteBytes) { + this.cdcWriteBytes = cdcWriteBytes; + } + public long getTotalWriteErrors() { return totalWriteErrors; } @@ -235,6 +253,15 @@ public String getPath() { return path; } + @Nullable + public String getCdcPath() { + return cdcPath; + } + + public void setCdcPath(String cdcPath) { + this.cdcPath = cdcPath; + } + public String getPartitionPath() { return partitionPath; } @@ -360,6 +387,7 @@ public String toString() { return "HoodieWriteStat{fileId='" + fileId + '\'' + ", path='" + path + '\'' + ", prevCommit='" + prevCommit + '\'' + ", numWrites=" + numWrites + ", numDeletes=" + numDeletes + ", numUpdateWrites=" + numUpdateWrites + ", totalWriteBytes=" + totalWriteBytes + ", totalWriteErrors=" + totalWriteErrors + ", tempPath='" + tempPath + + '\'' + ", cdcPath='" + cdcPath + ", cdcWriteBytes=" + cdcWriteBytes + '\'' + ", partitionPath='" + partitionPath + '\'' + ", totalLogRecords=" + totalLogRecords + ", totalLogFilesCompacted=" + totalLogFilesCompacted + ", totalLogSizeCompacted=" + totalLogSizeCompacted + ", totalUpdatedRecordsCompacted=" + totalUpdatedRecordsCompacted + ", totalLogBlocks=" + totalLogBlocks diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java index 6b64ec4897b7c..9ea61f1d36693 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java @@ -31,6 +31,7 @@ import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieTimelineTimeZone; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; +import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode; import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import org.apache.hudi.common.util.BinaryUtil; @@ -127,6 +128,22 @@ public class HoodieTableConfig extends HoodieConfig { .withDocumentation("Columns used to uniquely identify the table. 
Concatenated values of these fields are used as " + " the record key component of HoodieKey."); + public static final ConfigProperty CDC_ENABLED = ConfigProperty + .key("hoodie.table.cdc.enabled") + .defaultValue(false) + .withDocumentation("When enabled, persist the change data if necessary, so that the table can be queried in CDC query mode."); + + public static final ConfigProperty CDC_SUPPLEMENTAL_LOGGING_MODE = ConfigProperty + .key("hoodie.table.cdc.supplemental.logging.mode") + .defaultValue(HoodieCDCSupplementalLoggingMode.OP_KEY.getValue()) + .withValidValues( + HoodieCDCSupplementalLoggingMode.OP_KEY.getValue(), + HoodieCDCSupplementalLoggingMode.WITH_BEFORE.getValue(), + HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER.getValue()) + .withDocumentation("When 'cdc_op_key', persist only the 'op' and the record key," + + " when 'cdc_data_before', additionally persist the 'before' image," + + " and when 'cdc_data_before_after', persist both the 'before' and 'after' images at the same time."); + public static final ConfigProperty CREATE_SCHEMA = ConfigProperty .key("hoodie.table.create.schema") .noDefaultValue() @@ -589,6 +606,14 @@ public String getRecordKeyFieldProp() { return getStringOrDefault(RECORDKEY_FIELDS, HoodieRecord.RECORD_KEY_METADATA_FIELD); } + public boolean isCDCEnabled() { + return getBooleanOrDefault(CDC_ENABLED); + } + + public String cdcSupplementalLoggingMode() { + return getStringOrDefault(CDC_SUPPLEMENTAL_LOGGING_MODE); + } + public String getKeyGeneratorClassName() { return getString(KEY_GENERATOR_CLASS_NAME); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 16dd373486f61..610a0f4185737 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -730,6 +730,8 @@ public static class PropertyBuilder { private String baseFileFormat; private String preCombineField; private String partitionFields; + private Boolean cdcEnabled; + private String cdcSupplementalLoggingMode; private String bootstrapIndexClass; private String bootstrapBasePath; private Boolean bootstrapIndexEnable; @@ -816,6 +818,16 @@ public PropertyBuilder setPartitionFields(String partitionFields) { return this; } + public PropertyBuilder setCDCEnabled(boolean cdcEnabled) { + this.cdcEnabled = cdcEnabled; + return this; + } + + public PropertyBuilder setCDCSupplementalLoggingMode(String cdcSupplementalLoggingMode) { + this.cdcSupplementalLoggingMode = cdcSupplementalLoggingMode; + return this; + } + public PropertyBuilder setBootstrapIndexClass(String bootstrapIndexClass) { this.bootstrapIndexClass = bootstrapIndexClass; return this; @@ -955,6 +967,12 @@ public PropertyBuilder fromProperties(Properties properties) { if (hoodieConfig.contains(HoodieTableConfig.RECORDKEY_FIELDS)) { setRecordKeyFields(hoodieConfig.getString(HoodieTableConfig.RECORDKEY_FIELDS)); } + if (hoodieConfig.contains(HoodieTableConfig.CDC_ENABLED)) { + setCDCEnabled(hoodieConfig.getBoolean(HoodieTableConfig.CDC_ENABLED)); + } + if (hoodieConfig.contains(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE)) { + setCDCSupplementalLoggingMode(hoodieConfig.getString(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE)); + } if (hoodieConfig.contains(HoodieTableConfig.CREATE_SCHEMA)) { setTableCreateSchema(hoodieConfig.getString(HoodieTableConfig.CREATE_SCHEMA)); } @@ -1045,6 +1063,12 @@ public Properties
build() { if (null != recordKeyFields) { tableConfig.setValue(HoodieTableConfig.RECORDKEY_FIELDS, recordKeyFields); } + if (null != cdcEnabled) { + tableConfig.setValue(HoodieTableConfig.CDC_ENABLED, Boolean.toString(cdcEnabled)); + if (cdcEnabled && null != cdcSupplementalLoggingMode) { + tableConfig.setValue(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE, cdcSupplementalLoggingMode); + } + } if (null != populateMetaFields) { tableConfig.setValue(HoodieTableConfig.POPULATE_META_FIELDS, Boolean.toString(populateMetaFields)); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCOperation.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCOperation.java new file mode 100644 index 0000000000000..edd63f3569414 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCOperation.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.table.cdc; + +import org.apache.hudi.exception.HoodieNotSupportedException; + +public enum HoodieCDCOperation { + INSERT("i"), + UPDATE("u"), + DELETE("d"); + + private final String value; + + HoodieCDCOperation(String value) { + this.value = value; + } + + public String getValue() { + return this.value; + } + + public static HoodieCDCOperation parse(String value) { + switch (value) { + case "i": + return INSERT; + case "u": + return UPDATE; + case "d": + return DELETE; + default: + throw new HoodieNotSupportedException("Unsupported value: " + value); + } + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCSupplementalLoggingMode.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCSupplementalLoggingMode.java new file mode 100644 index 0000000000000..b1e92dd2737aa --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCSupplementalLoggingMode.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.common.table.cdc; + +import org.apache.hudi.exception.HoodieNotSupportedException; + +public enum HoodieCDCSupplementalLoggingMode { + OP_KEY("cdc_op_key"), + WITH_BEFORE("cdc_data_before"), + WITH_BEFORE_AFTER("cdc_data_before_after"); + + private final String value; + + HoodieCDCSupplementalLoggingMode(String value) { + this.value = value; + } + + public String getValue() { + return this.value; + } + + public static HoodieCDCSupplementalLoggingMode parse(String value) { + switch (value) { + case "cdc_op_key": + return OP_KEY; + case "cdc_data_before": + return WITH_BEFORE; + case "cdc_data_before_after": + return WITH_BEFORE_AFTER; + default: + throw new HoodieNotSupportedException("Unsupported value: " + value); + } + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCUtils.java new file mode 100644 index 0000000000000..3cf8315a5434b --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCUtils.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.table.cdc; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; + +import org.apache.hudi.exception.HoodieException; + +public class HoodieCDCUtils { + + public static final String CDC_LOGFILE_SUFFIX = "-cdc"; + + /* the `op` column represents how a record is changed. */ + public static final String CDC_OPERATION_TYPE = "op"; + + /* the `ts_ms` column represents when a record is changed. */ + public static final String CDC_COMMIT_TIMESTAMP = "ts_ms"; + + /* the pre-image before one record is changed */ + public static final String CDC_BEFORE_IMAGE = "before"; + + /* the post-image after one record is changed */ + public static final String CDC_AFTER_IMAGE = "after"; + + /* the key of the changed record */ + public static final String CDC_RECORD_KEY = "record_key"; + + public static final String[] CDC_COLUMNS = new String[] { + CDC_OPERATION_TYPE, + CDC_COMMIT_TIMESTAMP, + CDC_BEFORE_IMAGE, + CDC_AFTER_IMAGE + }; + + /** + * This is the standard CDC output format. + * Also, this is the schema of cdc log file in the case `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before_after'. 
+ */ + public static final String CDC_SCHEMA_STRING = "{\"type\":\"record\",\"name\":\"Record\"," + + "\"fields\":[" + + "{\"name\":\"op\",\"type\":[\"string\",\"null\"]}," + + "{\"name\":\"ts_ms\",\"type\":[\"string\",\"null\"]}," + + "{\"name\":\"before\",\"type\":[\"string\",\"null\"]}," + + "{\"name\":\"after\",\"type\":[\"string\",\"null\"]}" + + "]}"; + + public static final Schema CDC_SCHEMA = new Schema.Parser().parse(CDC_SCHEMA_STRING); + + /** + * The schema of cdc log file in the case `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before'. + */ + public static final String CDC_SCHEMA_OP_RECORDKEY_BEFORE_STRING = "{\"type\":\"record\",\"name\":\"Record\"," + + "\"fields\":[" + + "{\"name\":\"op\",\"type\":[\"string\",\"null\"]}," + + "{\"name\":\"record_key\",\"type\":[\"string\",\"null\"]}," + + "{\"name\":\"before\",\"type\":[\"string\",\"null\"]}" + + "]}"; + + public static final Schema CDC_SCHEMA_OP_RECORDKEY_BEFORE = + new Schema.Parser().parse(CDC_SCHEMA_OP_RECORDKEY_BEFORE_STRING); + + /** + * The schema of cdc log file in the case `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_op_key'. + */ + public static final String CDC_SCHEMA_OP_AND_RECORDKEY_STRING = "{\"type\":\"record\",\"name\":\"Record\"," + + "\"fields\":[" + + "{\"name\":\"op\",\"type\":[\"string\",\"null\"]}," + + "{\"name\":\"record_key\",\"type\":[\"string\",\"null\"]}" + + "]}"; + + public static final Schema CDC_SCHEMA_OP_AND_RECORDKEY = + new Schema.Parser().parse(CDC_SCHEMA_OP_AND_RECORDKEY_STRING); + + public static final Schema schemaBySupplementalLoggingMode(HoodieCDCSupplementalLoggingMode supplementalLoggingMode) { + switch (supplementalLoggingMode) { + case WITH_BEFORE_AFTER: + return CDC_SCHEMA; + case WITH_BEFORE: + return CDC_SCHEMA_OP_RECORDKEY_BEFORE; + case OP_KEY: + return CDC_SCHEMA_OP_AND_RECORDKEY; + default: + throw new HoodieException("not support this supplemental logging mode: " + supplementalLoggingMode); + } + } + + /** + * Build the cdc record which has all the cdc fields when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before_after'. + */ + public static GenericData.Record cdcRecord( + String op, String commitTime, GenericRecord before, GenericRecord after) { + String beforeJsonStr = recordToJson(before); + String afterJsonStr = recordToJson(after); + return cdcRecord(op, commitTime, beforeJsonStr, afterJsonStr); + } + + public static GenericData.Record cdcRecord( + String op, String commitTime, String before, String after) { + GenericData.Record record = new GenericData.Record(CDC_SCHEMA); + record.put(CDC_OPERATION_TYPE, op); + record.put(CDC_COMMIT_TIMESTAMP, commitTime); + record.put(CDC_BEFORE_IMAGE, before); + record.put(CDC_AFTER_IMAGE, after); + return record; + } + + /** + * Build the cdc record when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before'. + */ + public static GenericData.Record cdcRecord(String op, String recordKey, GenericRecord before) { + GenericData.Record record = new GenericData.Record(CDC_SCHEMA_OP_RECORDKEY_BEFORE); + record.put(CDC_OPERATION_TYPE, op); + record.put(CDC_RECORD_KEY, recordKey); + String beforeJsonStr = recordToJson(before); + record.put(CDC_BEFORE_IMAGE, beforeJsonStr); + return record; + } + + /** + * Build the cdc record when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_op_key'. 
+ */ + public static GenericData.Record cdcRecord(String op, String recordKey) { + GenericData.Record record = new GenericData.Record(CDC_SCHEMA_OP_AND_RECORDKEY); + record.put(CDC_OPERATION_TYPE, op); + record.put(CDC_RECORD_KEY, recordKey); + return record; + } + + public static String recordToJson(GenericRecord record) { + return GenericData.get().toString(record); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java index cb16c8b141298..c784684cc029c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; +import org.apache.hudi.common.table.log.block.HoodieCDCDataBlock; import org.apache.hudi.common.table.log.block.HoodieCommandBlock; import org.apache.hudi.common.table.log.block.HoodieCorruptBlock; import org.apache.hudi.common.table.log.block.HoodieDeleteBlock; @@ -233,6 +234,9 @@ private HoodieLogBlock readBlock() throws IOException { case COMMAND_BLOCK: return new HoodieCommandBlock(content, inputStream, readBlockLazily, Option.of(logBlockContentLoc), header, footer); + case CDC_DATA_BLOCK: + return new HoodieCDCDataBlock(inputStream, content, readBlockLazily, logBlockContentLoc, readerSchema, header, keyField); + default: throw new HoodieNotSupportedException("Unsupported Block " + blockType); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java index 679a0e6f7e312..d7e725544aa65 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieOperation; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.cdc.HoodieCDCUtils; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.DefaultSizeEstimator; import org.apache.hudi.common.util.HoodieRecordSizeEstimator; @@ -47,6 +48,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath; import static org.apache.hudi.common.util.ValidationUtils.checkState; @@ -236,7 +238,9 @@ public Builder withBasePath(String basePath) { @Override public Builder withLogFilePaths(List logFilePaths) { - this.logFilePaths = logFilePaths; + this.logFilePaths = logFilePaths.stream() + .filter(p -> !p.endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX)) + .collect(Collectors.toList()); return this; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java new file mode 100644 index 0000000000000..1c0f6e4b6c3be --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.table.log.block; + +import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; + +import org.apache.hadoop.fs.FSDataInputStream; + +import org.apache.hudi.common.util.Option; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class HoodieCDCDataBlock extends HoodieAvroDataBlock { + + public HoodieCDCDataBlock( + FSDataInputStream inputStream, + Option content, + boolean readBlockLazily, + HoodieLogBlockContentLocation logBlockContentLocation, + Schema readerSchema, + Map header, + String keyField) { + super(inputStream, content, readBlockLazily, logBlockContentLocation, + Option.of(readerSchema), header, new HashMap<>(), keyField, null); + } + + public HoodieCDCDataBlock(List records, + Map header, + String keyField) { + super(records, header, keyField); + } + + @Override + public HoodieLogBlockType getBlockType() { + return HoodieLogBlockType.CDC_DATA_BLOCK; + } + +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java index 71336be883781..1718e7dd02457 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java @@ -122,7 +122,8 @@ public enum HoodieLogBlockType { CORRUPT_BLOCK(":corrupted"), AVRO_DATA_BLOCK("avro"), HFILE_DATA_BLOCK("hfile"), - PARQUET_DATA_BLOCK("parquet"); + PARQUET_DATA_BLOCK("parquet"), + CDC_DATA_BLOCK("cdc"); private static final Map ID_TO_ENUM_MAP = TypeUtils.getValueToEnumMap(HoodieLogBlockType.class, e -> e.id); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index 19634700fb927..1fb872f6835e1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -333,6 +333,10 @@ public static List convertMetadataToFilesPartitionRecords(HoodieCo // monotonically increasing (ie file-size never goes down, unless deleted) map.merge(fileName, stat.getFileSizeInBytes(), Math::max); + String cdcPath = stat.getCdcPath(); + if (cdcPath != null) { + map.put(cdcPath, stat.getCdcWriteBytes()); + } return map; }, CollectionUtils::combine); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java index 4fa53bb41f9f8..c037c79dd82f5 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java 
+++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java @@ -18,6 +18,17 @@ package org.apache.hudi.common.functional; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.io.compress.Compression; + import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.DeleteRecord; @@ -26,6 +37,7 @@ import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.cdc.HoodieCDCUtils; import org.apache.hudi.common.table.log.AppendResult; import org.apache.hudi.common.table.log.HoodieLogFileReader; import org.apache.hudi.common.table.log.HoodieLogFormat; @@ -33,6 +45,7 @@ import org.apache.hudi.common.table.log.HoodieLogFormat.Writer; import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; +import org.apache.hudi.common.table.log.block.HoodieCDCDataBlock; import org.apache.hudi.common.table.log.block.HoodieCommandBlock; import org.apache.hudi.common.table.log.block.HoodieDataBlock; import org.apache.hudi.common.table.log.block.HoodieDeleteBlock; @@ -53,17 +66,9 @@ import org.apache.hudi.exception.CorruptedLogFileException; import org.apache.hudi.exception.HoodieIOException; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.util.counters.BenchmarkCounter; + import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -79,6 +84,7 @@ import java.io.UncheckedIOException; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -94,6 +100,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.params.provider.Arguments.arguments; @@ -544,6 +551,60 @@ public void testBasicAppendAndRead(HoodieLogBlockType dataBlockType) throws IOEx reader.close(); } + @Test + public void testCDCBlock() throws IOException, InterruptedException { + Writer writer = HoodieLogFormat.newWriterBuilder() + .onParentPath(partitionPath) + .withFileExtension(HoodieLogFile.DELTA_EXTENSION) + .withFileId("test-fileid1") + .overBaseCommit("100") + .withFs(fs) + .build(); + + GenericRecord record1 = HoodieCDCUtils.cdcRecord("i", "100", + null, "{\"uuid\": 1, \"name\": 
\"apple\"}, \"ts\": 1100}"); + GenericRecord record2 = HoodieCDCUtils.cdcRecord("u", "100", + "{\"uuid\": 2, \"name\": \"banana\"}, \"ts\": 1000}", + "{\"uuid\": 2, \"name\": \"blueberry\"}, \"ts\": 1100}"); + GenericRecord record3 = HoodieCDCUtils.cdcRecord("d", "100", + "{\"uuid\": 3, \"name\": \"cherry\"}, \"ts\": 1000}", null); + List records = new ArrayList<>(Arrays.asList(record1, record2, record3)); + Map header = new HashMap<>(); + header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100"); + header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, HoodieCDCUtils.CDC_SCHEMA_STRING); + HoodieDataBlock dataBlock = getDataBlock(HoodieLogBlockType.CDC_DATA_BLOCK, records, header); + writer.appendBlock(dataBlock); + writer.close(); + + Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), HoodieCDCUtils.CDC_SCHEMA); + assertTrue(reader.hasNext()); + HoodieLogBlock block = reader.next(); + HoodieDataBlock dataBlockRead = (HoodieDataBlock) block; + List recordsRead = getRecords(dataBlockRead); + assertEquals(3, recordsRead.size(), + "Read records size should be equal to the written records size"); + assertEquals(dataBlockRead.getSchema(), HoodieCDCUtils.CDC_SCHEMA); + + GenericRecord insert = (GenericRecord) recordsRead.stream() + .filter(record -> record.get(0).toString().equals("i")).findFirst().get(); + assertNull(insert.get("before")); + assertNotNull(insert.get("after")); + + GenericRecord update = (GenericRecord) recordsRead.stream() + .filter(record -> record.get(0).toString().equals("u")).findFirst().get(); + assertNotNull(update.get("before")); + assertNotNull(update.get("after")); + assertTrue(update.get("before").toString().contains("banana")); + assertTrue(update.get("after").toString().contains("blueberry")); + + GenericRecord delete = (GenericRecord) recordsRead.stream() + .filter(record -> record.get(0).toString().equals("d")).findFirst().get(); + assertNotNull(delete.get("before")); + assertNull(delete.get("after")); + + reader.close(); + } + @ParameterizedTest @MethodSource("testArguments") public void testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapType diskMapType, @@ -2009,6 +2070,8 @@ private HoodieDataBlock getDataBlock(HoodieLogBlockType dataBlockType, List records, Map header, Path pathForReader) { switch (dataBlockType) { + case CDC_DATA_BLOCK: + return new HoodieCDCDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD); case AVRO_DATA_BLOCK: return new HoodieAvroDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD); case HFILE_DATA_BLOCK: diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index b9ff4c0d1a3ef..7bbc64782c37c 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -140,7 +140,6 @@ object HoodieSparkSqlWriter { if (!tableExists) { val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.BASE_FILE_FORMAT) val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER) - val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD) val populateMetaFields = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS) val useBaseFormatMetaFile = 
hoodieConfig.getBooleanOrDefault(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT); @@ -148,7 +147,6 @@ object HoodieSparkSqlWriter { .setTableType(tableType) .setDatabaseName(databaseName) .setTableName(tblName) - .setRecordKeyFields(recordKeyFields) .setBaseFileFormat(baseFileFormat) .setArchiveLogFolder(archiveLogFolder) .setPayloadClassName(hoodieConfig.getString(PAYLOAD_CLASS_NAME)) @@ -158,6 +156,8 @@ object HoodieSparkSqlWriter { .setPartitionFields(partitionColumns) .setPopulateMetaFields(populateMetaFields) .setRecordKeyFields(hoodieConfig.getString(RECORDKEY_FIELD)) + .setCDCEnabled(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED)) + .setCDCSupplementalLoggingMode(hoodieConfig.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE)) .setKeyGeneratorClassProp(originKeyGeneratorClassName) .set(timestampKeyGeneratorConfigs) .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING)) @@ -494,6 +494,8 @@ object HoodieSparkSqlWriter { .setBaseFileFormat(baseFileFormat) .setBootstrapBasePath(bootstrapBasePath) .setPartitionFields(partitionColumns) + .setCDCEnabled(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED)) + .setCDCSupplementalLoggingMode(hoodieConfig.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE)) .setPopulateMetaFields(populateMetaFields) .setKeyGeneratorClassProp(keyGenProp) .set(timestampKeyGeneratorConfigs) diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index 7b58f5c3f37ff..ba9d33a662c21 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -160,8 +160,8 @@ public void testBasicSync(boolean useSchemaFromCommitMetadata, String syncMode) assertTrue(hiveClient.tableExists(HiveTestUtil.TABLE_NAME), "Table " + HiveTestUtil.TABLE_NAME + " should exist after sync completes"); assertEquals("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe", - hiveClient.getTable(HiveTestUtil.TABLE_NAME).getSd().getSerdeInfo().getSerializationLib(), - "SerDe info not updated or does not match"); + hiveClient.getTable(HiveTestUtil.TABLE_NAME).getSd().getSerdeInfo().getSerializationLib(), + "SerDe info not updated or does not match"); assertEquals(hiveClient.getMetastoreSchema(HiveTestUtil.TABLE_NAME).size(), hiveClient.getStorageSchema().getColumns().size() + 1, "Hive Schema should match the table schema + partition field");
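
Reviewer note (illustrative, not part of the patch): a minimal sketch of how the new table properties could be wired up through the PropertyBuilder methods this patch adds to HoodieTableMetaClient. The class name, table name, record key field, base path, and Hadoop configuration below are placeholders, and the existing withPropertyBuilder()/initTable(...) entry points are assumed; only setCDCEnabled(...) and setCDCSupplementalLoggingMode(...) come from this patch.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;

    import org.apache.hudi.common.model.HoodieTableType;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;

    public class CdcTableInitSketch {
      public static void main(String[] args) throws IOException {
        // Placeholder path and configuration; a real caller supplies its own.
        Configuration hadoopConf = new Configuration();
        String basePath = "/tmp/hudi_cdc_table";

        HoodieTableMetaClient.withPropertyBuilder()
            .setTableType(HoodieTableType.COPY_ON_WRITE)
            .setTableName("hudi_cdc_table")
            .setRecordKeyFields("uuid")
            // persisted as hoodie.table.cdc.enabled
            .setCDCEnabled(true)
            // persisted as hoodie.table.cdc.supplemental.logging.mode
            .setCDCSupplementalLoggingMode(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER.getValue())
            .initTable(hadoopConf, basePath);
      }
    }

Per the HoodieTableConfig documentation added in this patch, 'cdc_op_key' writes the least supplemental data (operation and record key only), 'cdc_data_before' adds the 'before' image, and 'cdc_data_before_after' persists both images and therefore the largest CDC log files.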