@@ -2364,7 +2364,7 @@ public boolean isMetaserverEnabled() {
    * CDC supplemental logging mode.
    */
   public HoodieCDCSupplementalLoggingMode getCDCSupplementalLoggingMode() {
-    return HoodieCDCSupplementalLoggingMode.parse(
+    return HoodieCDCSupplementalLoggingMode.valueOf(
         getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE));
   }
 
──────────────────────────────

@@ -58,6 +58,9 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.data_before;
+import static org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.data_before_after;
+
 /**
  * This class encapsulates all the cdc-writing functions.
  */
@@ -240,10 +243,10 @@ public void close() {
   // -------------------------------------------------------------------------
 
   private CDCTransformer getTransformer() {
-    if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER)) {
+    if (cdcSupplementalLoggingMode == data_before_after) {
       return (operation, recordKey, oldRecord, newRecord) ->
           HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), commitTime, removeCommitMetadata(oldRecord), newRecord);
-    } else if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE)) {
+    } else if (cdcSupplementalLoggingMode == data_before) {
       return (operation, recordKey, oldRecord, newRecord) ->
           HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), recordKey, removeCommitMetadata(oldRecord));
     } else {
──────────────────────────────

@@ -64,6 +64,9 @@
 import java.util.stream.Collectors;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.data_before;
+import static org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.data_before_after;
+import static org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.op_key_only;
 
 /**
  * Configurations on the Hoodie Table like type of ingestion, storage formats, hive table name etc Configurations are loaded from hoodie.properties, these properties are usually set during
@@ -137,15 +140,15 @@ public class HoodieTableConfig extends HoodieConfig {
 
   public static final ConfigProperty<String> CDC_SUPPLEMENTAL_LOGGING_MODE = ConfigProperty
       .key("hoodie.table.cdc.supplemental.logging.mode")
-      .defaultValue(HoodieCDCSupplementalLoggingMode.OP_KEY.getValue())
+      .defaultValue(op_key_only.name())
       .withValidValues(
-          HoodieCDCSupplementalLoggingMode.OP_KEY.getValue(),
-          HoodieCDCSupplementalLoggingMode.WITH_BEFORE.getValue(),
-          HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER.getValue())
+          op_key_only.name(),
+          data_before.name(),
+          data_before_after.name())
       .sinceVersion("0.13.0")
-      .withDocumentation("When 'cdc_op_key' persist the 'op' and the record key only,"
-          + " when 'cdc_data_before' persist the additional 'before' image ,"
-          + " and when 'cdc_data_before_after', persist the 'before' and 'after' at the same time.");
+      .withDocumentation("Setting 'op_key_only' persists the 'op' and the record key only, "
+          + "setting 'data_before' persists the additional 'before' image, "
+          + "and setting 'data_before_after' persists the additional 'before' and 'after' images.");
 
   public static final ConfigProperty<String> CREATE_SCHEMA = ConfigProperty
       .key("hoodie.table.create.schema")
@@ -659,7 +662,7 @@ public boolean isCDCEnabled() {
   }
 
   public HoodieCDCSupplementalLoggingMode cdcSupplementalLoggingMode() {
-    return HoodieCDCSupplementalLoggingMode.parse(getStringOrDefault(CDC_SUPPLEMENTAL_LOGGING_MODE));
+    return HoodieCDCSupplementalLoggingMode.valueOf(getStringOrDefault(CDC_SUPPLEMENTAL_LOGGING_MODE));
   }
 
   public String getKeyGeneratorClassName() {
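For context, a minimal sketch of what the new read path amounts to. The property key and the enum/method names come from this diff; the `Properties` plumbing and the class name are illustrative only, standing in for `getStringOrDefault(...)`:

```java
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;

import java.util.Properties;

public class CdcTableConfigSketch {
  public static void main(String[] args) {
    // What a writer now stores in hoodie.properties for this key:
    // the raw enum name, produced by name() instead of the old getValue().
    Properties props = new Properties();
    props.setProperty("hoodie.table.cdc.supplemental.logging.mode",
        HoodieCDCSupplementalLoggingMode.data_before.name());

    // What cdcSupplementalLoggingMode() effectively does: fall back to the
    // op_key_only default when the property is unset, then convert via valueOf().
    String raw = props.getProperty("hoodie.table.cdc.supplemental.logging.mode",
        HoodieCDCSupplementalLoggingMode.op_key_only.name());
    HoodieCDCSupplementalLoggingMode mode = HoodieCDCSupplementalLoggingMode.valueOf(raw);

    System.out.println(mode); // data_before
  }
}
```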
──────────────────────────────

@@ -61,6 +61,8 @@
 import static org.apache.hudi.common.table.cdc.HoodieCDCInferCase.BASE_FILE_INSERT;
 import static org.apache.hudi.common.table.cdc.HoodieCDCInferCase.LOG_FILE;
 import static org.apache.hudi.common.table.cdc.HoodieCDCInferCase.REPLACE_COMMIT;
+import static org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.data_before_after;
+import static org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.op_key_only;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
@@ -280,7 +282,7 @@ private HoodieCDCFileSplit parseWriteStat(
       }
     } else {
       // this is a cdc log
-      if (supplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER)) {
+      if (supplementalLoggingMode == data_before_after) {
         cdcFileSplit = new HoodieCDCFileSplit(instantTs, AS_IS, writeStat.getCdcStats().keySet());
       } else {
         try {
@@ -292,7 +294,7 @@
           FileSlice beforeFileSlice = null;
           FileSlice currentFileSlice = new FileSlice(fileGroupId, instant.getTimestamp(),
               new HoodieBaseFile(fs.getFileStatus(new Path(basePath, writeStat.getPath()))), new ArrayList<>());
-          if (supplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.OP_KEY)) {
+          if (supplementalLoggingMode == op_key_only) {
             beforeFileSlice = new FileSlice(fileGroupId, writeStat.getPrevCommit(), beforeBaseFile, new ArrayList<>());
           }
           cdcFileSplit = new HoodieCDCFileSplit(instantTs, AS_IS, writeStat.getCdcStats().keySet(),
──────────────────────────────

@@ -24,10 +24,10 @@
  *
  * AS_IS:
  * For this type, there must be a real cdc log file from which we get the whole/part change data.
- * when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before_after', it keeps all the fields about the
+ * when `hoodie.table.cdc.supplemental.logging.mode` is {@link HoodieCDCSupplementalLoggingMode#data_before_after}, it keeps all the fields about the
  * change data, including `op`, `ts_ms`, `before` and `after`. So read it and return directly,
  * no more other files need to be loaded.
- * when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before', it keeps the `op`, the key and the
+ * when `hoodie.table.cdc.supplemental.logging.mode` is {@link HoodieCDCSupplementalLoggingMode#data_before}, it keeps the `op`, the key and the
  * `before` of the changing record. When `op` is equal to 'i' or 'u', need to get the current record from the
  * current base/log file as `after`.
  * when `hoodie.table.cdc.supplemental.logging.mode` is 'op_key', it just keeps the `op` and the key of
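The rules in this javadoc can be restated compactly in code. The following is a hypothetical helper, not part of the PR, summarizing which images a reader must still reconstruct for an AS_IS split under each mode:

```java
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;

class AsIsReadPlan {
  // Hypothetical helper restating the AS_IS rules from the javadoc above.
  static String remainingWork(HoodieCDCSupplementalLoggingMode mode) {
    switch (mode) {
      case data_before_after:
        // op, ts_ms, before and after are all in the cdc log: read and return.
        return "none";
      case data_before:
        // 'before' is logged; fetch 'after' from the current base/log file for 'i'/'u' ops.
        return "resolve the 'after' image";
      case op_key_only:
        // only op and key are logged; both images must be reconstructed.
        return "resolve the 'before' and 'after' images";
      default:
        throw new IllegalStateException("unknown mode: " + mode);
    }
  }
}
```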
──────────────────────────────

@@ -18,45 +18,20 @@
 
 package org.apache.hudi.common.table.cdc;
 
-import org.apache.hudi.exception.HoodieNotSupportedException;
-
 /**
  * Change log capture supplemental logging mode. The supplemental log is used for
  * accelerating the generation of change log details.
  *
  * <p>Three modes are supported:</p>
  *
  * <ul>
- *   <li>OP_KEY: record keys, the reader needs to figure out the update before image and after image;</li>
- *   <li>WITH_BEFORE: before images, the reader needs to figure out the update after images;</li>
- *   <li>WITH_BEFORE_AFTER: before and after images, the reader can generate the details directly from the log.</li>
+ *   <li>op_key_only: record keys, the reader needs to figure out the update before image and after image;</li>
+ *   <li>data_before: before images, the reader needs to figure out the update after images;</li>
+ *   <li>data_before_after: before and after images, the reader can generate the details directly from the log.</li>
  * </ul>
  */
 public enum HoodieCDCSupplementalLoggingMode {
-  OP_KEY("cdc_op_key"),
-  WITH_BEFORE("cdc_data_before"),
-  WITH_BEFORE_AFTER("cdc_data_before_after");
-
-  private final String value;
-
-  HoodieCDCSupplementalLoggingMode(String value) {
-    this.value = value;
-  }
-
-  public String getValue() {
-    return this.value;
-  }
-
-  public static HoodieCDCSupplementalLoggingMode parse(String value) {
-    switch (value) {
-      case "cdc_op_key":
-        return OP_KEY;
-      case "cdc_data_before":
-        return WITH_BEFORE;
-      case "cdc_data_before_after":
-        return WITH_BEFORE_AFTER;
-      default:
-        throw new HoodieNotSupportedException("Unsupported value: " + value);
-    }
-  }
+  op_key_only,
+  data_before,
+  data_before_after
 }
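A minimal sketch of how the simplified enum round-trips through a config string. One behavioral note: `Enum.valueOf` throws `IllegalArgumentException` on unknown input, whereas the removed `parse` threw `HoodieNotSupportedException`; the class name below is illustrative:

```java
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;

public class EnumRoundTrip {
  public static void main(String[] args) {
    // name() replaces the old getValue(): the constant itself is now the config string.
    String configured = HoodieCDCSupplementalLoggingMode.data_before_after.name();

    // valueOf(...) replaces the hand-written parse(...).
    HoodieCDCSupplementalLoggingMode mode =
        HoodieCDCSupplementalLoggingMode.valueOf(configured);

    System.out.println(mode); // data_before_after
  }
}
```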
──────────────────────────────

@@ -59,7 +59,7 @@ public class HoodieCDCUtils {
   };
 
   /**
-   * The schema of cdc log file in the case `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_op_key'.
+   * The schema of cdc log file in the case `hoodie.table.cdc.supplemental.logging.mode` is {@link HoodieCDCSupplementalLoggingMode#op_key_only}.
    */
   public static final String CDC_SCHEMA_OP_AND_RECORDKEY_STRING = "{\"type\":\"record\",\"name\":\"Record\","
       + "\"fields\":["
@@ -73,11 +73,11 @@
   public static Schema schemaBySupplementalLoggingMode(
       HoodieCDCSupplementalLoggingMode supplementalLoggingMode,
       Schema tableSchema) {
-    if (supplementalLoggingMode == HoodieCDCSupplementalLoggingMode.OP_KEY) {
+    if (supplementalLoggingMode == HoodieCDCSupplementalLoggingMode.op_key_only) {
       return CDC_SCHEMA_OP_AND_RECORDKEY;
-    } else if (supplementalLoggingMode == HoodieCDCSupplementalLoggingMode.WITH_BEFORE) {
+    } else if (supplementalLoggingMode == HoodieCDCSupplementalLoggingMode.data_before) {
       return createCDCSchema(tableSchema, false);
-    } else if (supplementalLoggingMode == HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER) {
+    } else if (supplementalLoggingMode == HoodieCDCSupplementalLoggingMode.data_before_after) {
       return createCDCSchema(tableSchema, true);
     } else {
       throw new HoodieException("not support this supplemental logging mode: " + supplementalLoggingMode);
@@ -109,7 +109,7 @@ private static Schema createCDCSchema(Schema tableSchema, boolean withAfterImage
   }
 
   /**
-   * Build the cdc record which has all the cdc fields when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before_after'.
+   * Build the cdc record which has all the cdc fields when `hoodie.table.cdc.supplemental.logging.mode` is {@link HoodieCDCSupplementalLoggingMode#data_before_after}.
    */
   public static GenericData.Record cdcRecord(Schema cdcSchema, String op, String commitTime,
       GenericRecord before, GenericRecord after) {
@@ -122,7 +122,7 @@ public static GenericData.Record cdcRecord(Schema cdcSchema, String op, String c
   }
 
   /**
-   * Build the cdc record when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before'.
+   * Build the cdc record when `hoodie.table.cdc.supplemental.logging.mode` is {@link HoodieCDCSupplementalLoggingMode#data_before}.
    */
   public static GenericData.Record cdcRecord(Schema cdcSchema, String op,
       String recordKey, GenericRecord before) {
@@ -134,7 +134,7 @@ public static GenericData.Record cdcRecord(Schema cdcSchema, String op,
   }
 
   /**
-   * Build the cdc record when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_op_key'.
+   * Build the cdc record when `hoodie.table.cdc.supplemental.logging.mode` is {@link HoodieCDCSupplementalLoggingMode#op_key_only}.
    */
   public static GenericData.Record cdcRecord(Schema cdcSchema, String op, String recordKey) {
     GenericData.Record record = new GenericData.Record(cdcSchema);
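A short sketch of how the schema helper pairs with the `cdcRecord(...)` overloads under the renamed modes; the op value, record key, and class name are illustrative, and the table schema is supplied by the caller:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;

class CdcRecordSketch {
  static GenericData.Record opKeyOnlyRecord(Schema tableSchema) {
    // For op_key_only the helper returns the fixed op+record-key schema.
    Schema cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(
        HoodieCDCSupplementalLoggingMode.op_key_only, tableSchema);
    // Only the operation and the record key are persisted in this mode,
    // so the two-payload-argument overload is the one that applies.
    return HoodieCDCUtils.cdcRecord(cdcSchema, "d", "some-record-key");
  }
}
```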
──────────────────────────────

@@ -551,7 +551,7 @@ public void testCDCBlock() throws IOException, InterruptedException {
         + "]}";
     Schema dataSchema = new Schema.Parser().parse(dataSchameString);
     Schema cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(
-        HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER, dataSchema);
+        HoodieCDCSupplementalLoggingMode.data_before_after, dataSchema);
     GenericRecord insertedRecord = new GenericData.Record(dataSchema);
     insertedRecord.put("uuid", 1);
     insertedRecord.put("name", "apple");
──────────────────────────────

@@ -25,8 +25,8 @@
 import org.apache.hudi.common.model.EventTimeAvroPayload;
 import org.apache.hudi.common.model.HoodieAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
-import org.apache.hudi.common.model.HoodieSyncTableStrategy;
 import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieSyncTableStrategy;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableConfig;
@@ -49,6 +49,7 @@
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.data_before_after;
 import static org.apache.hudi.common.util.PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH;
 import static org.apache.hudi.config.HoodieClusteringConfig.DAYBASED_LOOKBACK_PARTITIONS;
 import static org.apache.hudi.config.HoodieClusteringConfig.PARTITION_FILTER_BEGIN_PARTITION;
@@ -168,12 +169,11 @@ private FlinkOptions() {
   public static final ConfigOption<String> SUPPLEMENTAL_LOGGING_MODE = ConfigOptions
       .key("cdc.supplemental.logging.mode")
      .stringType()
-      .defaultValue("cdc_data_before_after") // default record all the change log images
+      .defaultValue(data_before_after.name())
      .withFallbackKeys(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key())
    [Inline review thread on the default value]

    Contributor: Let's keep the default value for Flink as data_before_after; there may be issues with the other two modes because Flink can trigger an eager flush, and the intermediate data file may cause change loss. We can switch to the other modes when I fix that.

    Member (Author): ok got it

    Member (Author): @danny0405 do you have a jira to track this work? I'd link it from the code as a todo.
-      .withDescription("The supplemental logging mode:"
-          + "1) 'cdc_op_key': persist the 'op' and the record key only,"
-          + "2) 'cdc_data_before': persist the additional 'before' image,"
-          + "3) 'cdc_data_before_after': persist the 'before' and 'after' images at the same time");
+      .withDescription("Setting 'op_key_only' persists the 'op' and the record key only, "
+          + "setting 'data_before' persists the additional 'before' image, "
+          + "and setting 'data_before_after' persists the additional 'before' and 'after' images.");
 
   // ------------------------------------------------------------------------
   //  Metadata table Options
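A sketch of overriding the Flink-side default (which deliberately stays `data_before_after`, per the review thread above). The option names come from this diff; the `org.apache.hudi.configuration` package path and the class name are assumptions:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.configuration.FlinkOptions;

class FlinkCdcConfSketch {
  // Build a Flink configuration that opts out of the data_before_after default.
  static Configuration cdcConf() {
    Configuration conf = new Configuration();
    conf.setBoolean(FlinkOptions.CDC_ENABLED, true);
    // mode.name() is now the option's string form, mirroring the tests below.
    conf.setString(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE,
        HoodieCDCSupplementalLoggingMode.data_before.name());
    return conf;
  }
}
```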
──────────────────────────────

@@ -210,7 +210,7 @@ public static boolean hasNoSpecificReadCommits(Configuration conf) {
   */
  public static HoodieCDCSupplementalLoggingMode getCDCSupplementalLoggingMode(Configuration conf) {
    String mode = conf.getString(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE);
-    return HoodieCDCSupplementalLoggingMode.parse(mode);
+    return HoodieCDCSupplementalLoggingMode.valueOf(mode);
  }
 
  /**
──────────────────────────────

@@ -173,11 +173,11 @@ private ClosableIterator<RowData> getRecordIterator(
     Schema dataSchema = HoodieAvroUtils.removeMetadataFields(new Schema.Parser().parse(tableState.getAvroSchema()));
     Schema cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(mode, dataSchema);
     switch (mode) {
-      case WITH_BEFORE_AFTER:
+      case data_before_after:
         return new BeforeAfterImageIterator(tablePath, tableState, hadoopConf, cdcSchema, fileSplit);
-      case WITH_BEFORE:
+      case data_before:
         return new BeforeImageIterator(conf, hadoopConf, tablePath, tableState, cdcSchema, fileSplit, imageManager);
-      case OP_KEY:
+      case op_key_only:
         return new RecordKeyImageIterator(conf, hadoopConf, tablePath, tableState, cdcSchema, fileSplit, imageManager);
       default:
         throw new AssertionError("Unexpected mode" + mode);
──────────────────────────────

@@ -162,7 +162,7 @@ void testStreamReadFromSpecifiedCommitWithChangelog(HoodieCDCSupplementalLogging
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
         .option(FlinkOptions.READ_AS_STREAMING, true)
         .option(FlinkOptions.CDC_ENABLED, true)
-        .option(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE, mode.getValue())
+        .option(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE, mode.name())
         .end();
     streamTableEnv.executeSql(hoodieTableDDL);
     String insertInto = "insert into t1 select * from source";
──────────────────────────────

@@ -324,7 +324,7 @@ void testReadWithDeletesCOW() throws Exception {
   void testReadWithChangeLogCOW(HoodieCDCSupplementalLoggingMode mode) throws Exception {
     Map<String, String> options = new HashMap<>();
     options.put(FlinkOptions.CDC_ENABLED.key(), "true");
-    options.put(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE.key(), mode.getValue());
+    options.put(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE.key(), mode.name());
     beforeEach(HoodieTableType.COPY_ON_WRITE, options);
 
     // write the insert data sets
@@ -365,7 +365,7 @@ void testReadWithChangeLogCOW(HoodieCDCSupplementalLoggingMode mode) throws Exce
   void testReadFromEarliestWithChangeLogCOW(HoodieCDCSupplementalLoggingMode mode) throws Exception {
     Map<String, String> options = new HashMap<>();
     options.put(FlinkOptions.CDC_ENABLED.key(), "true");
-    options.put(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE.key(), mode.getValue());
+    options.put(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE.key(), mode.name());
     options.put(FlinkOptions.READ_START_COMMIT.key(), "earliest");
     beforeEach(HoodieTableType.COPY_ON_WRITE, options);
 
──────────────────────────────

@@ -18,24 +18,21 @@
 
 package org.apache.hudi.cdc
 
-import org.apache.hudi.AvroConversionUtils
-import org.apache.hudi.DataSourceReadOptions
-import org.apache.hudi.HoodieDataSourceHelper
-import org.apache.hudi.HoodieTableSchema
-import org.apache.hudi.common.table.cdc.HoodieCDCUtils._
-import org.apache.hudi.common.table.cdc.HoodieCDCOperation._
-import org.apache.hudi.common.table.HoodieTableMetaClient
-import org.apache.hudi.common.table.TableSchemaResolver
 import org.apache.hudi.common.table.cdc.HoodieCDCExtractor
+import org.apache.hudi.common.table.cdc.HoodieCDCOperation._
+import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode._
+import org.apache.hudi.common.table.cdc.HoodieCDCUtils._
 import org.apache.hudi.common.table.log.InstantRange
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.internal.schema.InternalSchema
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, HoodieDataSourceHelper, HoodieTableSchema}
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.{Row, SQLContext, SparkSession}
 import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.{Row, SQLContext, SparkSession}
 import org.apache.spark.unsafe.types.UTF8String
 
 import scala.collection.JavaConverters._
@@ -131,7 +128,7 @@ object CDCRelation {
 
   /**
    * CDC Schema For Spark.
-   * Also it's schema when `hoodie.table.cdc.supplemental.logging.mode` is `cdc_data_before_after`.
+   * Also it's schema when `hoodie.table.cdc.supplemental.logging.mode` is [[data_before_after]].
    * Here we use the debezium format.
    */
   val FULL_CDC_SPARK_SCHEMA: StructType = {
@@ -146,7 +143,7 @@
   }
 
   /**
-   * CDC Schema For Spark when `hoodie.table.cdc.supplemental.logging.mode` is `op_key`.
+   * CDC Schema For Spark when `hoodie.table.cdc.supplemental.logging.mode` is [[op_key_only]].
    */
   val MIN_CDC_SPARK_SCHEMA: StructType = {
     StructType(
@@ -158,7 +155,7 @@
   }
 
   /**
-   * CDC Schema For Spark when `hoodie.table.cdc.supplemental.logging.mode` is `cdc_data_before`.
+   * CDC Schema For Spark when `hoodie.table.cdc.supplemental.logging.mode` is [[data_before]].
    */
   val CDC_WITH_BEFORE_SPARK_SCHEMA: StructType = {
     StructType(