Add TestStartSyncJob()

This commit is contained in:
Gucheng Wang 2023-03-12 05:38:39 +08:00
parent 59566f61d7
commit c2eebd61a1
6 changed files with 178 additions and 124 deletions

View File

@ -1,17 +0,0 @@
package sync
var (
host1 = "127.0.0.1"
port1 = 3306
username1 = "root"
password1 = "123456"
database1 = "db"
)
var (
host2 = "127.0.0.1"
port2 = 3306
username2 = "root"
password2 = "123456"
database2 = "db"
)

100
sync/database.go Normal file
View File

@ -0,0 +1,100 @@
// Copyright 2023 The Casdoor Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sync
import (
"fmt"
"github.com/go-mysql-org/go-mysql/canal"
"github.com/xorm-io/xorm"
)
type Database struct {
host string
port int
database string
username string
password string
engine *xorm.Engine
serverId uint32
serverUuid string
Gtid string
canal.DummyEventHandler
}
func newDatabase(host string, port int, database string, username string, password string) *Database {
db := &Database{
host: host,
port: port,
database: database,
username: username,
password: password,
}
dataSourceName := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", username, password, host, port, database)
engine, err := createEngine(dataSourceName)
if err != nil {
panic(err)
}
db.engine = engine
db.serverId, err = getServerId(engine)
if err != nil {
panic(err)
}
db.serverUuid, err = getServerUuid(engine)
if err != nil {
panic(err)
}
return db
}
func (db *Database) getCanalConfig() *canal.Config {
// config canal
cfg := canal.NewDefaultConfig()
cfg.Addr = fmt.Sprintf("%s:%d", db.host, db.port)
cfg.Password = db.password
cfg.User = db.username
// We only care table in database1
cfg.Dump.TableDB = db.database
return cfg
}
func (db *Database) startCanal(targetDb *Database) error {
canalConfig := db.getCanalConfig()
c, err := canal.NewCanal(canalConfig)
if err != nil {
return err
}
gtidSet, err := c.GetMasterGTIDSet()
if err != nil {
return err
}
// Register a handler to handle RowsEvent
c.SetEventHandler(targetDb)
// Start replication
err = c.StartFromGTID(gtidSet)
if err != nil {
return err
}
return nil
}

View File

