Update sync code

This commit is contained in:
Yang Luo 2023-10-22 11:56:56 +08:00
parent ee3b46e91c
commit 8a4758c22d
4 changed files with 38 additions and 14 deletions

View File

@ -36,7 +36,12 @@ func (db *Database) onDDL(header *replication.EventHeader, nextPos mysql.Positio
} }
func (db *Database) OnRow(e *canal.RowsEvent) error { func (db *Database) OnRow(e *canal.RowsEvent) error {
log.Info("serverId: ", e.Header.ServerID) if e.Header != nil {
log.Info("serverId: ", e.Header.ServerID)
} else {
log.Info("serverId: e.Header == nil")
}
if strings.Contains(db.Gtid, db.serverUuid) { if strings.Contains(db.Gtid, db.serverUuid) {
return nil return nil
} }
@ -87,11 +92,13 @@ func (db *Database) OnRow(e *canal.RowsEvent) error {
pkColumnValue := getPkColumnValues(oldColumnValue, e.Table.PKColumns) pkColumnValue := getPkColumnValues(oldColumnValue, e.Table.PKColumns)
updateSql, args, err := getUpdateSql(e.Table.Schema, e.Table.Name, columnNames, newColumnValue, pkColumnNames, pkColumnValue) updateSql, args, err := getUpdateSql(e.Table.Schema, e.Table.Name, columnNames, newColumnValue, pkColumnNames, pkColumnValue)
if err != nil { if err != nil {
log.Error(err)
return err return err
} }
res, err := db.engine.DB().Exec(updateSql, args...) res, err := db.engine.DB().Exec(updateSql, args...)
if err != nil { if err != nil {
log.Error(err)
return err return err
} }
log.Info(updateSql, args, res) log.Info(updateSql, args, res)
@ -113,11 +120,13 @@ func (db *Database) OnRow(e *canal.RowsEvent) error {
pkColumnValue := getPkColumnValues(oldColumnValue, e.Table.PKColumns) pkColumnValue := getPkColumnValues(oldColumnValue, e.Table.PKColumns)
deleteSql, args, err := getDeleteSql(e.Table.Schema, e.Table.Name, pkColumnNames, pkColumnValue) deleteSql, args, err := getDeleteSql(e.Table.Schema, e.Table.Name, pkColumnNames, pkColumnValue)
if err != nil { if err != nil {
log.Error(err)
return err return err
} }
res, err := db.engine.DB().Exec(deleteSql, args...) res, err := db.engine.DB().Exec(deleteSql, args...)
if err != nil { if err != nil {
log.Error(err)
return err return err
} }
log.Info(deleteSql, args, res) log.Info(deleteSql, args, res)
@ -141,11 +150,13 @@ func (db *Database) OnRow(e *canal.RowsEvent) error {
insertSql, args, err := getInsertSql(e.Table.Schema, e.Table.Name, columnNames, newColumnValue) insertSql, args, err := getInsertSql(e.Table.Schema, e.Table.Name, columnNames, newColumnValue)
if err != nil { if err != nil {
log.Error(err)
return err return err
} }
res, err := db.engine.DB().Exec(insertSql, args...) res, err := db.engine.DB().Exec(insertSql, args...)
if err != nil { if err != nil {
log.Error(err)
return err return err
} }
log.Info(insertSql, args, res) log.Info(insertSql, args, res)

View File

@ -20,11 +20,21 @@ func startSyncJob(db1 *Database, db2 *Database) error {
var wg sync.WaitGroup var wg sync.WaitGroup
// start canal1 replication // start canal1 replication
go db1.startCanal(db2) go func(db1 *Database, db2 *Database) {
err := db1.startCanal(db2)
if err != nil {
panic(err)
}
}(db1, db2)
wg.Add(1) wg.Add(1)
// start canal2 replication // start canal2 replication
go db2.startCanal(db1) go func(db1 *Database, db2 *Database) {
err := db2.startCanal(db1)
if err != nil {
panic(err)
}
}(db1, db2)
wg.Add(1) wg.Add(1)
wg.Wait() wg.Wait()

View File

@ -24,7 +24,10 @@ import (
) )
func TestStartSyncJob(t *testing.T) { func TestStartSyncJob(t *testing.T) {
db1 := newDatabase("127.0.0.1", 3306, "casdoor", "root", "123456") db1 := newDatabase("localhost", 3306, "casdoor", "root", "123456")
db2 := newDatabase("127.0.0.1", 3306, "casdoor2", "root", "123456") db2 := newDatabase("localhost", 3306, "casdoor2", "root", "123456")
startSyncJob(db1, db2) err := startSyncJob(db1, db2)
if err != nil {
panic(err)
}
} }

View File

@ -15,9 +15,7 @@
package sync package sync
import ( import (
"fmt"
"log" "log"
"strconv"
"github.com/Masterminds/squirrel" "github.com/Masterminds/squirrel"
"github.com/xorm-io/xorm" "github.com/xorm-io/xorm"
@ -74,21 +72,23 @@ func createEngine(dataSourceName string) (*xorm.Engine, error) {
} }
func getServerId(engin *xorm.Engine) (uint32, error) { func getServerId(engin *xorm.Engine) (uint32, error) {
res, err := engin.QueryInterface("SELECT @@server_id") record, err := engin.QueryInterface("SELECT @@server_id")
if err != nil { if err != nil {
return 0, err return 0, err
} }
serverId, _ := strconv.ParseUint(fmt.Sprintf("%s", res[0]["@@server_id"]), 10, 32)
return uint32(serverId), nil res := uint32(record[0]["@@server_id"].(int64))
return res, nil
} }
func getServerUuid(engin *xorm.Engine) (string, error) { func getServerUuid(engin *xorm.Engine) (string, error) {
res, err := engin.QueryString("show variables like 'server_uuid'") record, err := engin.QueryString("show variables like 'server_uuid'")
if err != nil { if err != nil {
return "", err return "", err
} }
serverUuid := fmt.Sprintf("%s", res[0]["Value"])
return serverUuid, err res := record[0]["Value"]
return res, err
} }
func getPkColumnNames(columnNames []string, PKColumns []int) []string { func getPkColumnNames(columnNames []string, PKColumns []int) []string {