Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-35599][cdc-connector][jdbc-mysql]Flink cdc pipeline sink jdbc mysql #3433

Open
wants to merge 24 commits into
base: master
Choose a base branch
from

Conversation

kissycn
Copy link

@kissycn kissycn commented Jun 24, 2024

Close: #FLINK-35599

@github-actions github-actions bot added the build label Jun 25, 2024
@kissycn kissycn changed the title Flink cdc pipeline sink jdbc mysql [FLINK-35599][cdc-connector][jdbc-mysql]Flink cdc pipeline sink jdbc mysql Jun 26, 2024
Copy link

@yuanoOo yuanoOo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good Job! And I left some comment. @kissycn

public class MySqlDataSinkFactory implements DataSinkFactory {
private static final Logger LOG = LoggerFactory.getLogger(MySqlDataSinkFactory.class);

public static final String IDENTIFIER = "mysql-writer";
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should be changed to mysql to be consistent with other sink pipeline connectors.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do, thanks!

String jdbcUrl = MysqlPooledDataSinkFactory.INSTANCE.getJdbcUrl(builder.build());
builder.connUrl(jdbcUrl);
// print configs
Map<String, String> map = config.toMap();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my experience, when PASSWORD is empty, a NullPointerException will be thrown.
However, MySQL allows the password to be empty.

+ " jdbc.properties.allowPublicKeyRetrieval: true\n"
+ "\n"
+ "sink:\n"
+ " type: mysql-writer\n"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think mysql is better.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do, thanks!


@Override
public void addColumn(TableId tableId, AddColumnEvent addedColumns)
throws TableNotExistException, CatalogException {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exception that is never thrown: TableNotExistException.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants