ci: support MySQL data sync (#2443)

* feat: support tool for mysql master-slave sync

* feat: support mysql master-master sync

* feat: improve log

* feat: improve code

* fix: fix bug when len(res) ==0

* fix: fix bug when len(res) ==0

* feat: support master-slave sync

* feat: add deleteSlaveUser for TestStopMasterSlaveSync

* feat: add deleteSlaveUser for TestStopMasterSlaveSync
This commit is contained in:
haiwu 2023-10-31 21:00:09 +08:00 committed by GitHub
parent 49c6ce2221
commit b285144a64
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 425 additions and 0 deletions

116
sync_v2/cmd_test.go Normal file
View File

@ -0,0 +1,116 @@
// Copyright 2023 The Casdoor Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build !skipCi
// +build !skipCi
package sync_v2
import (
"testing"
_ "github.com/go-sql-driver/mysql"
)
/*
The following config should be added to my.cnf:
gtid_mode=on
enforce_gtid_consistency=on
binlog-format=ROW
server-id = 1 # this should be different for each mysql instance (1,2)
auto_increment_offset = 1 # this is same as server-id
auto_increment_increment = 2 # this is same as the number of mysql instances (2)
log-bin = mysql-bin
replicate-do-db = casdoor # this is the database name
binlog-do-db = casdoor # this is the database name
*/
var Configs = []Database{
{
host: "test-db.v2tl.com",
port: 3306,
username: "root",
password: "password",
database: "casdoor",
// the following two fields are used to create replication user, you don't need to change them
slaveUser: "repl_user",
slavePassword: "repl_user",
},
{
host: "localhost",
port: 3306,
username: "root",
password: "password",
database: "casdoor",
// the following two fields are used to create replication user, you don't need to change them
slaveUser: "repl_user",
slavePassword: "repl_user",
},
}
func TestStartMasterSlaveSync(t *testing.T) {
// for example, this is aliyun rds
db0 := newDatabase(&Configs[0])
// for example, this is local mysql instance
db1 := newDatabase(&Configs[1])
createSlaveUser(db0)
// db0 is master, db1 is slave
startSlave(db0, db1)
}
func TestStopMasterSlaveSync(t *testing.T) {
// for example, this is aliyun rds
db0 := newDatabase(&Configs[0])
// for example, this is local mysql instance
db1 := newDatabase(&Configs[1])
stopSlave(db1)
deleteSlaveUser(db0)
}
func TestStartMasterMasterSync(t *testing.T) {
db0 := newDatabase(&Configs[0])
db1 := newDatabase(&Configs[1])
createSlaveUser(db0)
createSlaveUser(db1)
// db0 is master, db1 is slave
startSlave(db0, db1)
// db1 is master, db0 is slave
startSlave(db1, db0)
}
func TestStopMasterMasterSync(t *testing.T) {
db0 := newDatabase(&Configs[0])
db1 := newDatabase(&Configs[1])
stopSlave(db0)
stopSlave(db1)
deleteSlaveUser(db0)
deleteSlaveUser(db1)
}
func TestShowSlaveStatus(t *testing.T) {
db0 := newDatabase(&Configs[0])
db1 := newDatabase(&Configs[1])
slaveStatus(db0)
slaveStatus(db1)
}
func TestShowMasterStatus(t *testing.T) {
db0 := newDatabase(&Configs[0])
db1 := newDatabase(&Configs[1])
masterStatus(db0)
masterStatus(db1)
}

70
sync_v2/db.go Normal file
View File

@ -0,0 +1,70 @@
// Copyright 2023 The Casdoor Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sync_v2
import (
"fmt"
"log"
"github.com/xorm-io/xorm"
)
type Database struct {
host string
port int
database string
username string
password string
slaveUser string
slavePassword string
engine *xorm.Engine
}
func (db *Database) exec(format string, args ...interface{}) []map[string]string {
sql := fmt.Sprintf(format, args...)
res, err := db.engine.QueryString(sql)
if err != nil {
panic(err)
}
return res
}
func createEngine(dataSourceName string) (*xorm.Engine, error) {
engine, err := xorm.NewEngine("mysql", dataSourceName)
if err != nil {
return nil, err
}
// ping mysql
err = engine.Ping()
if err != nil {
return nil, err
}
engine.ShowSQL(true)
log.Println("mysql connection success")
return engine, nil
}
func newDatabase(db *Database) *Database {
dataSourceName := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", db.username, db.password, db.host, db.port, db.database)
engine, err := createEngine(dataSourceName)
if err != nil {
panic(err)
}
db.engine = engine
return db
}

89
sync_v2/master.go Normal file
View File

@ -0,0 +1,89 @@
// Copyright 2023 The Casdoor Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sync_v2
import (
"fmt"
"log"
)
func deleteSlaveUser(masterdb *Database) {
defer func() {
if err := recover(); err != nil {
log.Fatalln(err)
}
}()
masterdb.exec("delete from mysql.user where user = '%v'", masterdb.slaveUser)
masterdb.exec("flush privileges")
}
func createSlaveUser(masterdb *Database) {
res := make([]map[string]string, 0)
defer func() {
if err := recover(); err != nil {
log.Fatalln(err)
}
}()
res = masterdb.exec("show databases")
dbNames := make([]string, 0, len(res))
for _, dbInfo := range res {
dbName := dbInfo["Database"]
dbNames = append(dbNames, dbName)
}
log.Println("dbs in mysql: ", dbNames)
res = masterdb.exec("show tables")
tableNames := make([]string, 0, len(res))
for _, table := range res {
tableName := table[fmt.Sprintf("Tables_in_%v", masterdb.database)]
tableNames = append(tableNames, tableName)
}
log.Printf("tables in %v: %v", masterdb.database, tableNames)
// delete user to prevent user already exists
res = masterdb.exec("delete from mysql.user where user = '%v'", masterdb.slaveUser)
res = masterdb.exec("flush privileges")
// create replication user
res = masterdb.exec("create user '%s'@'%s' identified by '%s'", masterdb.slaveUser, "%", masterdb.slavePassword)
res = masterdb.exec("select host, user from mysql.user where user = '%v'", masterdb.slaveUser)
log.Println("user: ", res[0])
res = masterdb.exec("grant replication slave on *.* to '%s'@'%s'", masterdb.slaveUser, "%")
res = masterdb.exec("flush privileges")
res = masterdb.exec("show grants for '%s'@'%s'", masterdb.slaveUser, "%")
log.Println("grants: ", res[0])
// check env
res = masterdb.exec("show variables like 'server_id'")
log.Println("server_id: ", res[0]["Value"])
res = masterdb.exec("show variables like 'log_bin'")
log.Println("log_bin: ", res[0]["Value"])
res = masterdb.exec("show variables like 'binlog_format'")
log.Println("binlog_format: ", res[0]["Value"])
res = masterdb.exec("show variables like 'binlog_row_image'")
}
func masterStatus(masterdb *Database) {
res := masterdb.exec("show master status")
if len(res) == 0 {
log.Printf("no master status for master [%v:%v]\n", masterdb.host, masterdb.port)
return
}
pos := res[0]["Position"]
file := res[0]["File"]
log.Println("*****check master status*****")
log.Println("master:", masterdb.host, ":", masterdb.port)
log.Println("file:", file, ", position:", pos, ", master status:", res)
log.Println("*****************************")
}

84
sync_v2/slave.go Normal file
View File

@ -0,0 +1,84 @@
// Copyright 2023 The Casdoor Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sync_v2
import "log"
// slaveStatus shows slave status
func slaveStatus(slavedb *Database) {
res := slavedb.exec("show slave status")
if len(res) == 0 {
log.Printf("no slave status for slave [%v:%v]\n", slavedb.host, slavedb.port)
return
}
log.Println("*****check slave status*****")
log.Println("slave:", slavedb.host, ":", slavedb.port)
masterServerId := res[0]["Master_Server_Id"]
log.Println("master server id:", masterServerId)
lastError := res[0]["Last_Error"]
log.Println("last error:", lastError) // this should be empty
lastIoError := res[0]["Last_IO_Error"]
log.Println("last io error:", lastIoError) // this should be empty
slaveIoState := res[0]["Slave_IO_State"]
log.Println("slave io state:", slaveIoState)
slaveIoRunning := res[0]["Slave_IO_Running"]
log.Println("slave io running:", slaveIoRunning) // this should be Yes
slaveSqlRunning := res[0]["Slave_SQL_Running"]
log.Println("slave sql running:", slaveSqlRunning) // this should be Yes
slaveSqlRunningState := res[0]["Slave_SQL_Running_State"]
log.Println("slave sql running state:", slaveSqlRunningState)
slaveSecondsBehindMaster := res[0]["Seconds_Behind_Master"]
log.Println("seconds behind master:", slaveSecondsBehindMaster) // this should be 0, if not, it means the slave is behind the master
log.Println("slave status:", res)
log.Println("****************************")
}
// stopSlave stops slave
func stopSlave(slavedb *Database) {
defer func() {
if err := recover(); err != nil {
log.Fatalln(err)
}
}()
slavedb.exec("stop slave")
slaveStatus(slavedb)
}
// startSlave starts slave
func startSlave(masterdb *Database, slavedb *Database) {
res := make([]map[string]string, 0)
defer func() {
if err := recover(); err != nil {
log.Fatalln(err)
}
}()
stopSlave(slavedb)
// get the info about master
res = masterdb.exec("show master status")
if len(res) == 0 {
log.Println("no master status")
return
}
pos := res[0]["Position"]
file := res[0]["File"]
log.Println("file:", file, ", position:", pos, ", master status:", res)
res = slavedb.exec("stop slave")
res = slavedb.exec(
"change master to master_host='%v', master_port=%v, master_user='%v', master_password='%v', master_log_file='%v', master_log_pos=%v;",
masterdb.host, masterdb.port, masterdb.slaveUser, masterdb.slavePassword, file, pos,
)
res = slavedb.exec("start slave")
slaveStatus(slavedb)
}

66
sync_v2/table_test.go Normal file
View File

@ -0,0 +1,66 @@
// Copyright 2023 The Casdoor Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build !skipCi
// +build !skipCi
package sync_v2
import (
"log"
"math/rand"
"testing"
"github.com/casdoor/casdoor/util"
)
type TestUser struct {
Id int64 `xorm:"pk autoincr"`
Username string `xorm:"varchar(50)"`
Address string `xorm:"varchar(50)"`
Card string `xorm:"varchar(50)"`
Age int
}
func TestCreateUserTable(t *testing.T) {
db := newDatabase(&Configs[0])
err := db.engine.Sync2(new(TestUser))
if err != nil {
log.Fatalln(err)
}
}
func TestInsertUser(t *testing.T) {
db := newDatabase(&Configs[0])
// random generate user
user := &TestUser{
Username: util.GetRandomName(),
Age: rand.Intn(100) + 10,
}
_, err := db.engine.Insert(user)
if err != nil {
log.Fatalln(err)
}
}
func TestDeleteUser(t *testing.T) {
db := newDatabase(&Configs[0])
user := &TestUser{
Id: 10,
}
_, err := db.engine.Delete(user)
if err != nil {
log.Fatalln(err)
}
}