Skip to content

Commit 8a4758c

Browse files
committed
Update sync code
1 parent ee3b46e commit 8a4758c

File tree

4 files changed

+38
-14
lines changed

4 files changed

+38
-14
lines changed

sync/database_canal.go

+12-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,12 @@ func (db *Database) onDDL(header *replication.EventHeader, nextPos mysql.Positio
3636
}
3737

3838
func (db *Database) OnRow(e *canal.RowsEvent) error {
39-
log.Info("serverId: ", e.Header.ServerID)
39+
if e.Header != nil {
40+
log.Info("serverId: ", e.Header.ServerID)
41+
} else {
42+
log.Info("serverId: e.Header == nil")
43+
}
44+
4045
if strings.Contains(db.Gtid, db.serverUuid) {
4146
return nil
4247
}
@@ -87,11 +92,13 @@ func (db *Database) OnRow(e *canal.RowsEvent) error {
8792
pkColumnValue := getPkColumnValues(oldColumnValue, e.Table.PKColumns)
8893
updateSql, args, err := getUpdateSql(e.Table.Schema, e.Table.Name, columnNames, newColumnValue, pkColumnNames, pkColumnValue)
8994
if err != nil {
95+
log.Error(err)
9096
return err
9197
}
9298

9399
res, err := db.engine.DB().Exec(updateSql, args...)
94100
if err != nil {
101+
log.Error(err)
95102
return err
96103
}
97104
log.Info(updateSql, args, res)
@@ -113,11 +120,13 @@ func (db *Database) OnRow(e *canal.RowsEvent) error {
113120
pkColumnValue := getPkColumnValues(oldColumnValue, e.Table.PKColumns)
114121
deleteSql, args, err := getDeleteSql(e.Table.Schema, e.Table.Name, pkColumnNames, pkColumnValue)
115122
if err != nil {
123+
log.Error(err)
116124
return err
117125
}
118126

119127
res, err := db.engine.DB().Exec(deleteSql, args...)
120128
if err != nil {
129+
log.Error(err)
121130
return err
122131
}
123132
log.Info(deleteSql, args, res)
@@ -141,11 +150,13 @@ func (db *Database) OnRow(e *canal.RowsEvent) error {
141150

142151
insertSql, args, err := getInsertSql(e.Table.Schema, e.Table.Name, columnNames, newColumnValue)
143152
if err != nil {
153+
log.Error(err)
144154
return err
145155
}
146156

147157
res, err := db.engine.DB().Exec(insertSql, args...)
148158
if err != nil {
159+
log.Error(err)
149160
return err
150161
}
151162
log.Info(insertSql, args, res)

sync/sync.go

+12-2
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,21 @@ func startSyncJob(db1 *Database, db2 *Database) error {
2020
var wg sync.WaitGroup
2121

2222
// start canal1 replication
23-
go db1.startCanal(db2)
23+
go func(db1 *Database, db2 *Database) {
24+
err := db1.startCanal(db2)
25+
if err != nil {
26+
panic(err)
27+
}
28+
}(db1, db2)
2429
wg.Add(1)
2530

2631
// start canal2 replication
27-
go db2.startCanal(db1)
32+
go func(db1 *Database, db2 *Database) {
33+
err := db2.startCanal(db1)
34+
if err != nil {
35+
panic(err)
36+
}
37+
}(db1, db2)
2838
wg.Add(1)
2939

3040
wg.Wait()

sync/sync_test.go

+6-3
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,10 @@ import (
2424
)
2525

2626
func TestStartSyncJob(t *testing.T) {
27-
db1 := newDatabase("127.0.0.1", 3306, "casdoor", "root", "123456")
28-
db2 := newDatabase("127.0.0.1", 3306, "casdoor2", "root", "123456")
29-
startSyncJob(db1, db2)
27+
db1 := newDatabase("localhost", 3306, "casdoor", "root", "123456")
28+
db2 := newDatabase("localhost", 3306, "casdoor2", "root", "123456")
29+
err := startSyncJob(db1, db2)
30+
if err != nil {
31+
panic(err)
32+
}
3033
}

sync/util.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,7 @@
1515
package sync
1616

1717
import (
18-
"fmt"
1918
"log"
20-
"strconv"
2119

2220
"github.com/Masterminds/squirrel"
2321
"github.com/xorm-io/xorm"
@@ -74,21 +72,23 @@ func createEngine(dataSourceName string) (*xorm.Engine, error) {
7472
}
7573

7674
func getServerId(engin *xorm.Engine) (uint32, error) {
77-
res, err := engin.QueryInterface("SELECT @@server_id")
75+
record, err := engin.QueryInterface("SELECT @@server_id")
7876
if err != nil {
7977
return 0, err
8078
}
81-
serverId, _ := strconv.ParseUint(fmt.Sprintf("%s", res[0]["@@server_id"]), 10, 32)
82-
return uint32(serverId), nil
79+
80+
res := uint32(record[0]["@@server_id"].(int64))
81+
return res, nil
8382
}
8483

8584
func getServerUuid(engin *xorm.Engine) (string, error) {
86-
res, err := engin.QueryString("show variables like 'server_uuid'")
85+
record, err := engin.QueryString("show variables like 'server_uuid'")
8786
if err != nil {
8887
return "", err
8988
}
90-
serverUuid := fmt.Sprintf("%s", res[0]["Value"])
91-
return serverUuid, err
89+
90+
res := record[0]["Value"]
91+
return res, err
9292
}
9393

9494
func getPkColumnNames(columnNames []string, PKColumns []int) []string {

0 commit comments

Comments
 (0)