Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: mysql replication enhancements #336

Merged
merged 10 commits into from
Jan 2, 2025
17 changes: 13 additions & 4 deletions .github/workflows/replication-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,23 +90,29 @@ jobs:
docker exec source-db dolt sql -q "
CREATE DATABASE test;
CREATE TABLE test.items (id INT PRIMARY KEY, name VARCHAR(50));
INSERT INTO test.items VALUES (1, 'test1'), (2, 'test2');"
INSERT INTO test.items VALUES (1, 'test1'), (2, 'test2');
CREATE TABLE test.skip (id INT PRIMARY KEY, name VARCHAR(50));
INSERT INTO test.skip VALUES (1, 'abc'), (2, 'def');"
elif [ "${{ matrix.source }}" = "mariadb" ]; then
docker exec source-db mariadb -uroot -proot test -e "
CREATE TABLE items (id INT PRIMARY KEY, name VARCHAR(50));
INSERT INTO items VALUES (1, 'test1'), (2, 'test2');"
INSERT INTO items VALUES (1, 'test1'), (2, 'test2');
CREATE TABLE skip (id INT PRIMARY KEY, name VARCHAR(50));
INSERT INTO skip VALUES (1, 'abc'), (2, 'def');"
else
docker exec source-db mysql -uroot -proot test -e "
CREATE TABLE items (id INT PRIMARY KEY, name VARCHAR(50));
INSERT INTO items VALUES (1, 'test1'), (2, 'test2');"
INSERT INTO items VALUES (1, 'test1'), (2, 'test2');
CREATE TABLE skip (id INT PRIMARY KEY, name VARCHAR(50));
INSERT INTO skip VALUES (1, 'abc'), (2, 'def');"
fi

- name: Start MyDuck Server in replica mode
run: |
if [ "${{ matrix.source }}" = "postgres" ]; then
SOURCE_DSN="postgres://postgres:[email protected]:5432/test"
else
SOURCE_DSN="mysql://root:[email protected]:3306"
SOURCE_DSN="mysql://root:[email protected]:3306/test?skip-tables=skip"
fi

docker run -d --name myduck \
Expand Down Expand Up @@ -203,6 +209,9 @@ jobs:
exit 1
fi

# Print the logs
docker logs myduck

- name: Cleanup
if: always()
run: |
Expand Down
2 changes: 1 addition & 1 deletion backend/loaddata.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (db *DuckBuilder) executeLoadData(ctx *sql.Context, insert *plan.InsertInto
// Replicated tables do not have physical primary keys.
// Their logical primary keys are fake and should not be used in INSERT INTO statements.
// https://github.com/apecloud/myduckserver/issues/272
keyless = t.ExtraTableInfo().Replicated
keyless = t.ExtraTableInfo().Replicated || !t.HasPrimaryKey()
}
}

