Lost data when the GTID set has multiple transaction ID intervals on an MGR cluster #3452

Closed
zbingwen opened this issue Jul 4, 2024 · 3 comments

zbingwen commented Jul 4, 2024

Version: Flink CDC 3.0

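The merged GTID set aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1-219984950:220530100-221264502 is incorrect. The restored set only covers 1-219984892 in its first interval, but the merged set takes 1-219984950 from the server, so transactions 219984893-219984950 are treated as already processed and are never read.

Log: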

2024-07-04 02:36:48,720 INFO  org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext [] - Starting offset is initialized to {transaction_id=null, ts_sec=0, file=mysql-bin.000501, pos=616624667, kind=SPECIFIC, gtids=aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1-219984892:220530100-221264502, row=0, event=0, server_id=102}
2024-07-04 02:36:48,725 INFO  org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext [] - Merging server GTID set aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1-219984950:220530100-221264502 with restored GTID set aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1-219984892:220530100-221264502
2024-07-04 02:36:48,727 INFO  org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext [] - Merged GTID set is aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1-219984950:220530100-221264502
2024-07-04 02:36:48,727 INFO  org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext [] - MySQL current GTID set aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1-219984950:220530100-221264502 does contain the GTID set aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1-219984950:220530100-221264502 required by the connector.
2024-07-04 02:36:48,738 INFO  org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext [] - Server has already purged aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1-219531535:220530100-220605830 GTIDs
2024-07-04 02:36:48,739 INFO  org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext [] - GTID set  known by the server but not processed yet, for replication are available only GTID set 
2024-07-04 02:36:48,776 INFO  io.debezium.util.Threads                                     [] - Requested thread factory for connector MySqlConnector, id = mysql_binlog_source named = binlog-client
2024-07-04 02:36:48,786 INFO  io.debezium.connector.mysql.MySqlStreamingChangeEventSource  [] - GTID set purged on server: aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1-219531535:220530100-220605830
2024-07-04 02:36:48,787 INFO  io.debezium.connector.mysql.MySqlStreamingChangeEventSource  [] - Attempting to generate a filtered GTID set
2024-07-04 02:36:48,787 INFO  io.debezium.connector.mysql.MySqlStreamingChangeEventSource  [] - GTID set from previous recorded offset: aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1-219984892:220530100-221264502
2024-07-04 02:36:48,787 INFO  io.debezium.connector.mysql.MySqlStreamingChangeEventSource  [] - GTID set available on server: aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1-219984951:220530100-221264502
2024-07-04 02:36:48,787 INFO  io.debezium.connector.mysql.MySqlStreamingChangeEventSource  [] - Using first available positions for new GTID channels
2024-07-04 02:36:48,787 INFO  io.debezium.connector.mysql.MySqlStreamingChangeEventSource  [] - Relevant GTID set available on server: aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1-219984951:220530100-221264502
2024-07-04 02:36:48,789 INFO  io.debezium.connector.mysql.MySqlStreamingChangeEventSource  [] - Final merged GTID set to use when connecting to MySQL: aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1-219984951:220530100-221264502
2024-07-04 02:36:48,789 INFO  io.debezium.connector.mysql.MySqlStreamingChangeEventSource  [] - Registering binlog reader with GTID set: aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1-219984951:220530100-221264502
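The size of the gap can be read off the two sets in the log. The sketch below subtracts the restored intervals from the merged intervals for the single UUID involved. Interval and subtract are hypothetical helpers written for this illustration, not part of Flink CDC or Debezium.

```java
import java.util.ArrayList;
import java.util.List;

public class GtidGap {
    // Hypothetical stand-in for one GTID interval [start, end], both ends inclusive.
    record Interval(long start, long end) {
        @Override
        public String toString() {
            return start + "-" + end;
        }
    }

    // Returns the parts of 'a' not covered by 'b'. Both lists must be sorted by
    // start and non-overlapping, as the intervals of one GTID UUID set are.
    static List<Interval> subtract(List<Interval> a, List<Interval> b) {
        List<Interval> result = new ArrayList<>();
        for (Interval x : a) {
            long cursor = x.start(); // first transaction of x not yet accounted for
            for (Interval y : b) {
                if (y.end() < cursor || y.start() > x.end()) {
                    continue; // y does not overlap the remaining part of x
                }
                if (y.start() > cursor) {
                    result.add(new Interval(cursor, y.start() - 1));
                }
                cursor = Math.max(cursor, y.end() + 1);
            }
            if (cursor <= x.end()) {
                result.add(new Interval(cursor, x.end()));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Intervals for UUID aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa, copied from the log.
        List<Interval> merged = List.of(
                new Interval(1, 219984950L), new Interval(220530100L, 221264502L));
        List<Interval> restored = List.of(
                new Interval(1, 219984892L), new Interval(220530100L, 221264502L));

        List<Interval> skipped = subtract(merged, restored);
        long count = skipped.stream().mapToLong(i -> i.end() - i.start() + 1).sum();
        System.out.println("Skipped: " + skipped + " (" + count + " transactions)");
    }
}
```

Running main prints "Skipped: [219984893-219984950] (58 transactions)", the range that the merged set marks as processed even though the restored checkpoint does not.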

zbingwen commented Jul 4, 2024

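The restoredIntervalEnd logic is wrong. restoredIntervalEnd is a single value for the whole UUID (judging by the merged result in the log, the end of the restored set's last interval, 221264502), so every server interval that ends at or before it is kept whole. That is how 1-219984950 ends up in the merged set even though the restored set stops at 219984892 in that range.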

    public static GtidSet fixRestoredGtidSetOld(GtidSet serverGtidSet, GtidSet restoredGtidSet) {
        Map<String, GtidSet.UUIDSet> newSet = new HashMap<>();
        serverGtidSet.getUUIDSets().forEach(uuidSet -> newSet.put(uuidSet.getUUID(), uuidSet));
        for (GtidSet.UUIDSet uuidSet : restoredGtidSet.getUUIDSets()) {
            GtidSet.UUIDSet serverUuidSet = newSet.get(uuidSet.getUUID());
            if (serverUuidSet != null) {
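                // A single end value for the whole restored set is used below, so gaps
                // between restored intervals are ignored.
                long restoredIntervalEnd = getIntervalEnd(uuidSet);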
                List<com.github.shyiko.mysql.binlog.GtidSet.Interval> newIntervals =
                        new ArrayList<>();
                for (GtidSet.Interval serverInterval : serverUuidSet.getIntervals()) {
                    if (serverInterval.getEnd() <= restoredIntervalEnd) {
                        newIntervals.add(
                                new com.github.shyiko.mysql.binlog.GtidSet.Interval(
                                        serverInterval.getStart(), serverInterval.getEnd()));
                    } else if (serverInterval.getStart() <= restoredIntervalEnd
                            && serverInterval.getEnd() > restoredIntervalEnd) {
                        newIntervals.add(
                                new com.github.shyiko.mysql.binlog.GtidSet.Interval(
                                        serverInterval.getStart(), restoredIntervalEnd));
                    }
                }
                newSet.put(
                        uuidSet.getUUID(),
                        new GtidSet.UUIDSet(
                                new com.github.shyiko.mysql.binlog.GtidSet.UUIDSet(
                                        uuidSet.getUUID(), newIntervals)));
            } else {
                newSet.put(uuidSet.getUUID(), uuidSet);
            }
        }
        return new GtidSet(newSet);
    }
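To check that this loop alone explains the merged set in the log, the sketch below re-implements it for one UUID with a minimal Interval record (a hypothetical stand-in for the GtidSet.Interval types used above). It assumes getIntervalEnd returns the highest end among the restored intervals, which is consistent with the merged result in the log.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class OldGtidMerge {
    // Hypothetical stand-in for a GTID interval [start, end], both ends inclusive.
    record Interval(long start, long end) {
        @Override
        public String toString() {
            return start + "-" + end;
        }
    }

    // Same clipping rule as fixRestoredGtidSetOld for one UUID: every server
    // interval is compared against one end value taken from the restored set.
    static List<Interval> clipWithSingleEnd(List<Interval> server, List<Interval> restored) {
        // Assumption: getIntervalEnd returns the highest restored end.
        long restoredIntervalEnd = restored.stream().mapToLong(Interval::end).max().orElseThrow();
        List<Interval> result = new ArrayList<>();
        for (Interval s : server) {
            if (s.end() <= restoredIntervalEnd) {
                result.add(s); // kept whole, even if the restored set has a gap here
            } else if (s.start() <= restoredIntervalEnd) {
                result.add(new Interval(s.start(), restoredIntervalEnd)); // clipped
            }
        }
        return result;
    }

    // Joins intervals the way GTID sets print them, e.g. "1-5:8-9".
    static String format(List<Interval> intervals) {
        return intervals.stream().map(Interval::toString).collect(Collectors.joining(":"));
    }

    public static void main(String[] args) {
        List<Interval> server = List.of(
                new Interval(1, 219984950L), new Interval(220530100L, 221264502L));
        List<Interval> restored = List.of(
                new Interval(1, 219984892L), new Interval(220530100L, 221264502L));
        System.out.println(format(clipWithSingleEnd(server, restored)));
    }
}
```

With the log's numbers it prints 1-219984950:220530100-221264502, the same merged set the log reports: the first server interval is kept whole because 219984950 is below the single end value 221264502.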

zbingwen commented Jul 4, 2024

After changing it to the version below, the test result is correct:

    public static GtidSet fixRestoredGtidSet(GtidSet serverGtidSet, GtidSet restoredGtidSet) {
        Map<String, GtidSet.UUIDSet> newSet = new HashMap<>();

        serverGtidSet.getUUIDSets().forEach(uuidSet -> newSet.put(uuidSet.getUUID(), uuidSet));

        for (GtidSet.UUIDSet uuidSet : restoredGtidSet.getUUIDSets()) {

            GtidSet.UUIDSet serverUuidSet = newSet.get(uuidSet.getUUID());

            if (serverUuidSet != null) {
                List<GtidSet.Interval> restoredIntervals = uuidSet.getIntervals();
                int restoredIntervalsSize = restoredIntervals.size();
                List<GtidSet.Interval> serverIntervals = serverUuidSet.getIntervals();

                List<com.github.shyiko.mysql.binlog.GtidSet.Interval> newIntervals =
                        new ArrayList<>();

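                // Pair the i-th server interval with the i-th restored interval; this
                // assumes both lists are sorted and line up one-to-one.
                for (int i = 0; i < serverIntervals.size(); i++) {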
                    GtidSet.Interval serverInterval = serverIntervals.get(i);
                    if (i < restoredIntervalsSize) {
                        GtidSet.Interval restoredInterval = restoredIntervals.get(i);
                        if (serverInterval.getEnd() <= restoredInterval.getEnd()) {
                            newIntervals.add(
                                    new com.github.shyiko.mysql.binlog.GtidSet.Interval(
                                            serverInterval.getStart(), serverInterval.getEnd()));
                        } else if (serverInterval.getStart() <= restoredInterval.getEnd()
                                && serverInterval.getEnd() > restoredInterval.getEnd()) {
                            newIntervals.add(
                                    new com.github.shyiko.mysql.binlog.GtidSet.Interval(
                                            serverInterval.getStart(), restoredInterval.getEnd()));
                        }
                    } else {
                        newIntervals.add(
                                new com.github.shyiko.mysql.binlog.GtidSet.Interval(
                                        serverInterval.getStart(), serverInterval.getEnd()));
                    }
                }
                newSet.put(
                        uuidSet.getUUID(),
                        new GtidSet.UUIDSet(
                                new com.github.shyiko.mysql.binlog.GtidSet.UUIDSet(
                                        uuidSet.getUUID(), newIntervals)));
            } else {
                newSet.put(uuidSet.getUUID(), uuidSet);
            }
        }
        return new GtidSet(newSet);
    }
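The same standalone sketch, switched to the proposed per-position rule, shows its effect on the log's numbers. Interval is again a hypothetical stand-in for the GtidSet types. Pairing by position assumes the server and restored interval lists are sorted and line up one-to-one, which holds for the sets in this log; server intervals with no restored counterpart are kept whole, as in the proposed code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class PairedGtidMerge {
    // Hypothetical stand-in for a GTID interval [start, end], both ends inclusive.
    record Interval(long start, long end) {
        @Override
        public String toString() {
            return start + "-" + end;
        }
    }

    // Same rule as the proposed fixRestoredGtidSet for one UUID: the i-th server
    // interval is clipped against the end of the i-th restored interval.
    static List<Interval> clipPairwise(List<Interval> server, List<Interval> restored) {
        List<Interval> result = new ArrayList<>();
        for (int i = 0; i < server.size(); i++) {
            Interval s = server.get(i);
            if (i >= restored.size()) {
                result.add(s); // no restored counterpart: kept whole, as in the proposal
                continue;
            }
            long restoredEnd = restored.get(i).end();
            if (s.end() <= restoredEnd) {
                result.add(s);
            } else if (s.start() <= restoredEnd) {
                result.add(new Interval(s.start(), restoredEnd)); // clipped
            }
        }
        return result;
    }

    // Joins intervals the way GTID sets print them, e.g. "1-5:8-9".
    static String format(List<Interval> intervals) {
        return intervals.stream().map(Interval::toString).collect(Collectors.joining(":"));
    }

    public static void main(String[] args) {
        List<Interval> server = List.of(
                new Interval(1, 219984950L), new Interval(220530100L, 221264502L));
        List<Interval> restored = List.of(
                new Interval(1, 219984892L), new Interval(220530100L, 221264502L));
        System.out.println(format(clipPairwise(server, restored)));
    }
}
```

It prints 1-219984892:220530100-221264502, which equals the restored set, so transactions 219984893-219984950 are no longer marked as processed.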

leonardBang (Contributor) commented:

As required by Apache Flink, please report bugs or new features on Apache Jira under the project Flink, using the component tag Flink CDC. You must have a JIRA account in order to log cases and issues.
If you don't have an ASF JIRA account, you can request one at the ASF Self-serve portal. Account creation requires review by a PMC member of the project you are applying for, which normally takes one to two working days.

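Hello, bugs and new features should be reported on Apache Jira or the Flink mailing list ([email protected]) rather than by opening a new issue here. New issues on GitHub are ignored and closed automatically.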
