Conversation

@lvyanquan (Contributor) commented Sep 30, 2025

Purpose

Linked issue: close #6357 ([Feature] Add FlinkCDC to Paimon type and data converter)

Tests

API and Format

Documentation

@yunfengzhou-hub (Contributor) left a comment:

Thanks for the PR. I've left some comments below.

    case INSERT:
    case UPDATE_AFTER:
        {
            return DataChangeEvent.insertEvent(tableId, binaryRecordData, new HashMap<>());

Contributor:

Should it be DataChangeEvent.updateEvent? Same for the UPDATE_BEFORE below.


@yuxiqian (Member) replied Nov 5, 2025:

Flink Row encodes an update event as two separate rows (-U and +U), while CDC UpdateEvent carries both as one event. It seems it's not possible to build a complete DataChangeEvent.updateEvent from a single UPDATE_BEFORE or UPDATE_AFTER row here.
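
For reference, a minimal sketch of the mapping under discussion, mirroring the switch in this diff (identifiers are taken from the snippets quoted above; this is not the PR's final code):

    switch (row.getRowKind()) {
        case INSERT:
        case UPDATE_AFTER:
            // A +U row alone carries only the "after" image; building a
            // DataChangeEvent.updateEvent would require buffering the matching -U row.
            return DataChangeEvent.insertEvent(tableId, binaryRecordData, new HashMap<>());
        case DELETE:
        case UPDATE_BEFORE:
            // Likewise, a -U row carries only the "before" image.
            return DataChangeEvent.deleteEvent(tableId, binaryRecordData, new HashMap<>());
        default:
            throw new IllegalArgumentException("Don't support type of " + row.getRowKind());
    }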

            return DataChangeEvent.deleteEvent(tableId, binaryRecordData, new HashMap<>());
        }
    default:
        throw new IllegalArgumentException("don't support type of " + row.getRowKind());

Contributor:

nit: Capitalize the first letter of "don't". Same in FlinkCDCToPaimonDataConverter.

    fieldGetter = row -> row.getInt(fieldPos);
    break;
case DATE:
    fieldGetter = row -> row.getInt(fieldPos);

Contributor:

Should the result type be something like DateTime or Instant? Same below.
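
For illustration, a hedged sketch of what that suggestion could look like; using java.time.LocalDate is an assumption here, not what the PR currently does:

    case DATE:
        // Paimon encodes DATE as an int of days since 1970-01-01, so the raw
        // value can be surfaced as a java.time.LocalDate instead of the int itself.
        fieldGetter = row -> java.time.LocalDate.ofEpochDay(row.getInt(fieldPos));
        break;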

"don't support type of " + fieldType.getTypeRoot());
}
if (!fieldType.isNullable()) {
return fieldGetter;

Contributor:

Let's add a test to verify the behavior when values are null but the types are declared NOT NULL.


Member:

Some quick experiments show that a "default" value is filled in when writing a null value into a NOT NULL field, like false for BOOLEAN and 1970-01-01 for DATE.

Shall we a) keep this behavior and add a test to cover it, or b) validate nullability and throw an exception when a violation occurs?
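
As an illustration of option (a), a hedged JUnit sketch; `convertToPaimonRow`, `recordWithNullFields`, and the field layout are hypothetical placeholders, not the PR's actual API:

    @Test
    public void testNullValueIntoNotNullField() {
        // Hypothetical: convert a CDC record whose NOT NULL BOOLEAN and DATE
        // fields are null, then assert the observed "default" fill-in behavior.
        GenericRow row = convertToPaimonRow(recordWithNullFields());
        Assertions.assertEquals(false, row.getField(0)); // NOT NULL BOOLEAN -> false
        Assertions.assertEquals(0, row.getField(1));     // NOT NULL DATE -> 0, i.e. 1970-01-01
    }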

org.apache.flink.cdc.common.event.DataChangeEvent dataChangeEvent =
        DataChangeEvent.insertEvent(tableId, recordDataGenerator.generate(testData));

Assertions.assertEquals(

Contributor:

Let's also verify the converted data in the middle phase, after the original CDC data is converted to Paimon data and before it's converted back.
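
A hedged sketch of such a middle-phase assertion, assuming the intermediate representation is a Paimon GenericRow; the converter entry points and `expectedPaimonRow` are assumed names for this example:

    // Hypothetical round-trip check: assert the intermediate Paimon row before
    // converting it back, instead of only comparing the final CDC events.
    GenericRow paimonRow = FlinkCDCToPaimonDataConverter.convert(dataChangeEvent);
    Assertions.assertEquals(expectedPaimonRow, paimonRow); // middle-phase assertion
    DataChangeEvent roundTripped = PaimonToFlinkCDCDataConverter.convert(paimonRow);
    Assertions.assertEquals(dataChangeEvent, roundTripped);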

} else if (schema.partitionKeys() != null && !schema.partitionKeys().isEmpty()) {
    partitionKeys.addAll(schema.partitionKeys());
}
builder.primaryKey(primaryKeys)

Contributor:

I noticed that in Flink CDC, this implementation originally looks like this:

            for (String partitionColumn : partitionKeys) {
                if (!primaryKeys.contains(partitionColumn)) {
                    primaryKeys.add(partitionColumn);
                }
            }
            builder.partitionKeys(partitionKeys)
                    .primaryKey(primaryKeys)

Why do we change the implementation here?


Member:

Added it back for consistency.

}

@Test
public void testMysqlDefaultTimestampValueConversionInAddColumn()

Contributor:

Is "Mysql" a typo? Or maybe better to add a comment describing why this paimon test is related to mysql.


Member:

I think the original test case is meant to cover some special default values in MySQL timestamp fields (like CURRENT_TIMESTAMP). Renamed it to avoid confusion.


waitingTables("t3");
jobClient.cancel();
jobClient.cancel().get();

Contributor:

Better to add a timeout to avoid blocking indefinitely. Same for the other test cases.
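
For example, a minimal sketch (the 60-second bound is an arbitrary choice):

    // Bound the wait so a hung cancellation fails the test instead of blocking forever.
    jobClient.cancel().get(60, java.util.concurrent.TimeUnit.SECONDS);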

* @param value The value of the option to be set.
* @return A SchemaChange object representing the setting of an option.
*/
public static SchemaChange setOption(String key, String value) {

Contributor:

Unused method.

CdcDebeziumDeserializationSchema schema =
        new CdcDebeziumDeserializationSchema(true, customConverterConfigs);
-   return sourceBuilder.deserializer(schema).includeSchemaChanges(true).build();
+   return sourceBuilder.deserializer(schema).build();

Contributor:

Shall we describe why we need to make changes like this in non-Paimon Flink CDC classes? I suppose these changes come along with the Flink CDC version upgrade from 3.1 to 3.5, but I'm not quite sure why they are needed. We could add some details to the description section of this PR.


Member:

IIUC, due to technical limitations, PostgreSQL CDC does not support capturing DDL events or emitting any schema change events. In earlier versions this method (includeSchemaChanges) was present but had no effect at all, and it was removed in apache/flink-cdc#3464.

@yuxiqian (Member) commented Nov 5, 2025

Thanks @yunfengzhou-hub for kindly reviewing this. As Yanquan is on vacation right now, I'll draft another PR and address comments there.