Expand Down
32 changes: 32 additions & 0 deletions binlogreplication/binlog_replica_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,18 @@ func (d *myBinlogReplicaController) SetReplicationSourceOptions(ctx *sql.Context
func (d *myBinlogReplicaController) SetReplicationFilterOptions(_ *sql.Context, options []binlogreplication.ReplicationOption) error {
for _, option := range options {
switch strings.ToUpper(option.Name) {
case "REPLICATE_DO_DB":
value, err := getOptionValueAsDatabaseNames(option)
if err != nil {
return err
}
d.filters.setDoDatabases(value)
case "REPLICATE_IGNORE_DB":
value, err := getOptionValueAsDatabaseNames(option)
if err != nil {
return err
}
d.filters.setIgnoreDatabases(value)
case "REPLICATE_DO_TABLE":
value, err := getOptionValueAsTableNames(option)
if err != nil {
Expand Down Expand Up @@ -378,6 +390,8 @@ func (d *myBinlogReplicaController) GetReplicaStatus(ctx *sql.Context) (*binlogr
copy.SourceServerUuid = replicaSourceInfo.Uuid
copy.ConnectRetry = replicaSourceInfo.ConnectRetryInterval
copy.SourceRetryCount = replicaSourceInfo.ConnectRetryCount
// copy.ReplicateDoDBs = d.filters.getDoDatabases()
// copy.ReplicateIgnoreDBs = d.filters.getIgnoreDatabases()
copy.ReplicateDoTables = d.filters.getDoTables()
copy.ReplicateIgnoreTables = d.filters.getIgnoreTables()

Expand Down Expand Up @@ -523,6 +537,24 @@ func getOptionValueAsTableNames(option binlogreplication.ReplicationOption) ([]s
"but expected a list of tables", option.Name, option.Value.GetValue())
}

func getOptionValueAsDatabaseNames(option binlogreplication.ReplicationOption) ([]string, error) {
// The value of the option should be a list of database names.
// But since the parser doesn't have a database name list type,
// we reuse the table name list type to represent a list of database names.
ov, ok := option.Value.(binlogreplication.TableNamesReplicationOptionValue)
if ok {
list := ov.GetValueAsTableList()
names := make([]string, len(list))
for i, t := range list {
names[i] = t.Name()
}
return names, nil
}

return nil, fmt.Errorf("unsupported value type for option %q; found %T, "+
"but expected a list of databases", option.Name, option.Value.GetValue())
}

func verifyAllTablesAreQualified(urts []sql.UnresolvedTable) error {
for _, urt := range urts {
if urt.Database().Name() == "" {
Expand Down
89 changes: 85 additions & 4 deletions binlogreplication/binlog_replica_filtering.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ import (

// filterConfiguration defines the binlog filtering rules applied on the replica.
type filterConfiguration struct {
// doDatabases holds a map of database names that SHOULD be replicated.
doDatabases map[string]struct{}
// ignoreDatabases holds a map of database names that should NOT be replicated.
ignoreDatabases map[string]struct{}
// doTables holds a map of database name to map of table names, indicating tables that SHOULD be replicated.
doTables map[string]map[string]struct{}
// ignoreTables holds a map of database name to map of table names, indicating tables that should NOT be replicated.
Expand All @@ -36,9 +40,39 @@ type filterConfiguration struct {
// newFilterConfiguration creates a new filterConfiguration instance and initializes members.
func newFilterConfiguration() *filterConfiguration {
return &filterConfiguration{
doTables: make(map[string]map[string]struct{}),
ignoreTables: make(map[string]map[string]struct{}),
mu: &sync.Mutex{},
doDatabases: make(map[string]struct{}),
ignoreDatabases: make(map[string]struct{}),
doTables: make(map[string]map[string]struct{}),
ignoreTables: make(map[string]map[string]struct{}),
mu: &sync.Mutex{},
}
}

// setDoDatabases sets the databases that are allowed to replicate. If any DoDatabases were previously configured,
// they are cleared out before the new databases are set.
func (fc *filterConfiguration) setDoDatabases(databases []string) {
fc.mu.Lock()
defer fc.mu.Unlock()

// Setting new replication filters clears out any existing filters
fc.doDatabases = make(map[string]struct{})

for _, db := range databases {
fc.doDatabases[strings.ToLower(db)] = struct{}{}
}
}

// setIgnoreDatabases sets the databases that are NOT allowed to replicate. If any IgnoreDatabases were previously configured,
// they are cleared out before the new databases are set.
func (fc *filterConfiguration) setIgnoreDatabases(databases []string) {
fc.mu.Lock()
defer fc.mu.Unlock()

// Setting new replication filters clears out any existing filters
fc.ignoreDatabases = make(map[string]struct{})

for _, db := range databases {
fc.ignoreDatabases[strings.ToLower(db)] = struct{}{}
}
}

Expand Down Expand Up @@ -96,6 +130,38 @@ func (fc *filterConfiguration) setIgnoreTables(urts []sql.UnresolvedTable) error
return nil
}

// getDoDatabases returns a slice of database names that are configured to be replicated.
func (fc *filterConfiguration) getDoDatabases() []string {
fc.mu.Lock()
defer fc.mu.Unlock()

if len(fc.doDatabases) == 0 {
return nil
}

databases := make([]string, 0, len(fc.doDatabases))
for db := range fc.doDatabases {
databases = append(databases, db)
}
return databases
}

// getIgnoreDatabases returns a slice of database names that are configured to be filtered out of replication.
func (fc *filterConfiguration) getIgnoreDatabases() []string {
fc.mu.Lock()
defer fc.mu.Unlock()

if len(fc.ignoreDatabases) == 0 {
return nil
}

databases := make([]string, 0, len(fc.ignoreDatabases))
for db := range fc.ignoreDatabases {
databases = append(databases, db)
}
return databases
}

// isTableFilteredOut returns true if the table identified by |tableMap| has been filtered out on this replica and
// should not have any updates applied from binlog messages.
func (fc *filterConfiguration) isTableFilteredOut(ctx *sql.Context, tableMap *mysql.TableMap) bool {
Expand All @@ -109,6 +175,21 @@ func (fc *filterConfiguration) isTableFilteredOut(ctx *sql.Context, tableMap *my
fc.mu.Lock()
defer fc.mu.Unlock()

// If any filter doDatabase options are specified, then a database MUST be listed in the set
// for it to be replicated. doDatabase options are processed BEFORE ignoreDatabase options.
// https://dev.mysql.com/doc/refman/8.4/en/replication-rules-db-options.html
if len(fc.doDatabases) > 0 {
if _, ok := fc.doDatabases[db]; !ok {
ctx.GetLogger().Tracef("skipping database %s (not in doDatabases)", db)
return true
}
} else if len(fc.ignoreDatabases) > 0 {
if _, ok := fc.ignoreDatabases[db]; ok {
ctx.GetLogger().Tracef("skipping database %s (in ignoreDatabases)", db)
return true
}
}

// If any filter doTable options are specified, then a table MUST be listed in the set
// for it to be replicated. doTables options are processed BEFORE ignoreTables options.
// If a table appears in both doTable and ignoreTables, it is ignored.
Expand Down Expand Up @@ -160,7 +241,7 @@ func convertFilterMapToStringSlice(filterMap map[string]map[string]struct{}) []s

tableNames := make([]string, 0, len(filterMap))
for dbName, tableMap := range filterMap {
for tableName, _ := range tableMap {
for tableName := range tableMap {
tableNames = append(tableNames, fmt.Sprintf("%s.%s", dbName, tableName))
}
}
Expand Down
95 changes: 94 additions & 1 deletion binlogreplication/binlog_replication_filters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/stretchr/testify/require"
)

// TestBinlogReplicationFilters_ignoreTablesOnly tests that the ignoreTables replication
// TestReplicationFilters_ignoreTablesOnly tests that the ignoreTables replication
// filtering option is correctly applied and honored.
func TestBinlogReplicationFilters_ignoreTablesOnly(t *testing.T) {
defer teardown(t)
Expand Down Expand Up @@ -189,3 +189,96 @@ func TestBinlogReplicationFilters_errorCases(t *testing.T) {
require.Error(t, err)
require.ErrorContains(t, err, "no database specified for table")
}

// TestReplicationFilters_ignoreDatabasesOnly tests that the ignoreDatabases replication
// filtering option is correctly applied and honored.
func TestReplicationFilters_ignoreDatabasesOnly(t *testing.T) {
defer teardown(t)
startSqlServersWithSystemVars(t, duckReplicaSystemVars)
startReplicationAndCreateTestDb(t, mySqlPort)

// Ignore replication events for db01. Also tests that the first filter setting is overwritten by
// the second and that db names are case-insensitive.
replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_IGNORE_DB=(db02);")
replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_IGNORE_DB=(DB01);")

// TODO(fan): Not implemented yet
// Assert that status shows replication filters
// status := showReplicaStatus(t)
// require.Equal(t, "db01", status["Replicate_Ignore_DB"])
// require.Equal(t, "", status["Replicate_Do_DB"])

// Make changes on the primary
primaryDatabase.MustExec("CREATE DATABASE db02;")
primaryDatabase.MustExec("CREATE TABLE db01.t1 (pk INT PRIMARY KEY);")
primaryDatabase.MustExec("CREATE TABLE db02.t1 (pk INT PRIMARY KEY);")
for i := 1; i < 12; i++ {
primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t1 VALUES (%d);", i))
primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db02.t1 VALUES (%d);", i))
}

// Pause to let the replica catch up
waitForReplicaToCatchUp(t)

// Although the database is ignored, it is still created on the replica
// because the DDL statements are not filtered out.

// Verify that no changes from db01 were applied on the replica
rows, err := replicaDatabase.Queryx("SELECT COUNT(pk) as count FROM db01.t1;")
require.NoError(t, err)
row := convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "0", row["count"])
require.NoError(t, rows.Close())

// Verify that all changes from db02 were applied on the replica
rows, err = replicaDatabase.Queryx("SELECT COUNT(pk) as count FROM db02.t1;")
require.NoError(t, err)
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "11", row["count"])
require.NoError(t, rows.Close())
}

// TestReplicationFilters_doDatabasesOnly tests that the doDatabases replication
// filtering option is correctly applied and honored.
func TestReplicationFilters_doDatabasesOnly(t *testing.T) {
defer teardown(t)
startSqlServersWithSystemVars(t, duckReplicaSystemVars)
startReplicationAndCreateTestDb(t, mySqlPort)

// Do replication events for db01. Also tests that the first filter setting is overwritten by
// the second and that db names are case-insensitive.
replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_DO_DB=(db02);")
replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_DO_DB=(DB01);")

// TODO(fan): Not implemented yet
// Assert that status shows replication filters
// status := showReplicaStatus(t)
// require.Equal(t, "db01", status["Replicate_Do_DB"])
// require.Equal(t, "", status["Replicate_Ignore_DB"])

// Make changes on the primary
primaryDatabase.MustExec("CREATE DATABASE db02;")
primaryDatabase.MustExec("CREATE TABLE db01.t1 (pk INT PRIMARY KEY);")
primaryDatabase.MustExec("CREATE TABLE db02.t1 (pk INT PRIMARY KEY);")
for i := 1; i < 12; i++ {
primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t1 VALUES (%d);", i))
primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db02.t1 VALUES (%d);", i))
}

// Pause to let the replica catch up
waitForReplicaToCatchUp(t)

// Verify that all changes from db01 were applied on the replica
rows, err := replicaDatabase.Queryx("SELECT COUNT(pk) as count FROM db01.t1;")
require.NoError(t, err)
row := convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "11", row["count"])
require.NoError(t, rows.Close())

// Verify that no changes from db02 were applied on the replica
rows, err = replicaDatabase.Queryx("SELECT COUNT(pk) as count FROM db02.t1;")
require.NoError(t, err)
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "0", row["count"])
require.NoError(t, rows.Close())
}
7 changes: 4 additions & 3 deletions catalog/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (d *Database) tablesInsensitive(ctx *sql.Context, pattern string) ([]*Table
}

func (d *Database) findTables(ctx *sql.Context, pattern string) ([]*Table, error) {
rows, err := adapter.QueryCatalog(ctx, "SELECT DISTINCT table_name, comment FROM duckdb_tables() WHERE (database_name = ? AND schema_name = ? AND table_name ILIKE ?) OR (temporary IS TRUE AND table_name ILIKE ?)", d.catalog, d.name, pattern, pattern)
rows, err := adapter.QueryCatalog(ctx, "SELECT table_name, has_primary_key, comment FROM duckdb_tables() WHERE (database_name = ? AND schema_name = ? AND table_name ILIKE ?) OR (temporary IS TRUE AND table_name ILIKE ?)", d.catalog, d.name, pattern, pattern)
if err != nil {
return nil, ErrDuckDB.New(err)
}
Expand All @@ -104,11 +104,12 @@ func (d *Database) findTables(ctx *sql.Context, pattern string) ([]*Table, error
var tbls []*Table
for rows.Next() {
var tblName string
var hasPrimaryKey bool
var comment stdsql.NullString
if err := rows.Scan(&tblName, &comment); err != nil {
if err := rows.Scan(&tblName, &hasPrimaryKey, &comment); err != nil {
return nil, ErrDuckDB.New(err)
}
t := NewTable(tblName, d).withComment(DecodeComment[ExtraTableInfo](comment.String))
t := NewTable(d, tblName, hasPrimaryKey).withComment(DecodeComment[ExtraTableInfo](comment.String))
tbls = append(tbls, t)
}
if err := rows.Err(); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion catalog/inserter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type rowInserter struct {
db string
table string
schema sql.Schema
hasPK bool
replace bool

once sync.Once
Expand Down Expand Up @@ -69,7 +70,7 @@ func (ri *rowInserter) init(ctx *sql.Context) {

insert.Reset()
insert.WriteString("INSERT ")
if ri.replace {
if ri.replace && ri.hasPK {
insert.WriteString(" OR REPLACE")
}
insert.WriteString(" INTO ")
Expand Down
Loading
Loading