Skip to content

Commit 77f10a0

Browse files
authored
[cdc] Refactor mysql debezium json event parser (#2053)
1 parent b6d9c92 commit 77f10a0

File tree

17 files changed

+626
-169
lines changed

17 files changed

+626
-169
lines changed

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaUtils.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
package org.apache.paimon.flink.action.cdc.kafka;
2020

2121
import org.apache.paimon.flink.action.cdc.TypeMapping;
22-
import org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat;
23-
import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
22+
import org.apache.paimon.flink.action.cdc.kafka.format.DataFormat;
23+
import org.apache.paimon.flink.action.cdc.kafka.format.RecordParser;
2424
import org.apache.paimon.schema.Schema;
2525

2626
import org.apache.flink.configuration.Configuration;
@@ -44,7 +44,7 @@
4444
import java.util.stream.StreamSupport;
4545

4646
import static org.apache.paimon.flink.action.cdc.kafka.KafkaActionUtils.kafkaPropertiesGroupId;
47-
import static org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat.getDataFormat;
47+
import static org.apache.paimon.flink.action.cdc.kafka.format.DataFormat.getDataFormat;
4848

4949
/** Utility class to load kafka schema. */
5050
public class KafkaSchemaUtils {

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@
2424
import org.apache.paimon.flink.action.MultiTablesSinkMode;
2525
import org.apache.paimon.flink.action.cdc.TableNameConverter;
2626
import org.apache.paimon.flink.action.cdc.TypeMapping;
27-
import org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat;
28-
import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
27+
import org.apache.paimon.flink.action.cdc.kafka.format.DataFormat;
28+
import org.apache.paimon.flink.action.cdc.kafka.format.RecordParser;
2929
import org.apache.paimon.flink.sink.cdc.EventParser;
3030
import org.apache.paimon.flink.sink.cdc.FlinkCdcSyncDatabaseSinkBuilder;
3131
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@
2626
import org.apache.paimon.flink.action.ActionBase;
2727
import org.apache.paimon.flink.action.cdc.ComputedColumn;
2828
import org.apache.paimon.flink.action.cdc.TypeMapping;
29-
import org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat;
30-
import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
29+
import org.apache.paimon.flink.action.cdc.kafka.format.DataFormat;
30+
import org.apache.paimon.flink.action.cdc.kafka.format.RecordParser;
3131
import org.apache.paimon.flink.sink.cdc.CdcSinkBuilder;
3232
import org.apache.paimon.flink.sink.cdc.EventParser;
3333
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,13 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.paimon.flink.action.cdc.kafka.formats;
19+
package org.apache.paimon.flink.action.cdc.kafka.format;
2020

2121
import org.apache.paimon.flink.action.cdc.ComputedColumn;
2222
import org.apache.paimon.flink.action.cdc.TypeMapping;
23-
import org.apache.paimon.flink.action.cdc.kafka.formats.canal.CanalRecordParser;
24-
import org.apache.paimon.flink.action.cdc.kafka.formats.maxwell.MaxwellRecordParser;
25-
import org.apache.paimon.flink.action.cdc.kafka.formats.ogg.OggRecordParser;
23+
import org.apache.paimon.flink.action.cdc.kafka.format.canal.CanalRecordParser;
24+
import org.apache.paimon.flink.action.cdc.kafka.format.maxwell.MaxwellRecordParser;
25+
import org.apache.paimon.flink.action.cdc.kafka.format.ogg.OggRecordParser;
2626

2727
import org.apache.flink.configuration.Configuration;
2828
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.paimon.flink.action.cdc.kafka.formats;
19+
package org.apache.paimon.flink.action.cdc.kafka.format;
2020

2121
import org.apache.paimon.flink.action.cdc.ComputedColumn;
2222
import org.apache.paimon.flink.action.cdc.TypeMapping;
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.paimon.flink.action.cdc.kafka.formats;
19+
package org.apache.paimon.flink.action.cdc.kafka.format;
2020

2121
import org.apache.paimon.flink.action.cdc.ComputedColumn;
2222
import org.apache.paimon.flink.action.cdc.TypeMapping;
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.paimon.flink.action.cdc.kafka.formats.canal;
19+
package org.apache.paimon.flink.action.cdc.kafka.format.canal;
2020

2121
/** Converts some special types such as enum、set、geometry. */
2222
public class CanalFieldParser {
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.paimon.flink.action.cdc.kafka.formats.canal;
19+
package org.apache.paimon.flink.action.cdc.kafka.format.canal;
2020

2121
import org.apache.paimon.flink.action.cdc.ComputedColumn;
2222
import org.apache.paimon.flink.action.cdc.TypeMapping;
23-
import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
23+
import org.apache.paimon.flink.action.cdc.kafka.format.RecordParser;
2424
import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
2525
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
2626
import org.apache.paimon.types.DataType;
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.paimon.flink.action.cdc.kafka.formats.maxwell;
19+
package org.apache.paimon.flink.action.cdc.kafka.format.maxwell;
2020

2121
import org.apache.paimon.flink.action.cdc.ComputedColumn;
2222
import org.apache.paimon.flink.action.cdc.TypeMapping;
23-
import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
23+
import org.apache.paimon.flink.action.cdc.kafka.format.RecordParser;
2424
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
2525
import org.apache.paimon.types.RowKind;
2626

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.paimon.flink.action.cdc.kafka.formats.ogg;
19+
package org.apache.paimon.flink.action.cdc.kafka.format.ogg;
2020

2121
import org.apache.paimon.flink.action.cdc.ComputedColumn;
2222
import org.apache.paimon.flink.action.cdc.TypeMapping;
23-
import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
23+
import org.apache.paimon.flink.action.cdc.kafka.format.RecordParser;
2424
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
2525
import org.apache.paimon.types.RowKind;
2626

0 commit comments

Comments
 (0)