diff --git a/sync/conf.go b/sync/conf.go deleted file mode 100644 index 1821c310..00000000 --- a/sync/conf.go +++ /dev/null @@ -1,17 +0,0 @@ -package sync - -var ( - host1 = "127.0.0.1" - port1 = 3306 - username1 = "root" - password1 = "123456" - database1 = "db" -) - -var ( - host2 = "127.0.0.1" - port2 = 3306 - username2 = "root" - password2 = "123456" - database2 = "db" -) diff --git a/sync/database.go b/sync/database.go new file mode 100644 index 00000000..456a3ee5 --- /dev/null +++ b/sync/database.go @@ -0,0 +1,100 @@ +// Copyright 2023 The Casdoor Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sync + +import ( + "fmt" + + "github.com/go-mysql-org/go-mysql/canal" + "github.com/xorm-io/xorm" +) + +type Database struct { + host string + port int + database string + username string + password string + + engine *xorm.Engine + serverId uint32 + serverUuid string + Gtid string + canal.DummyEventHandler +} + +func newDatabase(host string, port int, database string, username string, password string) *Database { + db := &Database{ + host: host, + port: port, + database: database, + username: username, + password: password, + } + + dataSourceName := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", username, password, host, port, database) + engine, err := createEngine(dataSourceName) + if err != nil { + panic(err) + } + + db.engine = engine + + db.serverId, err = getServerId(engine) + if err != nil { + panic(err) + } + + db.serverUuid, err = getServerUuid(engine) + if err != nil { + panic(err) + } + + return db +} + +func (db *Database) getCanalConfig() *canal.Config { + // config canal + cfg := canal.NewDefaultConfig() + cfg.Addr = fmt.Sprintf("%s:%d", db.host, db.port) + cfg.Password = db.password + cfg.User = db.username + // We only care table in database1 + cfg.Dump.TableDB = db.database + return cfg +} + +func (db *Database) startCanal(targetDb *Database) error { + canalConfig := db.getCanalConfig() + c, err := canal.NewCanal(canalConfig) + if err != nil { + return err + } + + gtidSet, err := c.GetMasterGTIDSet() + if err != nil { + return err + } + + // Register a handler to handle RowsEvent + c.SetEventHandler(targetDb) + + // Start replication + err = c.StartFromGTID(gtidSet) + if err != nil { + return err + } + return nil +} diff --git a/sync/bi_sync_mysql.go b/sync/database_canal.go similarity index 63% rename from sync/bi_sync_mysql.go rename to sync/database_canal.go index 638ce5e4..927c6b61 100644 --- a/sync/bi_sync_mysql.go +++ b/sync/database_canal.go @@ -17,84 +17,32 @@ package sync import ( "fmt" "strings" - "sync" "github.com/go-mysql-org/go-mysql/canal" "github.com/go-mysql-org/go-mysql/mysql" "github.com/go-mysql-org/go-mysql/replication" "github.com/siddontang/go-log/log" - "github.com/xorm-io/xorm" ) -type Database struct { - dataSourceName string - engine *xorm.Engine - serverId uint32 - serverUuid string - Gtid string - canal.DummyEventHandler -} - -func StartCanal(cfg *canal.Config, username string, password string, host string, port int, database string) error { - c, err := canal.NewCanal(cfg) - if err != nil { - return err - } - - gtidSet, err := c.GetMasterGTIDSet() - if err != nil { - return err - } - - db := createDatabase(username, password, host, port, database) - // Register a handler to handle RowsEvent - c.SetEventHandler(&db) - - // Start replication - err = c.StartFromGTID(gtidSet) - if err != nil { - return err - } - return nil -} - -func StartBinlogSync() error { - var wg sync.WaitGroup - // init config - cfg1 := getCanalConfig(username1, password1, host1, port1, database1) - cfg2 := getCanalConfig(username2, password2, host2, port2, database2) - - // start canal1 replication - go StartCanal(cfg1, username2, password2, host2, port2, database2) - wg.Add(1) - - // start canal2 replication - go StartCanal(cfg2, username1, password1, host1, port1, database1) - wg.Add(1) - - wg.Wait() - return nil -} - -func (h *Database) OnGTID(header *replication.EventHeader, gtid mysql.GTIDSet) error { +func (db *Database) OnGTID(header *replication.EventHeader, gtid mysql.GTIDSet) error { log.Info("OnGTID: ", gtid.String()) - h.Gtid = gtid.String() + db.Gtid = gtid.String() return nil } -func (h *Database) onDDL(header *replication.EventHeader, nextPos mysql.Position, queryEvent *replication.QueryEvent) error { +func (db *Database) onDDL(header *replication.EventHeader, nextPos mysql.Position, queryEvent *replication.QueryEvent) error { log.Info("into DDL event") return nil } -func (h *Database) OnRow(e *canal.RowsEvent) error { +func (db *Database) OnRow(e *canal.RowsEvent) error { log.Info("serverId: ", e.Header.ServerID) - if strings.Contains(h.Gtid, h.serverUuid) { + if strings.Contains(db.Gtid, db.serverUuid) { return nil } // Set the next gtid of the target library to the gtid of the current target library to avoid loopbacks - h.engine.Exec(fmt.Sprintf("SET GTID_NEXT= '%s'", h.Gtid)) + db.engine.Exec(fmt.Sprintf("SET GTID_NEXT= '%s'", db.Gtid)) length := len(e.Table.Columns) columnNames := make([]string, length) oldColumnValue := make([]interface{}, length) @@ -114,7 +62,7 @@ func (h *Database) OnRow(e *canal.RowsEvent) error { switch e.Action { case canal.UpdateAction: - h.engine.Exec("BEGIN") + db.engine.Exec("BEGIN") for i, row := range e.Rows { for j, item := range row { if i%2 == 0 { @@ -142,17 +90,17 @@ func (h *Database) OnRow(e *canal.RowsEvent) error { return err } - res, err := h.engine.DB().Exec(updateSql, args...) + res, err := db.engine.DB().Exec(updateSql, args...) if err != nil { return err } log.Info(updateSql, args, res) } } - h.engine.Exec("COMMIT") - h.engine.Exec("SET GTID_NEXT='automatic'") + db.engine.Exec("COMMIT") + db.engine.Exec("SET GTID_NEXT='automatic'") case canal.DeleteAction: - h.engine.Exec("BEGIN") + db.engine.Exec("BEGIN") for _, row := range e.Rows { for j, item := range row { if isChar[j] == true { @@ -168,16 +116,16 @@ func (h *Database) OnRow(e *canal.RowsEvent) error { return err } - res, err := h.engine.DB().Exec(deleteSql, args...) + res, err := db.engine.DB().Exec(deleteSql, args...) if err != nil { return err } log.Info(deleteSql, args, res) } - h.engine.Exec("COMMIT") - h.engine.Exec("SET GTID_NEXT='automatic'") + db.engine.Exec("COMMIT") + db.engine.Exec("SET GTID_NEXT='automatic'") case canal.InsertAction: - h.engine.Exec("BEGIN") + db.engine.Exec("BEGIN") for _, row := range e.Rows { for j, item := range row { if isChar[j] == true { @@ -196,20 +144,20 @@ func (h *Database) OnRow(e *canal.RowsEvent) error { return err } - res, err := h.engine.DB().Exec(insertSql, args...) + res, err := db.engine.DB().Exec(insertSql, args...) if err != nil { return err } log.Info(insertSql, args, res) } - h.engine.Exec("COMMIT") - h.engine.Exec("SET GTID_NEXT='automatic'") + db.engine.Exec("COMMIT") + db.engine.Exec("SET GTID_NEXT='automatic'") default: log.Infof("%v", e.String()) } return nil } -func (h *Database) String() string { +func (db *Database) String() string { return "Database" } diff --git a/sync/sync.go b/sync/sync.go new file mode 100644 index 00000000..2221f5fd --- /dev/null +++ b/sync/sync.go @@ -0,0 +1,32 @@ +// Copyright 2023 The Casdoor Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sync + +import "sync" + +func startSyncJob(db1 *Database, db2 *Database) error { + var wg sync.WaitGroup + + // start canal1 replication + go db1.startCanal(db2) + wg.Add(1) + + // start canal2 replication + go db2.startCanal(db1) + wg.Add(1) + + wg.Wait() + return nil +} diff --git a/sync/sync_test.go b/sync/sync_test.go new file mode 100644 index 00000000..c2b9fd0b --- /dev/null +++ b/sync/sync_test.go @@ -0,0 +1,27 @@ +// Copyright 2023 The Casdoor Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sync + +import ( + "testing" + + _ "github.com/go-sql-driver/mysql" +) + +func TestStartSyncJob(t *testing.T) { + db1 := newDatabase("127.0.0.1", 3306, "casdoor", "root", "123456") + db2 := newDatabase("127.0.0.1", 3306, "casdoor2", "root", "123456") + startSyncJob(db1, db2) +} diff --git a/sync/util.go b/sync/util.go index a27169a4..f8f4cb62 100644 --- a/sync/util.go +++ b/sync/util.go @@ -19,8 +19,6 @@ import ( "log" "strconv" - "github.com/go-mysql-org/go-mysql/canal" - "github.com/Masterminds/squirrel" "github.com/xorm-io/xorm" ) @@ -108,37 +106,3 @@ func getPkColumnValues(columnValues []interface{}, PKColumns []int) []interface{ } return pkColumnNames } - -func getCanalConfig(username string, password string, host string, port int, database string) *canal.Config { - // config canal - cfg := canal.NewDefaultConfig() - cfg.Addr = fmt.Sprintf("%s:%d", host, port) - cfg.Password = password - cfg.User = username - // We only care table in database1 - cfg.Dump.TableDB = database - return cfg -} - -func createDatabase(username string, password string, host string, port int, database string) Database { - var db Database - var err error - - db.dataSourceName = fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", username, password, host, port, database) - db.engine, err = createEngine(db.dataSourceName) - if err != nil { - panic(err) - } - - db.serverId, err = getServerId(db.engine) - if err != nil { - panic(err) - } - - db.serverUuid, err = getServerUuid(db.engine) - if err != nil { - panic(err) - } - - return db -}