diff --git a/sync/bi_sync_mysql.go b/sync/bi_sync_mysql.go index ef9be6ac..7f807d1b 100644 --- a/sync/bi_sync_mysql.go +++ b/sync/bi_sync_mysql.go @@ -16,6 +16,8 @@ package sync import ( "fmt" + "strings" + "sync" "github.com/go-mysql-org/go-mysql/canal" "github.com/go-mysql-org/go-mysql/mysql" @@ -24,60 +26,59 @@ import ( "github.com/xorm-io/xorm" ) -var ( - dataSourceName1 string - dataSourceName2 string - engin1 *xorm.Engine - engin2 *xorm.Engine -) - -func InitConfig() *canal.Config { - // init dataSource - dataSourceName1 = fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", username1, password1, host1, port1, database1) - dataSourceName2 = fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", username2, password2, host2, port2, database2) - - // create engine - engin1, _ = CreateEngine(dataSourceName1) - engin2, _ = CreateEngine(dataSourceName2) - log.Info("init engine success…") - - // config canal - cfg := canal.NewDefaultConfig() - cfg.Addr = fmt.Sprintf("%s:%d", host1, port1) - cfg.Password = password1 - cfg.User = username1 - // We only care table in database1 - cfg.Dump.TableDB = database1 - // cfg.Dump.Tables = []string{"user"} - log.Info("config canal success…") - return cfg +type MyEventHandler struct { + dataSourceName string + engine *xorm.Engine + serverId uint32 + serverUUID string + GTID string + canal.DummyEventHandler } -func StartBinlogSync() error { - // init config - config := InitConfig() - - c, err := canal.NewCanal(config) - pos, err := c.GetMasterPos() +func StartCanal(cfg *canal.Config, username string, password string, host string, port int, database string) error { + c, err := canal.NewCanal(cfg) if err != nil { return err } + GTIDSet, err := c.GetMasterGTIDSet() + if err != nil { + return err + } + + eventHandler := GetMyEventHandler(username, password, host, port, database) // Register a handler to handle RowsEvent - c.SetEventHandler(&MyEventHandler{}) - - // Start canal - c.RunFrom(pos) + c.SetEventHandler(&eventHandler) + // Start replication + err = c.StartFromGTID(GTIDSet) + if err != nil { + return err + } return nil } -type MyEventHandler struct { - canal.DummyEventHandler +func StartBinlogSync() error { + var wg sync.WaitGroup + // init config + cfg1 := GetCanalConfig(username1, password1, host1, port1, database1) + cfg2 := GetCanalConfig(username2, password2, host2, port2, database2) + + // start canal1 replication + go StartCanal(cfg1, username2, password2, host2, port2, database2) + wg.Add(1) + + // start canal2 replication + go StartCanal(cfg2, username1, password1, host1, port1, database1) + wg.Add(1) + + wg.Wait() + return nil } -func OnTableChanged(header *replication.EventHeader, schema string, table string) error { - log.Info("table changed event") +func (h *MyEventHandler) OnGTID(header *replication.EventHeader, gtid mysql.GTIDSet) error { + log.Info("OnGTID: ", gtid.String()) + h.GTID = gtid.String() return nil } @@ -87,6 +88,13 @@ func (h *MyEventHandler) onDDL(header *replication.EventHeader, nextPos mysql.Po } func (h *MyEventHandler) OnRow(e *canal.RowsEvent) error { + log.Info("serverId: ", e.Header.ServerID) + if strings.Contains(h.GTID, h.serverUUID) { + return nil + } + + // Set the next gtid of the target library to the gtid of the current target library to avoid loopbacks + h.engine.Exec(fmt.Sprintf("SET GTID_NEXT= '%s'", h.GTID)) length := len(e.Table.Columns) columnNames := make([]string, length) oldColumnValue := make([]interface{}, length) @@ -101,9 +109,12 @@ func (h *MyEventHandler) OnRow(e *canal.RowsEvent) error { isChar[i] = true } } + // get pk column name + pkColumnNames := GetPKColumnNames(columnNames, e.Table.PKColumns) switch e.Action { case canal.UpdateAction: + h.engine.Exec("BEGIN") for i, row := range e.Rows { for j, item := range row { if i%2 == 0 { @@ -114,27 +125,34 @@ func (h *MyEventHandler) OnRow(e *canal.RowsEvent) error { } } else { if isChar[j] == true { - newColumnValue[j] = fmt.Sprintf("%s", item) + if item == nil { + newColumnValue[j] = nil + } else { + newColumnValue[j] = fmt.Sprintf("%s", item) + } } else { newColumnValue[j] = fmt.Sprintf("%d", item) } } } - if i%2 == 1 { - updateSql, args, err := GetUpdateSql(e.Table.Schema, e.Table.Name, columnNames, newColumnValue, oldColumnValue) + pkColumnValue := GetPKColumnValues(oldColumnValue, e.Table.PKColumns) + updateSql, args, err := GetUpdateSql(e.Table.Schema, e.Table.Name, columnNames, newColumnValue, pkColumnNames, pkColumnValue) if err != nil { return err } - res, err := engin2.DB().Exec(updateSql, args...) + res, err := h.engine.DB().Exec(updateSql, args...) if err != nil { return err } log.Info(updateSql, args, res) } } + h.engine.Exec("COMMIT") + h.engine.Exec("SET GTID_NEXT='automatic'") case canal.DeleteAction: + h.engine.Exec("BEGIN") for _, row := range e.Rows { for j, item := range row { if isChar[j] == true { @@ -144,22 +162,30 @@ func (h *MyEventHandler) OnRow(e *canal.RowsEvent) error { } } - deleteSql, args, err := GetDeleteSql(e.Table.Schema, e.Table.Name, columnNames, oldColumnValue) + pkColumnValue := GetPKColumnValues(oldColumnValue, e.Table.PKColumns) + deleteSql, args, err := GetDeleteSql(e.Table.Schema, e.Table.Name, pkColumnNames, pkColumnValue) if err != nil { return err } - res, err := engin2.DB().Exec(deleteSql, args...) + res, err := h.engine.DB().Exec(deleteSql, args...) if err != nil { return err } log.Info(deleteSql, args, res) } + h.engine.Exec("COMMIT") + h.engine.Exec("SET GTID_NEXT='automatic'") case canal.InsertAction: + h.engine.Exec("BEGIN") for _, row := range e.Rows { for j, item := range row { if isChar[j] == true { - newColumnValue[j] = fmt.Sprintf("%s", item) + if item == nil { + newColumnValue[j] = nil + } else { + newColumnValue[j] = fmt.Sprintf("%s", item) + } } else { newColumnValue[j] = fmt.Sprintf("%d", item) } @@ -170,12 +196,14 @@ func (h *MyEventHandler) OnRow(e *canal.RowsEvent) error { return err } - res, err := engin2.DB().Exec(insertSql, args...) + res, err := h.engine.DB().Exec(insertSql, args...) if err != nil { return err } log.Info(insertSql, args, res) } + h.engine.Exec("COMMIT") + h.engine.Exec("SET GTID_NEXT='automatic'") default: log.Infof("%v", e.String()) } diff --git a/sync/utils.go b/sync/utils.go index 1cd1b4ab..1542172d 100644 --- a/sync/utils.go +++ b/sync/utils.go @@ -15,20 +15,24 @@ package sync import ( + "fmt" "log" + "strconv" + + "github.com/go-mysql-org/go-mysql/canal" "github.com/Masterminds/squirrel" "github.com/xorm-io/xorm" ) -func GetUpdateSql(schemaName string, tableName string, columnNames []string, newColumnVal []interface{}, oldColumnVal []interface{}) (string, []interface{}, error) { +func GetUpdateSql(schemaName string, tableName string, columnNames []string, newColumnVal []interface{}, pkColumnNames []string, pkColumnValue []interface{}) (string, []interface{}, error) { updateSql := squirrel.Update(schemaName + "." + tableName) for i, columnName := range columnNames { updateSql = updateSql.Set(columnName, newColumnVal[i]) } - for i, columnName := range columnNames { - updateSql = updateSql.Where(squirrel.Eq{columnName: oldColumnVal[i]}) + for i, pkColumnName := range pkColumnNames { + updateSql = updateSql.Where(squirrel.Eq{pkColumnName: pkColumnValue[i]}) } sql, args, err := updateSql.ToSql() @@ -45,11 +49,11 @@ func GetInsertSql(schemaName string, tableName string, columnNames []string, col return insertSql.ToSql() } -func GetDeleteSql(schemaName string, tableName string, columnNames []string, columnValue []interface{}) (string, []interface{}, error) { +func GetDeleteSql(schemaName string, tableName string, pkColumnNames []string, pkColumnValue []interface{}) (string, []interface{}, error) { deleteSql := squirrel.Delete(schemaName + "." + tableName) - for i, columnName := range columnNames { - deleteSql = deleteSql.Where(squirrel.Eq{columnName: columnValue[i]}) + for i, columnName := range pkColumnNames { + deleteSql = deleteSql.Where(squirrel.Eq{columnName: pkColumnValue[i]}) } return deleteSql.ToSql() @@ -70,3 +74,57 @@ func CreateEngine(dataSourceName string) (*xorm.Engine, error) { log.Println("mysql connection success……") return engine, nil } + +func GetServerId(engin *xorm.Engine) (uint32, error) { + res, err := engin.QueryInterface("SELECT @@server_id") + if err != nil { + return 0, err + } + serverId, _ := strconv.ParseUint(fmt.Sprintf("%s", res[0]["@@server_id"]), 10, 32) + return uint32(serverId), nil +} + +func GetServerUUID(engin *xorm.Engine) (string, error) { + res, err := engin.QueryString("show variables like 'server_uuid'") + if err != nil { + return "", err + } + serverUUID := fmt.Sprintf("%s", res[0]["Value"]) + return serverUUID, err +} + +func GetPKColumnNames(columnNames []string, PKColumns []int) []string { + pkColumnNames := make([]string, len(PKColumns)) + for i, index := range PKColumns { + pkColumnNames[i] = columnNames[index] + } + return pkColumnNames +} + +func GetPKColumnValues(columnValues []interface{}, PKColumns []int) []interface{} { + pkColumnNames := make([]interface{}, len(PKColumns)) + for i, index := range PKColumns { + pkColumnNames[i] = columnValues[index] + } + return pkColumnNames +} + +func GetCanalConfig(username string, password string, host string, port int, database string) *canal.Config { + // config canal + cfg := canal.NewDefaultConfig() + cfg.Addr = fmt.Sprintf("%s:%d", host, port) + cfg.Password = password + cfg.User = username + // We only care table in database1 + cfg.Dump.TableDB = database + return cfg +} + +func GetMyEventHandler(username string, password string, host string, port int, database string) MyEventHandler { + var eventHandler MyEventHandler + eventHandler.dataSourceName = fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", username, password, host, port, database) + eventHandler.engine, _ = CreateEngine(eventHandler.dataSourceName) + eventHandler.serverId, _ = GetServerId(eventHandler.engine) + eventHandler.serverUUID, _ = GetServerUUID(eventHandler.engine) + return eventHandler +}