diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index b9d7c800250a0..0f7a9c1cf44f7 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -2364,7 +2364,7 @@ public boolean isMetaserverEnabled() { * CDC supplemental logging mode. */ public HoodieCDCSupplementalLoggingMode getCDCSupplementalLoggingMode() { - return HoodieCDCSupplementalLoggingMode.parse( + return HoodieCDCSupplementalLoggingMode.valueOf( getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE)); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java index be646df85be4d..fd2dc60b58bc1 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java @@ -58,6 +58,9 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import static org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.data_before; +import static org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.data_before_after; + /** * This class encapsulates all the cdc-writing functions. */ @@ -240,10 +243,10 @@ public void close() { // ------------------------------------------------------------------------- private CDCTransformer getTransformer() { - if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER)) { + if (cdcSupplementalLoggingMode == data_before_after) { return (operation, recordKey, oldRecord, newRecord) -> HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), commitTime, removeCommitMetadata(oldRecord), newRecord); - } else if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE)) { + } else if (cdcSupplementalLoggingMode == data_before) { return (operation, recordKey, oldRecord, newRecord) -> HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), recordKey, removeCommitMetadata(oldRecord)); } else { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java index e450614e2b61b..e34aa1e6dad67 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java @@ -64,6 +64,9 @@ import java.util.stream.Collectors; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.data_before; +import static org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.data_before_after; +import static org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.op_key_only; /** * Configurations on the Hoodie Table like type of ingestion, storage formats, hive table name etc Configurations are loaded from hoodie.properties, these properties are usually set during @@ -137,15 +140,15 @@ public class HoodieTableConfig extends HoodieConfig { public static final ConfigProperty CDC_SUPPLEMENTAL_LOGGING_MODE = ConfigProperty .key("hoodie.table.cdc.supplemental.logging.mode") - 
.defaultValue(HoodieCDCSupplementalLoggingMode.OP_KEY.getValue()) + .defaultValue(op_key_only.name()) .withValidValues( - HoodieCDCSupplementalLoggingMode.OP_KEY.getValue(), - HoodieCDCSupplementalLoggingMode.WITH_BEFORE.getValue(), - HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER.getValue()) + op_key_only.name(), + data_before.name(), + data_before_after.name()) .sinceVersion("0.13.0") - .withDocumentation("When 'cdc_op_key' persist the 'op' and the record key only," - + " when 'cdc_data_before' persist the additional 'before' image ," - + " and when 'cdc_data_before_after', persist the 'before' and 'after' at the same time."); + .withDocumentation("Setting 'op_key_only' persists the 'op' and the record key only, " + + "setting 'data_before' persists the additional 'before' image, " + + "and setting 'data_before_after' persists the additional 'before' and 'after' images."); public static final ConfigProperty CREATE_SCHEMA = ConfigProperty .key("hoodie.table.create.schema") @@ -659,7 +662,7 @@ public boolean isCDCEnabled() { } public HoodieCDCSupplementalLoggingMode cdcSupplementalLoggingMode() { - return HoodieCDCSupplementalLoggingMode.parse(getStringOrDefault(CDC_SUPPLEMENTAL_LOGGING_MODE)); + return HoodieCDCSupplementalLoggingMode.valueOf(getStringOrDefault(CDC_SUPPLEMENTAL_LOGGING_MODE)); } public String getKeyGeneratorClassName() { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java index 994b7ea477e0e..506680dc3b2de 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java @@ -61,6 +61,8 @@ import static org.apache.hudi.common.table.cdc.HoodieCDCInferCase.BASE_FILE_INSERT; import static org.apache.hudi.common.table.cdc.HoodieCDCInferCase.LOG_FILE; import static org.apache.hudi.common.table.cdc.HoodieCDCInferCase.REPLACE_COMMIT; +import static org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.data_before_after; +import static org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.op_key_only; import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION; import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION; import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION; @@ -280,7 +282,7 @@ private HoodieCDCFileSplit parseWriteStat( } } else { // this is a cdc log - if (supplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER)) { + if (supplementalLoggingMode == data_before_after) { cdcFileSplit = new HoodieCDCFileSplit(instantTs, AS_IS, writeStat.getCdcStats().keySet()); } else { try { @@ -292,7 +294,7 @@ private HoodieCDCFileSplit parseWriteStat( FileSlice beforeFileSlice = null; FileSlice currentFileSlice = new FileSlice(fileGroupId, instant.getTimestamp(), new HoodieBaseFile(fs.getFileStatus(new Path(basePath, writeStat.getPath()))), new ArrayList<>()); - if (supplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.OP_KEY)) { + if (supplementalLoggingMode == op_key_only) { beforeFileSlice = new FileSlice(fileGroupId, writeStat.getPrevCommit(), beforeBaseFile, new ArrayList<>()); } cdcFileSplit = new HoodieCDCFileSplit(instantTs, AS_IS, writeStat.getCdcStats().keySet(), diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCInferCase.java 
b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCInferCase.java index c6005c601012a..dfcb08a84cd40 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCInferCase.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCInferCase.java @@ -24,10 +24,10 @@ * * AS_IS: * For this type, there must be a real cdc log file from which we get the whole/part change data. - * when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before_after', it keeps all the fields about the + * when `hoodie.table.cdc.supplemental.logging.mode` is {@link HoodieCDCSupplementalLoggingMode#data_before_after}, it keeps all the fields about the * change data, including `op`, `ts_ms`, `before` and `after`. So read it and return directly, * no more other files need to be loaded. - * when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before', it keeps the `op`, the key and the + * when `hoodie.table.cdc.supplemental.logging.mode` is {@link HoodieCDCSupplementalLoggingMode#data_before}, it keeps the `op`, the key and the * `before` of the changing record. When `op` is equal to 'i' or 'u', need to get the current record from the * current base/log file as `after`. * when `hoodie.table.cdc.supplemental.logging.mode` is 'op_key', it just keeps the `op` and the key of diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCSupplementalLoggingMode.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCSupplementalLoggingMode.java index 13a51a4f07295..b52d1432fc11a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCSupplementalLoggingMode.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCSupplementalLoggingMode.java @@ -18,8 +18,6 @@ package org.apache.hudi.common.table.cdc; -import org.apache.hudi.exception.HoodieNotSupportedException; - /** * Change log capture supplemental logging mode. The supplemental log is used for * accelerating the generation of change log details. @@ -27,36 +25,13 @@ *

* <p>Three modes are supported:</p> * <ul> * <li>op_key_only: persist the 'op' and the record key only;</li> * <li>data_before: persist the additional 'before' image;</li> * <li>data_before_after: persist the additional 'before' and 'after' images.</li> * </ul> */ public enum HoodieCDCSupplementalLoggingMode { - OP_KEY("cdc_op_key"), - WITH_BEFORE("cdc_data_before"), - WITH_BEFORE_AFTER("cdc_data_before_after"); - - private final String value; - - HoodieCDCSupplementalLoggingMode(String value) { - this.value = value; - } - - public String getValue() { - return this.value; - } - - public static HoodieCDCSupplementalLoggingMode parse(String value) { - switch (value) { - case "cdc_op_key": - return OP_KEY; - case "cdc_data_before": - return WITH_BEFORE; - case "cdc_data_before_after": - return WITH_BEFORE_AFTER; - default: - throw new HoodieNotSupportedException("Unsupported value: " + value); - } - } + op_key_only, + data_before, + data_before_after } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCUtils.java index 6ca5869fdfd49..069567208b024 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCUtils.java @@ -59,7 +59,7 @@ public class HoodieCDCUtils { }; /** - * The schema of cdc log file in the case `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_op_key'. + * The schema of cdc log file in the case `hoodie.table.cdc.supplemental.logging.mode` is {@link HoodieCDCSupplementalLoggingMode#op_key_only}. */ public static final String CDC_SCHEMA_OP_AND_RECORDKEY_STRING = "{\"type\":\"record\",\"name\":\"Record\"," + "\"fields\":[" @@ -73,11 +73,11 @@ public class HoodieCDCUtils { public static Schema schemaBySupplementalLoggingMode( HoodieCDCSupplementalLoggingMode supplementalLoggingMode, Schema tableSchema) { - if (supplementalLoggingMode == HoodieCDCSupplementalLoggingMode.OP_KEY) { + if (supplementalLoggingMode == HoodieCDCSupplementalLoggingMode.op_key_only) { return CDC_SCHEMA_OP_AND_RECORDKEY; - } else if (supplementalLoggingMode == HoodieCDCSupplementalLoggingMode.WITH_BEFORE) { + } else if (supplementalLoggingMode == HoodieCDCSupplementalLoggingMode.data_before) { return createCDCSchema(tableSchema, false); - } else if (supplementalLoggingMode == HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER) { + } else if (supplementalLoggingMode == HoodieCDCSupplementalLoggingMode.data_before_after) { return createCDCSchema(tableSchema, true); } else { throw new HoodieException("not support this supplemental logging mode: " + supplementalLoggingMode); @@ -109,7 +109,7 @@ private static Schema createCDCSchema(Schema tableSchema, boolean withAfterImage } /** - * Build the cdc record which has all the cdc fields when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before_after'. + * Build the cdc record which has all the cdc fields when `hoodie.table.cdc.supplemental.logging.mode` is {@link HoodieCDCSupplementalLoggingMode#data_before_after}. */ public static GenericData.Record cdcRecord(Schema cdcSchema, String op, String commitTime, GenericRecord before, GenericRecord after) { @@ -122,7 +122,7 @@ public static GenericData.Record cdcRecord(Schema cdcSchema, String op, String c } /** - * Build the cdc record when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before'. + * Build the cdc record when `hoodie.table.cdc.supplemental.logging.mode` is {@link HoodieCDCSupplementalLoggingMode#data_before}.
*/ public static GenericData.Record cdcRecord(Schema cdcSchema, String op, String recordKey, GenericRecord before) { @@ -134,7 +134,7 @@ public static GenericData.Record cdcRecord(Schema cdcSchema, String op, } /** - * Build the cdc record when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_op_key'. + * Build the cdc record when `hoodie.table.cdc.supplemental.logging.mode` is {@link HoodieCDCSupplementalLoggingMode#op_key_only}. */ public static GenericData.Record cdcRecord(Schema cdcSchema, String op, String recordKey) { GenericData.Record record = new GenericData.Record(cdcSchema); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java index a8828514eebe9..9250429b3773c 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java @@ -551,7 +551,7 @@ public void testCDCBlock() throws IOException, InterruptedException { + "]}"; Schema dataSchema = new Schema.Parser().parse(dataSchameString); Schema cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode( - HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER, dataSchema); + HoodieCDCSupplementalLoggingMode.data_before_after, dataSchema); GenericRecord insertedRecord = new GenericData.Record(dataSchema); insertedRecord.put("uuid", 1); insertedRecord.put("name", "apple"); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index c1812b79e3819..63bb0d365a2c7 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -25,8 +25,8 @@ import org.apache.hudi.common.model.EventTimeAvroPayload; import org.apache.hudi.common.model.HoodieAvroRecordMerger; import org.apache.hudi.common.model.HoodieCleaningPolicy; -import org.apache.hudi.common.model.HoodieSyncTableStrategy; import org.apache.hudi.common.model.HoodieRecordMerger; +import org.apache.hudi.common.model.HoodieSyncTableStrategy; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableConfig; @@ -49,6 +49,7 @@ import java.util.Map; import java.util.Set; +import static org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.data_before_after; import static org.apache.hudi.common.util.PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH; import static org.apache.hudi.config.HoodieClusteringConfig.DAYBASED_LOOKBACK_PARTITIONS; import static org.apache.hudi.config.HoodieClusteringConfig.PARTITION_FILTER_BEGIN_PARTITION; @@ -168,12 +169,11 @@ private FlinkOptions() { public static final ConfigOption SUPPLEMENTAL_LOGGING_MODE = ConfigOptions .key("cdc.supplemental.logging.mode") .stringType() - .defaultValue("cdc_data_before_after") // default record all the change log images + .defaultValue(data_before_after.name()) .withFallbackKeys(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key()) - .withDescription("The supplemental logging mode:" - + "1) 'cdc_op_key': persist the 'op' and the record key only," - + "2) 'cdc_data_before': persist the additional 'before' image," - + "3) 'cdc_data_before_after': persist the 'before' and 'after' 
images at the same time"); + .withDescription("Setting 'op_key_only' persists the 'op' and the record key only, " + + "setting 'data_before' persists the additional 'before' image, " + + "and setting 'data_before_after' persists the additional 'before' and 'after' images."); // ------------------------------------------------------------------------ // Metadata table Options diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java index af3e25ef2c0bc..42b94b58351a3 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java @@ -210,7 +210,7 @@ public static boolean hasNoSpecificReadCommits(Configuration conf) { */ public static HoodieCDCSupplementalLoggingMode getCDCSupplementalLoggingMode(Configuration conf) { String mode = conf.getString(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE); - return HoodieCDCSupplementalLoggingMode.parse(mode); + return HoodieCDCSupplementalLoggingMode.valueOf(mode); } /** diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java index 4e162d8e2b8c7..8474c2a797ad1 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java @@ -173,11 +173,11 @@ private ClosableIterator getRecordIterator( Schema dataSchema = HoodieAvroUtils.removeMetadataFields(new Schema.Parser().parse(tableState.getAvroSchema())); Schema cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(mode, dataSchema); switch (mode) { - case WITH_BEFORE_AFTER: + case data_before_after: return new BeforeAfterImageIterator(tablePath, tableState, hadoopConf, cdcSchema, fileSplit); - case WITH_BEFORE: + case data_before: return new BeforeImageIterator(conf, hadoopConf, tablePath, tableState, cdcSchema, fileSplit, imageManager); - case OP_KEY: + case op_key_only: return new RecordKeyImageIterator(conf, hadoopConf, tablePath, tableState, cdcSchema, fileSplit, imageManager); default: throw new AssertionError("Unexpected mode" + mode); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java index 10f2bcd095a42..d733965b2ebe4 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java @@ -162,7 +162,7 @@ void testStreamReadFromSpecifiedCommitWithChangelog(HoodieCDCSupplementalLogging .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) .option(FlinkOptions.READ_AS_STREAMING, true) .option(FlinkOptions.CDC_ENABLED, true) - .option(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE, mode.getValue()) + .option(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE, mode.name()) .end(); streamTableEnv.executeSql(hoodieTableDDL); String insertInto = "insert into t1 select * from source"; diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java index 6d0bf731ccc35..7563498bbb61b 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java @@ -324,7 +324,7 @@ void testReadWithDeletesCOW() throws Exception { void testReadWithChangeLogCOW(HoodieCDCSupplementalLoggingMode mode) throws Exception { Map options = new HashMap<>(); options.put(FlinkOptions.CDC_ENABLED.key(), "true"); - options.put(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE.key(), mode.getValue()); + options.put(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE.key(), mode.name()); beforeEach(HoodieTableType.COPY_ON_WRITE, options); // write the insert data sets @@ -365,7 +365,7 @@ void testReadWithChangeLogCOW(HoodieCDCSupplementalLoggingMode mode) throws Exce void testReadFromEarliestWithChangeLogCOW(HoodieCDCSupplementalLoggingMode mode) throws Exception { Map options = new HashMap<>(); options.put(FlinkOptions.CDC_ENABLED.key(), "true"); - options.put(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE.key(), mode.getValue()); + options.put(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE.key(), mode.name()); options.put(FlinkOptions.READ_START_COMMIT.key(), "earliest"); beforeEach(HoodieTableType.COPY_ON_WRITE, options); diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala index 4e90303b3b2d5..5b12a2ab218c9 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala @@ -18,24 +18,21 @@ package org.apache.hudi.cdc -import org.apache.hudi.AvroConversionUtils -import org.apache.hudi.DataSourceReadOptions -import org.apache.hudi.HoodieDataSourceHelper -import org.apache.hudi.HoodieTableSchema -import org.apache.hudi.common.table.cdc.HoodieCDCUtils._ -import org.apache.hudi.common.table.cdc.HoodieCDCOperation._ -import org.apache.hudi.common.table.HoodieTableMetaClient -import org.apache.hudi.common.table.TableSchemaResolver import org.apache.hudi.common.table.cdc.HoodieCDCExtractor +import org.apache.hudi.common.table.cdc.HoodieCDCOperation._ +import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode._ +import org.apache.hudi.common.table.cdc.HoodieCDCUtils._ import org.apache.hudi.common.table.log.InstantRange +import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.exception.HoodieException import org.apache.hudi.internal.schema.InternalSchema +import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, HoodieDataSourceHelper, HoodieTableSchema} import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.{Row, SQLContext, SparkSession} import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan} import org.apache.spark.sql.types.{StringType, StructField, StructType} +import org.apache.spark.sql.{Row, SQLContext, SparkSession} import org.apache.spark.unsafe.types.UTF8String import scala.collection.JavaConverters._ @@ -131,7 +128,7 @@ object CDCRelation { /** * CDC Schema For Spark. - * Also it's schema when `hoodie.table.cdc.supplemental.logging.mode` is `cdc_data_before_after`. 
+ * Also it's schema when `hoodie.table.cdc.supplemental.logging.mode` is [[data_before_after]]. * Here we use the debezium format. */ val FULL_CDC_SPARK_SCHEMA: StructType = { @@ -146,7 +143,7 @@ object CDCRelation { } /** - * CDC Schema For Spark when `hoodie.table.cdc.supplemental.logging.mode` is `op_key`. + * CDC Schema For Spark when `hoodie.table.cdc.supplemental.logging.mode` is [[op_key_only]]. */ val MIN_CDC_SPARK_SCHEMA: StructType = { StructType( @@ -158,7 +155,7 @@ object CDCRelation { } /** - * CDC Schema For Spark when `hoodie.table.cdc.supplemental.logging.mode` is `cdc_data_before`. + * CDC Schema For Spark when `hoodie.table.cdc.supplemental.logging.mode` is [[data_before]]. */ val CDC_WITH_BEFORE_SPARK_SCHEMA: StructType = { StructType( diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala index 4768e4a3d8785..767300f36591c 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala @@ -18,40 +18,40 @@ package org.apache.hudi.cdc +import com.fasterxml.jackson.annotation.JsonInclude.Include +import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} +import com.fasterxml.jackson.module.scala.DefaultScalaModule +import org.apache.avro.Schema +import org.apache.avro.generic.{GenericData, GenericRecord, IndexedRecord} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path import org.apache.hudi.HoodieBaseRelation.BaseFileReader -import org.apache.hudi.{AvroConversionUtils, AvroProjection, HoodieFileIndex, HoodieMergeOnReadFileSplit, HoodieTableSchema, HoodieTableState, HoodieUnsafeRDD, LogFileIterator, RecordMergingFileIterator, SparkAdapterSupport} import org.apache.hudi.HoodieConversionUtils._ import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport import org.apache.hudi.avro.HoodieAvroUtils import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties} -import org.apache.hudi.common.model.{FileSlice, HoodieAvroRecordMerger, HoodieLogFile, HoodieRecord, HoodieRecordMerger, HoodieRecordPayload} +import org.apache.hudi.common.model._ import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.cdc.HoodieCDCInferCase._ import org.apache.hudi.common.table.cdc.HoodieCDCOperation._ -import org.apache.hudi.common.table.cdc.{HoodieCDCFileSplit, HoodieCDCSupplementalLoggingMode, HoodieCDCUtils} +import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode._ +import org.apache.hudi.common.table.cdc.{HoodieCDCFileSplit, HoodieCDCUtils} import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator import org.apache.hudi.common.util.ValidationUtils.checkState import org.apache.hudi.config.{HoodiePayloadConfig, HoodieWriteConfig} import org.apache.hudi.keygen.constant.KeyGeneratorOptions import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory -import com.fasterxml.jackson.annotation.JsonInclude.Include -import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} -import com.fasterxml.jackson.module.scala.DefaultScalaModule -import org.apache.avro.Schema -import org.apache.avro.generic.{GenericData, GenericRecord, GenericRecordBuilder, IndexedRecord} -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path -import 
org.apache.spark.{Partition, SerializableWritable, TaskContext} +import org.apache.hudi.{AvroConversionUtils, AvroProjection, HoodieFileIndex, HoodieMergeOnReadFileSplit, HoodieTableSchema, HoodieTableState, HoodieUnsafeRDD, LogFileIterator, RecordMergingFileIterator, SparkAdapterSupport} import org.apache.spark.rdd.RDD import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.execution.datasources.PartitionedFile -import org.apache.spark.sql.types.StructType import org.apache.spark.sql.SparkSession import org.apache.spark.sql.avro.HoodieAvroDeserializer +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Projection -import org.apache.spark.sql.types.StringType +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.types.{StringType, StructType} import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.{Partition, SerializableWritable, TaskContext} import java.io.Closeable import java.util.Properties @@ -244,7 +244,7 @@ class HoodieCDCRDD( /** * Keep the after-image data. Only one case will use this: - * the cdc infer case is [[AS_IS]] and [[cdcSupplementalLoggingMode]] is 'op_key' or 'cdc_data_before'. + * the cdc infer case is [[AS_IS]] and [[cdcSupplementalLoggingMode]] is [[op_key_only]] or [[data_before]]. */ private var afterImageRecords: mutable.Map[String, InternalRow] = mutable.Map.empty @@ -306,13 +306,13 @@ class HoodieCDCRDD( case AS_IS => val record = cdcLogRecordIterator.next().asInstanceOf[GenericRecord] cdcSupplementalLoggingMode match { - case HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER => + case `data_before_after` => recordToLoad.update(0, convertToUTF8String(String.valueOf(record.get(0)))) val before = record.get(2).asInstanceOf[GenericRecord] recordToLoad.update(2, recordToJsonAsUTF8String(before)) val after = record.get(3).asInstanceOf[GenericRecord] recordToLoad.update(3, recordToJsonAsUTF8String(after)) - case HoodieCDCSupplementalLoggingMode.WITH_BEFORE => + case `data_before` => val row = cdcRecordDeserializer.deserialize(record).get.asInstanceOf[InternalRow] val op = row.getString(0) val recordKey = row.getString(1) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/HoodieCDCTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/HoodieCDCTestBase.scala index 7ad7ec1024778..fce3f2289e691 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/HoodieCDCTestBase.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/HoodieCDCTestBase.scala @@ -21,7 +21,7 @@ import org.apache.hudi.DataSourceReadOptions._ import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.common.config.HoodieMetadataConfig import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieKey, HoodieLogFile, HoodieRecord} -import org.apache.hudi.common.table.cdc.{HoodieCDCOperation, HoodieCDCUtils} +import org.apache.hudi.common.table.cdc.{HoodieCDCOperation, HoodieCDCSupplementalLoggingMode, HoodieCDCUtils} import org.apache.hudi.common.table.HoodieTableConfig import org.apache.hudi.common.table.log.HoodieLogFormat import org.apache.hudi.common.table.log.block.HoodieDataBlock @@ -33,9 +33,11 @@ import org.apache.avro.Schema import org.apache.avro.generic.{GenericRecord, IndexedRecord} import org.apache.hadoop.fs.Path 
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType +import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.{data_before, op_key_only} import org.apache.spark.sql.{DataFrame, SparkSession} import org.junit.jupiter.api.{AfterEach, BeforeEach} import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, assertNull} + import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ @@ -133,19 +135,19 @@ abstract class HoodieCDCTestBase extends HoodieClientTestBase { records.toList } - protected def checkCDCDataForInsertOrUpdate(cdcSupplementalLoggingMode: String, - cdcSchema: Schema, - dataSchema: Schema, - cdcRecords: Seq[HoodieRecord[_]], - newHoodieRecords: java.util.List[HoodieRecord[_]], - op: HoodieCDCOperation): Unit = { + protected def checkCDCDataForInsertOrUpdate(loggingMode: HoodieCDCSupplementalLoggingMode, + cdcSchema: Schema, + dataSchema: Schema, + cdcRecords: Seq[HoodieRecord[_]], + newHoodieRecords: java.util.List[HoodieRecord[_]], + op: HoodieCDCOperation): Unit = { val cdcRecord = cdcRecords.head.getData.asInstanceOf[GenericRecord] // check schema assertEquals(cdcRecord.getSchema, cdcSchema) - if (cdcSupplementalLoggingMode == "cdc_op_key") { + if (loggingMode == op_key_only) { // check record key assert(cdcRecords.map(_.getData.asInstanceOf[GenericRecord].get(1).toString).sorted == newHoodieRecords.map(_.getKey.getRecordKey).sorted) - } else if (cdcSupplementalLoggingMode == "cdc_data_before") { + } else if (loggingMode == data_before) { // check record key assert(cdcRecords.map(_.getData.asInstanceOf[GenericRecord].get(1).toString).sorted == newHoodieRecords.map(_.getKey.getRecordKey).sorted) // check before @@ -181,17 +183,17 @@ abstract class HoodieCDCTestBase extends HoodieClientTestBase { } } - protected def checkCDCDataForDelete(cdcSupplementalLoggingMode: String, - cdcSchema: Schema, - cdcRecords: Seq[IndexedRecord], - deletedKeys: java.util.List[HoodieKey]): Unit = { + protected def checkCDCDataForDelete(loggingMode: HoodieCDCSupplementalLoggingMode, + cdcSchema: Schema, + cdcRecords: Seq[IndexedRecord], + deletedKeys: java.util.List[HoodieKey]): Unit = { val cdcRecord = cdcRecords.head.asInstanceOf[GenericRecord] // check schema assertEquals(cdcRecord.getSchema, cdcSchema) - if (cdcSupplementalLoggingMode == "cdc_op_key") { + if (loggingMode == op_key_only) { // check record key assert(cdcRecords.map(_.get(1).toString).sorted == deletedKeys.map(_.getRecordKey).sorted) - } else if (cdcSupplementalLoggingMode == "cdc_data_before") { + } else if (loggingMode == data_before) { // check record key assert(cdcRecords.map(_.get(1).toString).sorted == deletedKeys.map(_.getRecordKey).sorted) } else { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala index bc90485380613..14b4f50700a8c 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala @@ -20,16 +20,16 @@ package org.apache.hudi.functional.cdc import org.apache.avro.generic.GenericRecord import org.apache.hudi.DataSourceWriteOptions -import org.apache.hudi.common.table.cdc.{HoodieCDCOperation, HoodieCDCSupplementalLoggingMode, HoodieCDCUtils} +import 
org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.op_key_only +import org.apache.hudi.common.table.cdc.HoodieCDCUtils.schemaBySupplementalLoggingMode +import org.apache.hudi.common.table.cdc.{HoodieCDCOperation, HoodieCDCSupplementalLoggingMode} import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.testutils.HoodieTestDataGenerator import org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings, recordsToStrings} - import org.apache.spark.sql.SaveMode - import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.CsvSource +import org.junit.jupiter.params.provider.{CsvSource, EnumSource} import scala.collection.JavaConversions._ @@ -44,10 +44,10 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase { * Step6: Bluk_Insert 20 */ @ParameterizedTest - @CsvSource(Array("cdc_op_key", "cdc_data_before", "cdc_data_before_after")) - def testCOWDataSourceWrite(cdcSupplementalLoggingMode: String): Unit = { + @EnumSource(classOf[HoodieCDCSupplementalLoggingMode]) + def testCOWDataSourceWrite(loggingMode: HoodieCDCSupplementalLoggingMode): Unit = { val options = commonOpts ++ Map( - HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key -> cdcSupplementalLoggingMode + HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key -> loggingMode.name() ) var totalInsertedCnt = 0L @@ -70,8 +70,7 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase { val schemaResolver = new TableSchemaResolver(metaClient) val dataSchema = schemaResolver.getTableAvroSchema(false) - val cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode( - HoodieCDCSupplementalLoggingMode.parse(cdcSupplementalLoggingMode), dataSchema) + val cdcSchema = schemaBySupplementalLoggingMode(loggingMode, dataSchema) totalInsertedCnt += 100 val instant1 = metaClient.reloadActiveTimeline.lastInstant().get() @@ -98,7 +97,7 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase { // check the num of cdc data assertEquals(cdcDataFromCDCLogFile2.size, 50) // check record key, before, after according to the supplemental logging mode - checkCDCDataForInsertOrUpdate(cdcSupplementalLoggingMode, cdcSchema, dataSchema, + checkCDCDataForInsertOrUpdate(loggingMode, cdcSchema, dataSchema, cdcDataFromCDCLogFile2, hoodieRecords2, HoodieCDCOperation.UPDATE) val commitTime2 = instant2.getTimestamp @@ -225,11 +224,11 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase { * Step7: Upsert 30 With CLean */ @ParameterizedTest - @CsvSource(Array("cdc_op_key", "cdc_data_before", "cdc_data_before_after")) - def testMORDataSourceWrite(cdcSupplementalLoggingMode: String): Unit = { + @EnumSource(classOf[HoodieCDCSupplementalLoggingMode]) + def testMORDataSourceWrite(loggingMode: HoodieCDCSupplementalLoggingMode): Unit = { val options = commonOpts ++ Map( DataSourceWriteOptions.TABLE_TYPE.key() -> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, - HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key -> cdcSupplementalLoggingMode + HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key -> loggingMode.name() ) var totalInsertedCnt = 0L @@ -252,8 +251,7 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase { val schemaResolver = new TableSchemaResolver(metaClient) val dataSchema = schemaResolver.getTableAvroSchema(false) - val cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode( - HoodieCDCSupplementalLoggingMode.parse(cdcSupplementalLoggingMode), 
dataSchema) + val cdcSchema = schemaBySupplementalLoggingMode(loggingMode, dataSchema) totalInsertedCnt += 100 val instant1 = metaClient.reloadActiveTimeline.lastInstant().get() @@ -429,14 +427,14 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase { */ @ParameterizedTest @CsvSource(Array( - "COPY_ON_WRITE,cdc_data_before_after", "MERGE_ON_READ,cdc_data_before_after", - "COPY_ON_WRITE,cdc_data_before", "MERGE_ON_READ,cdc_data_before", - "COPY_ON_WRITE,cdc_op_key", "MERGE_ON_READ,cdc_op_key")) - def testDataSourceWriteWithPartitionField(tableType: String, cdcSupplementalLoggingMode: String): Unit = { + "COPY_ON_WRITE,data_before_after", "MERGE_ON_READ,data_before_after", + "COPY_ON_WRITE,data_before", "MERGE_ON_READ,data_before", + "COPY_ON_WRITE,op_key_only", "MERGE_ON_READ,op_key_only")) + def testDataSourceWriteWithPartitionField(tableType: String, loggingMode: String): Unit = { val options = commonOpts ++ Map( DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition", DataSourceWriteOptions.TABLE_TYPE.key -> tableType, - HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key -> cdcSupplementalLoggingMode + HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key -> loggingMode ) var totalInsertedCnt = 0L @@ -545,9 +543,9 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase { } @ParameterizedTest - @CsvSource(Array("cdc_op_key", "cdc_data_before", "cdc_data_before_after")) - def testCDCWithMultiBlocksAndLogFiles(cdcSupplementalLoggingMode: String): Unit = { - val (blockSize, logFileSize) = if (cdcSupplementalLoggingMode == "cdc_op_key") { + @EnumSource(classOf[HoodieCDCSupplementalLoggingMode]) + def testCDCWithMultiBlocksAndLogFiles(loggingMode: HoodieCDCSupplementalLoggingMode): Unit = { + val (blockSize, logFileSize) = if (loggingMode == op_key_only) { // only op and key will be stored in cdc log file, we set the smaller values for the two configs. // so that it can also write out more than one cdc log file // and each of cdc log file has more that one data block as we expect. 
@@ -556,7 +554,7 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase { (2048, 5120) } val options = commonOpts ++ Map( - HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key -> cdcSupplementalLoggingMode, + HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key -> loggingMode.name(), "hoodie.logfile.data.block.max.size" -> blockSize.toString, "hoodie.logfile.max.size" -> logFileSize.toString ) @@ -576,8 +574,7 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase { val schemaResolver = new TableSchemaResolver(metaClient) val dataSchema = schemaResolver.getTableAvroSchema(false) - val cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode( - HoodieCDCSupplementalLoggingMode.parse(cdcSupplementalLoggingMode), dataSchema) + val cdcSchema = schemaBySupplementalLoggingMode(loggingMode, dataSchema) // Upsert Operation val hoodieRecords2 = dataGen.generateUniqueUpdates("001", 50) @@ -595,7 +592,7 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase { // check the num of cdc data assertEquals(cdcDataFromCDCLogFile2.size, 50) // check record key, before, after according to the supplemental logging mode - checkCDCDataForInsertOrUpdate(cdcSupplementalLoggingMode, cdcSchema, dataSchema, + checkCDCDataForInsertOrUpdate(loggingMode, cdcSchema, dataSchema, cdcDataFromCDCLogFile2, hoodieRecords2, HoodieCDCOperation.UPDATE) val commitTime2 = instant2.getTimestamp diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCStreamingSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCStreamingSuite.scala index 873d55d64cd89..28a993e0510a3 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCStreamingSuite.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCStreamingSuite.scala @@ -17,19 +17,17 @@ package org.apache.hudi.functional.cdc -import org.apache.hudi.DataSourceReadOptions -import org.apache.hudi.DataSourceWriteOptions +import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} import org.apache.hudi.config.HoodieWriteConfig - +import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions} import org.apache.spark.sql.QueryTest.checkAnswer -import org.apache.spark.sql.{Column, Dataset, Row, SaveMode} import org.apache.spark.sql.catalyst.expressions.{Add, If, Literal} import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.functions._ - +import org.apache.spark.sql.{Column, Dataset, Row, SaveMode} import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.CsvSource +import org.junit.jupiter.params.provider.EnumSource class TestCDCStreamingSuite extends HoodieCDCTestBase { @@ -45,8 +43,8 @@ class TestCDCStreamingSuite extends HoodieCDCTestBase { * and write to country_to_population_tbl. 
*/ @ParameterizedTest - @CsvSource(Array("cdc_op_key", "cdc_data_before", "cdc_data_before_after")) - def cdcStreaming(cdcSupplementalLoggingMode: String): Unit = { + @EnumSource(classOf[HoodieCDCSupplementalLoggingMode]) + def cdcStreaming(loggingMode: HoodieCDCSupplementalLoggingMode): Unit = { val commonOptions = Map( "hoodie.insert.shuffle.parallelism" -> "4", "hoodie.upsert.shuffle.parallelism" -> "4", @@ -69,7 +67,7 @@ class TestCDCStreamingSuite extends HoodieCDCTestBase { userToCountryDF.write.format("hudi") .options(commonOptions) .option(HoodieTableConfig.CDC_ENABLED.key, "true") - .option(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key, cdcSupplementalLoggingMode) + .option(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key, loggingMode.name()) .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "userid") .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key, "ts") .option(HoodieWriteConfig.TBL_NAME.key, "user_to_country") diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCDCForSparkSQL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCDCForSparkSQL.scala index 60aa3c3e0774f..bec2230e5ab57 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCDCForSparkSQL.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCDCForSparkSQL.scala @@ -19,10 +19,9 @@ package org.apache.spark.sql.hudi import org.apache.hudi.DataSourceReadOptions._ import org.apache.hudi.common.table.HoodieTableMetaClient - +import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.{data_before, op_key_only} import org.apache.spark.sql.DataFrame import org.apache.spark.sql.functions._ - import org.junit.jupiter.api.Assertions.assertEquals class TestCDCForSparkSQL extends HoodieSparkSqlTestBase { @@ -54,7 +53,7 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase { spark.sql(s"use $databaseName") Seq("cow", "mor").foreach { tableType => - Seq("cdc_op_key", "cdc_data_before").foreach { cdcSupplementalLoggingMode => + Seq(op_key_only, data_before).foreach { loggingMode => withTempDir { tmp => val tableName = generateTableName val basePath = s"${tmp.getCanonicalPath}/$tableName" @@ -70,7 +69,7 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase { | 'primaryKey' = 'id', | 'preCombineField' = 'ts', | 'hoodie.table.cdc.enabled' = 'true', - | 'hoodie.table.cdc.supplemental.logging.mode' = '$cdcSupplementalLoggingMode', + | 'hoodie.table.cdc.supplemental.logging.mode' = '${loggingMode.name()}', | type = '$tableType' | ) | location '$basePath' @@ -174,7 +173,7 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase { spark.sql(s"use $databaseName") Seq("cow", "mor").foreach { tableType => - Seq("cdc_op_key", "cdc_data_before").foreach { cdcSupplementalLoggingMode => + Seq(op_key_only, data_before).foreach { loggingMode => withTempDir { tmp => val tableName = generateTableName val basePath = s"${tmp.getCanonicalPath}/$tableName" @@ -192,7 +191,7 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase { | 'primaryKey' = 'id', | 'preCombineField' = 'ts', | 'hoodie.table.cdc.enabled' = 'true', - | 'hoodie.table.cdc.supplemental.logging.mode' = '$cdcSupplementalLoggingMode', + | 'hoodie.table.cdc.supplemental.logging.mode' = '${loggingMode.name()}', | 'type' = '$tableType' | ) | location '$basePath'
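
Note on semantics (a sketch, not part of the patch): replacing the hand-written `parse(...)` with plain `Enum.valueOf` means the enum constant name now doubles as the persisted config string, and an unknown or legacy value now surfaces as a standard `IllegalArgumentException` rather than `HoodieNotSupportedException`:

```java
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;

public class SupplementalLoggingModeDemo {
  public static void main(String[] args) {
    // The constant name is now the canonical config value.
    HoodieCDCSupplementalLoggingMode mode =
        HoodieCDCSupplementalLoggingMode.valueOf("data_before_after");
    System.out.println(mode.name()); // data_before_after

    // Enum.valueOf rejects unknown input with IllegalArgumentException,
    // where the removed parse(...) threw HoodieNotSupportedException.
    try {
      HoodieCDCSupplementalLoggingMode.valueOf("cdc_op_key"); // old-style value from before this change
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```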
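A minimal sketch (hypothetical standalone snippet, assuming the Flink classes touched above) of how the Flink option now round-trips through the enum:

```java
import org.apache.flink.configuration.Configuration;

import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;

public class CdcModeOptionRoundTrip {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // mode.name() replaces the old mode.getValue() strings ("cdc_data_before", ...)
    // as the string form written into 'cdc.supplemental.logging.mode'.
    conf.setString(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE,
        HoodieCDCSupplementalLoggingMode.data_before.name());
    // OptionsResolver now resolves the string back with valueOf(...).
    HoodieCDCSupplementalLoggingMode mode =
        OptionsResolver.getCDCSupplementalLoggingMode(conf);
    System.out.println(mode); // data_before
  }
}
```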
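The test changes above swap `@CsvSource(Array("cdc_op_key", ...))` for `@EnumSource(classOf[HoodieCDCSupplementalLoggingMode])`. The same JUnit 5 pattern in Java (illustrative only, not from the patch) shows the payoff: new modes are picked up automatically instead of being hand-listed per test.

```java
import static org.junit.jupiter.api.Assertions.assertEquals;

import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

class CdcModeRoundTripTest {
  // Runs once per enum constant: op_key_only, data_before, data_before_after.
  @ParameterizedTest
  @EnumSource(HoodieCDCSupplementalLoggingMode.class)
  void nameRoundTripsThroughValueOf(HoodieCDCSupplementalLoggingMode mode) {
    assertEquals(mode, HoodieCDCSupplementalLoggingMode.valueOf(mode.name()));
  }
}
```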