@ -17,84 +17,32 @@ package sync
import (
"fmt"
"strings"
"sync"
"github.com/go-mysql-org/go-mysql/canal"
"github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
"github.com/siddontang/go-log/log"
"github.com/xorm-io/xorm"
)
type Database struct {
dataSourceName string
engine *xorm.Engine
serverId uint32
serverUuid string
Gtid string
canal.DummyEventHandler
}
func StartCanal(cfg *canal.Config, username string, password string, host string, port int, database string) error {
c, err := canal.NewCanal(cfg)
if err != nil {
return err
}
gtidSet, err := c.GetMasterGTIDSet()
if err != nil {
return err
}
db := createDatabase(username, password, host, port, database)
// Register a handler to handle RowsEvent
c.SetEventHandler(&db)
// Start replication
err = c.StartFromGTID(gtidSet)
if err != nil {
return err
}
return nil
}
func StartBinlogSync() error {
var wg sync.WaitGroup
// init config
cfg1 := getCanalConfig(username1, password1, host1, port1, database1)
cfg2 := getCanalConfig(username2, password2, host2, port2, database2)
// start canal1 replication
go StartCanal(cfg1, username2, password2, host2, port2, database2)
wg.Add(1)
// start canal2 replication
go StartCanal(cfg2, username1, password1, host1, port1, database1)
wg.Add(1)
wg.Wait()
return nil
}
func (h *Database) OnGTID(header *replication.EventHeader, gtid mysql.GTIDSet) error {
func (db *Database) OnGTID(header *replication.EventHeader, gtid mysql.GTIDSet) error {
log.Info("OnGTID: ", gtid.String())
h.Gtid = gtid.String()
db.Gtid = gtid.String()
return nil
}
func (h *Database) onDDL(header *replication.EventHeader, nextPos mysql.Position, queryEvent *replication.QueryEvent) error {
func (db *Database) onDDL(header *replication.EventHeader, nextPos mysql.Position, queryEvent *replication.QueryEvent) error {
log.Info("into DDL event")
return nil
}
func (h *Database) OnRow(e *canal.RowsEvent) error {
func (db *Database) OnRow(e *canal.RowsEvent) error {
log.Info("serverId: ", e.Header.ServerID)
if strings.Contains(h.Gtid, h.serverUuid) {
if strings.Contains(db.Gtid, db.serverUuid) {
return nil
}
// Set the next gtid of the target library to the gtid of the current target library to avoid loopbacks
h.engine.Exec(fmt.Sprintf("SET GTID_NEXT= '%s'", h.Gtid))
db.engine.Exec(fmt.Sprintf("SET GTID_NEXT= '%s'", db.Gtid))
length := len(e.Table.Columns)
columnNames := make([]string, length)
oldColumnValue := make([]interface{}, length)
@ -114,7 +62,7 @@ func (h *Database) OnRow(e *canal.RowsEvent) error {
switch e.Action {
case canal.UpdateAction:
h.engine.Exec("BEGIN")
db.engine.Exec("BEGIN")
for i, row := range e.Rows {
for j, item := range row {
if i%2 == 0 {
@ -142,17 +90,17 @@ func (h *Database) OnRow(e *canal.RowsEvent) error {
return err
}
res, err := h.engine.DB().Exec(updateSql, args...)
res, err := db.engine.DB().Exec(updateSql, args...)
if err != nil {
return err
}
log.Info(updateSql, args, res)
}
}
h.engine.Exec("COMMIT")
h.engine.Exec("SET GTID_NEXT='automatic'")
db.engine.Exec("COMMIT")
db.engine.Exec("SET GTID_NEXT='automatic'")
case canal.DeleteAction:
h.engine.Exec("BEGIN")
db.engine.Exec("BEGIN")
for _, row := range e.Rows {
for j, item := range row {
if isChar[j] == true {
@ -168,16 +116,16 @@ func (h *Database) OnRow(e *canal.RowsEvent) error {
return err
}
res, err := h.engine.DB().Exec(deleteSql, args...)
res, err := db.engine.DB().Exec(deleteSql, args...)
if err != nil {
return err
}
log.Info(deleteSql, args, res)
}
h.engine.Exec("COMMIT")
h.engine.Exec("SET GTID_NEXT='automatic'")
db.engine.Exec("COMMIT")
db.engine.Exec("SET GTID_NEXT='automatic'")
case canal.InsertAction:
h.engine.Exec("BEGIN")
db.engine.Exec("BEGIN")
for _, row := range e.Rows {
for j, item := range row {
if isChar[j] == true {
@ -196,20 +144,20 @@ func (h *Database) OnRow(e *canal.RowsEvent) error {
return err
}
res, err := h.engine.DB().Exec(insertSql, args...)
res, err := db.engine.DB().Exec(insertSql, args...)
if err != nil {
return err
}
log.Info(insertSql, args, res)
}
h.engine.Exec("COMMIT")
h.engine.Exec("SET GTID_NEXT='automatic'")
db.engine.Exec("COMMIT")
db.engine.Exec("SET GTID_NEXT='automatic'")
default:
log.Infof("%v", e.String())
}
return nil
}
func (h *Database) String() string {
func (db *Database) String() string {
return "Database"
}

32
sync/sync.go Normal file
View File

@ -0,0 +1,32 @@
// Copyright 2023 The Casdoor Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sync
import "sync"
func startSyncJob(db1 *Database, db2 *Database) error {
var wg sync.WaitGroup
// start canal1 replication
go db1.startCanal(db2)
wg.Add(1)
// start canal2 replication
go db2.startCanal(db1)
wg.Add(1)
wg.Wait()
return nil
}

27
sync/sync_test.go Normal file
View File

@ -0,0 +1,27 @@
// Copyright 2023 The Casdoor Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sync
import (
"testing"
_ "github.com/go-sql-driver/mysql"
)
func TestStartSyncJob(t *testing.T) {
db1 := newDatabase("127.0.0.1", 3306, "casdoor", "root", "123456")
db2 := newDatabase("127.0.0.1", 3306, "casdoor2", "root", "123456")
startSyncJob(db1, db2)
}

View File

@ -19,8 +19,6 @@ import (
"log"
"strconv"
"github.com/go-mysql-org/go-mysql/canal"
"github.com/Masterminds/squirrel"
"github.com/xorm-io/xorm"
)
@ -108,37 +106,3 @@ func getPkColumnValues(columnValues []interface{}, PKColumns []int) []interface{
}
return pkColumnNames
}
func getCanalConfig(username string, password string, host string, port int, database string) *canal.Config {
// config canal
cfg := canal.NewDefaultConfig()
cfg.Addr = fmt.Sprintf("%s:%d", host, port)
cfg.Password = password
cfg.User = username
// We only care table in database1
cfg.Dump.TableDB = database
return cfg
}
func createDatabase(username string, password string, host string, port int, database string) Database {
var db Database
var err error
db.dataSourceName = fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", username, password, host, port, database)
db.engine, err = createEngine(db.dataSourceName)
if err != nil {
panic(err)
}
db.serverId, err = getServerId(db.engine)
if err != nil {
panic(err)
}
db.serverUuid, err = getServerUuid(db.engine)
if err != nil {
panic(err)
}
return db
}