Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cdc-connector][postgres]Flink cdc pipeline support postgres source #3442

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

ChengJie1053
Copy link
Member

@ChengJie1053 ChengJie1053 commented Jun 29, 2024

Flink cdc pipeline support postgres source

image

image

image

@ChengJie1053 ChengJie1053 changed the title [cdc-connector][jdbc-postgres]Flink cdc pipeline source jdbc postgres [cdc-connector][jdbc-postgres]Flink cdc pipeline support postgres source Jun 29, 2024
@ChengJie1053 ChengJie1053 changed the title [cdc-connector][jdbc-postgres]Flink cdc pipeline support postgres source [cdc-connector][postgres]Flink cdc pipeline support postgres source Jul 3, 2024
Copy link
Contributor

@loserwang1024 loserwang1024 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have left some comment:


@Override
protected boolean isSchemaChangeRecord(SourceRecord record) {
Schema keySchema = record.keySchema();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does postgres have schema change record?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code has been modified


private void sendCreateTableEvent(
PostgresConnection jdbc, TableId tableId, SourceOutput<Event> output) {
Schema schema = PostgresSchemaUtils.getTableSchema(tableId, sourceConfig, jdbc);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if log's schema is different with current table schema?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello, I looked at the code and found that debezium queries the table schema by calling postgresql-42.5.1.jar

image

image

image

@ChengJie1053
Copy link
Member Author

I have left some comment:

Ok, thanks for reviewing the code for me

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants