From 59566f61d723c4f68bef48e96d9a5c9581ffdd61 Mon Sep 17 00:00:00 2001 From: Gucheng Wang Date: Sun, 12 Mar 2023 05:10:23 +0800 Subject: [PATCH] Refactor sync code --- sync/bi_sync_mysql.go | 46 +++++++++++++++++------------------ sync/{utils.go => util.go} | 50 ++++++++++++++++++++++++-------------- 2 files changed, 55 insertions(+), 41 deletions(-) rename sync/{utils.go => util.go} (72%) diff --git a/sync/bi_sync_mysql.go b/sync/bi_sync_mysql.go index 7f807d1b..638ce5e4 100644 --- a/sync/bi_sync_mysql.go +++ b/sync/bi_sync_mysql.go @@ -26,12 +26,12 @@ import ( "github.com/xorm-io/xorm" ) -type MyEventHandler struct { +type Database struct { dataSourceName string engine *xorm.Engine serverId uint32 - serverUUID string - GTID string + serverUuid string + Gtid string canal.DummyEventHandler } @@ -41,17 +41,17 @@ func StartCanal(cfg *canal.Config, username string, password string, host string return err } - GTIDSet, err := c.GetMasterGTIDSet() + gtidSet, err := c.GetMasterGTIDSet() if err != nil { return err } - eventHandler := GetMyEventHandler(username, password, host, port, database) + db := createDatabase(username, password, host, port, database) // Register a handler to handle RowsEvent - c.SetEventHandler(&eventHandler) + c.SetEventHandler(&db) // Start replication - err = c.StartFromGTID(GTIDSet) + err = c.StartFromGTID(gtidSet) if err != nil { return err } @@ -61,8 +61,8 @@ func StartCanal(cfg *canal.Config, username string, password string, host string func StartBinlogSync() error { var wg sync.WaitGroup // init config - cfg1 := GetCanalConfig(username1, password1, host1, port1, database1) - cfg2 := GetCanalConfig(username2, password2, host2, port2, database2) + cfg1 := getCanalConfig(username1, password1, host1, port1, database1) + cfg2 := getCanalConfig(username2, password2, host2, port2, database2) // start canal1 replication go StartCanal(cfg1, username2, password2, host2, port2, database2) @@ -76,25 +76,25 @@ func StartBinlogSync() error { return nil } -func (h *MyEventHandler) OnGTID(header *replication.EventHeader, gtid mysql.GTIDSet) error { +func (h *Database) OnGTID(header *replication.EventHeader, gtid mysql.GTIDSet) error { log.Info("OnGTID: ", gtid.String()) - h.GTID = gtid.String() + h.Gtid = gtid.String() return nil } -func (h *MyEventHandler) onDDL(header *replication.EventHeader, nextPos mysql.Position, queryEvent *replication.QueryEvent) error { +func (h *Database) onDDL(header *replication.EventHeader, nextPos mysql.Position, queryEvent *replication.QueryEvent) error { log.Info("into DDL event") return nil } -func (h *MyEventHandler) OnRow(e *canal.RowsEvent) error { +func (h *Database) OnRow(e *canal.RowsEvent) error { log.Info("serverId: ", e.Header.ServerID) - if strings.Contains(h.GTID, h.serverUUID) { + if strings.Contains(h.Gtid, h.serverUuid) { return nil } // Set the next gtid of the target library to the gtid of the current target library to avoid loopbacks - h.engine.Exec(fmt.Sprintf("SET GTID_NEXT= '%s'", h.GTID)) + h.engine.Exec(fmt.Sprintf("SET GTID_NEXT= '%s'", h.Gtid)) length := len(e.Table.Columns) columnNames := make([]string, length) oldColumnValue := make([]interface{}, length) @@ -110,7 +110,7 @@ func (h *MyEventHandler) OnRow(e *canal.RowsEvent) error { } } // get pk column name - pkColumnNames := GetPKColumnNames(columnNames, e.Table.PKColumns) + pkColumnNames := getPkColumnNames(columnNames, e.Table.PKColumns) switch e.Action { case canal.UpdateAction: @@ -136,8 +136,8 @@ func (h *MyEventHandler) OnRow(e *canal.RowsEvent) error { } } if i%2 == 1 { - pkColumnValue := GetPKColumnValues(oldColumnValue, e.Table.PKColumns) - updateSql, args, err := GetUpdateSql(e.Table.Schema, e.Table.Name, columnNames, newColumnValue, pkColumnNames, pkColumnValue) + pkColumnValue := getPkColumnValues(oldColumnValue, e.Table.PKColumns) + updateSql, args, err := getUpdateSql(e.Table.Schema, e.Table.Name, columnNames, newColumnValue, pkColumnNames, pkColumnValue) if err != nil { return err } @@ -162,8 +162,8 @@ func (h *MyEventHandler) OnRow(e *canal.RowsEvent) error { } } - pkColumnValue := GetPKColumnValues(oldColumnValue, e.Table.PKColumns) - deleteSql, args, err := GetDeleteSql(e.Table.Schema, e.Table.Name, pkColumnNames, pkColumnValue) + pkColumnValue := getPkColumnValues(oldColumnValue, e.Table.PKColumns) + deleteSql, args, err := getDeleteSql(e.Table.Schema, e.Table.Name, pkColumnNames, pkColumnValue) if err != nil { return err } @@ -191,7 +191,7 @@ func (h *MyEventHandler) OnRow(e *canal.RowsEvent) error { } } - insertSql, args, err := GetInsertSql(e.Table.Schema, e.Table.Name, columnNames, newColumnValue) + insertSql, args, err := getInsertSql(e.Table.Schema, e.Table.Name, columnNames, newColumnValue) if err != nil { return err } @@ -210,6 +210,6 @@ func (h *MyEventHandler) OnRow(e *canal.RowsEvent) error { return nil } -func (h *MyEventHandler) String() string { - return "MyEventHandler" +func (h *Database) String() string { + return "Database" } diff --git a/sync/utils.go b/sync/util.go similarity index 72% rename from sync/utils.go rename to sync/util.go index 1542172d..a27169a4 100644 --- a/sync/utils.go +++ b/sync/util.go @@ -25,7 +25,7 @@ import ( "github.com/xorm-io/xorm" ) -func GetUpdateSql(schemaName string, tableName string, columnNames []string, newColumnVal []interface{}, pkColumnNames []string, pkColumnValue []interface{}) (string, []interface{}, error) { +func getUpdateSql(schemaName string, tableName string, columnNames []string, newColumnVal []interface{}, pkColumnNames []string, pkColumnValue []interface{}) (string, []interface{}, error) { updateSql := squirrel.Update(schemaName + "." + tableName) for i, columnName := range columnNames { updateSql = updateSql.Set(columnName, newColumnVal[i]) @@ -43,13 +43,13 @@ func GetUpdateSql(schemaName string, tableName string, columnNames []string, new return sql, args, nil } -func GetInsertSql(schemaName string, tableName string, columnNames []string, columnValue []interface{}) (string, []interface{}, error) { +func getInsertSql(schemaName string, tableName string, columnNames []string, columnValue []interface{}) (string, []interface{}, error) { insertSql := squirrel.Insert(schemaName + "." + tableName).Columns(columnNames...).Values(columnValue...) return insertSql.ToSql() } -func GetDeleteSql(schemaName string, tableName string, pkColumnNames []string, pkColumnValue []interface{}) (string, []interface{}, error) { +func getDeleteSql(schemaName string, tableName string, pkColumnNames []string, pkColumnValue []interface{}) (string, []interface{}, error) { deleteSql := squirrel.Delete(schemaName + "." + tableName) for i, columnName := range pkColumnNames { @@ -59,7 +59,7 @@ func GetDeleteSql(schemaName string, tableName string, pkColumnNames []string, p return deleteSql.ToSql() } -func CreateEngine(dataSourceName string) (*xorm.Engine, error) { +func createEngine(dataSourceName string) (*xorm.Engine, error) { engine, err := xorm.NewEngine("mysql", dataSourceName) if err != nil { return nil, err @@ -75,7 +75,7 @@ func CreateEngine(dataSourceName string) (*xorm.Engine, error) { return engine, nil } -func GetServerId(engin *xorm.Engine) (uint32, error) { +func getServerId(engin *xorm.Engine) (uint32, error) { res, err := engin.QueryInterface("SELECT @@server_id") if err != nil { return 0, err @@ -84,16 +84,16 @@ func GetServerId(engin *xorm.Engine) (uint32, error) { return uint32(serverId), nil } -func GetServerUUID(engin *xorm.Engine) (string, error) { +func getServerUuid(engin *xorm.Engine) (string, error) { res, err := engin.QueryString("show variables like 'server_uuid'") if err != nil { return "", err } - serverUUID := fmt.Sprintf("%s", res[0]["Value"]) - return serverUUID, err + serverUuid := fmt.Sprintf("%s", res[0]["Value"]) + return serverUuid, err } -func GetPKColumnNames(columnNames []string, PKColumns []int) []string { +func getPkColumnNames(columnNames []string, PKColumns []int) []string { pkColumnNames := make([]string, len(PKColumns)) for i, index := range PKColumns { pkColumnNames[i] = columnNames[index] @@ -101,7 +101,7 @@ func GetPKColumnNames(columnNames []string, PKColumns []int) []string { return pkColumnNames } -func GetPKColumnValues(columnValues []interface{}, PKColumns []int) []interface{} { +func getPkColumnValues(columnValues []interface{}, PKColumns []int) []interface{} { pkColumnNames := make([]interface{}, len(PKColumns)) for i, index := range PKColumns { pkColumnNames[i] = columnValues[index] @@ -109,7 +109,7 @@ func GetPKColumnValues(columnValues []interface{}, PKColumns []int) []interface{ return pkColumnNames } -func GetCanalConfig(username string, password string, host string, port int, database string) *canal.Config { +func getCanalConfig(username string, password string, host string, port int, database string) *canal.Config { // config canal cfg := canal.NewDefaultConfig() cfg.Addr = fmt.Sprintf("%s:%d", host, port) @@ -120,11 +120,25 @@ func GetCanalConfig(username string, password string, host string, port int, dat return cfg } -func GetMyEventHandler(username string, password string, host string, port int, database string) MyEventHandler { - var eventHandler MyEventHandler - eventHandler.dataSourceName = fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", username, password, host, port, database) - eventHandler.engine, _ = CreateEngine(eventHandler.dataSourceName) - eventHandler.serverId, _ = GetServerId(eventHandler.engine) - eventHandler.serverUUID, _ = GetServerUUID(eventHandler.engine) - return eventHandler +func createDatabase(username string, password string, host string, port int, database string) Database { + var db Database + var err error + + db.dataSourceName = fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", username, password, host, port, database) + db.engine, err = createEngine(db.dataSourceName) + if err != nil { + panic(err) + } + + db.serverId, err = getServerId(db.engine) + if err != nil { + panic(err) + } + + db.serverUuid, err = getServerUuid(db.engine) + if err != nil { + panic(err) + } + + return db }