Go library that allows consuming from MaxScale CDC listener. Useful for detecting database changes via binlog.
This consumer follows the connection protocol defined by MaxScale 6 here.
For this test environment the following file structure was used:
- 📂
mariadb-config
- 📄
mariadb.cnf
- 📄
- 📂
mariadb-init
- 📄
00_schema.sql
- 📄
- 📂
maxscale-config
- 📄
maxscale.cnf
- 📄
- 📄
docker-compose.yml
- 📄
consumer.go
We need to enable replication in the MariaDB master database:
[mysqld]
server_id=1
binlog_format=row
binlog_row_image=full
log-bin=/var/log/mysql/mariadb-bin
A basic schema adding the MaxScale user, and some testing database to play with:
RESET MASTER;
-- https://mariadb.com/kb/en/mariadb-maxscale-6-setting-up-mariadb-maxscale/#creating-a-user-account-for-maxscale
CREATE USER 'maxuser'@'%' IDENTIFIED BY 'maxpwd';
GRANT REPLICATION SLAVE ON *.* TO 'maxuser'@'%';
GRANT REPLICATION CLIENT ON *.* TO 'maxuser'@'%';
GRANT SELECT ON mysql.user TO 'maxuser'@'%';
GRANT SELECT ON mysql.db TO 'maxuser'@'%';
GRANT SELECT ON mysql.tables_priv TO 'maxuser'@'%';
GRANT SELECT ON mysql.columns_priv TO 'maxuser'@'%';
GRANT SELECT ON mysql.procs_priv TO 'maxuser'@'%';
GRANT SELECT ON mysql.proxies_priv TO 'maxuser'@'%';
GRANT SELECT ON mysql.roles_mapping TO 'maxuser'@'%';
GRANT SHOW DATABASES ON *.* TO 'maxuser'@'%';
FLUSH PRIVILEGES;
DROP DATABASE IF EXISTS example;
CREATE DATABASE IF NOT EXISTS example;
-- Allows MaxScale user to run 'SHOW CREATE TABLE' for DDL events
-- https://mariadb.com/kb/en/mariadb-maxscale-6-avrorouter/#avro-schema-generator
GRANT SELECT ON example.* TO 'maxuser'@'%';
FLUSH PRIVILEGES;
USE example;
DROP TABLE IF EXISTS users;
CREATE TABLE users (
`id` INT NOT NULL AUTO_INCREMENT,
`name` VARCHAR(255) NOT NULL,
`email` VARCHAR(255) NOT NULL,
PRIMARY KEY (id)
);
INSERT INTO `users` (`name`, `email`) VALUES ('John Doe', '[email protected]');
INSERT INTO `users` (`name`, `email`) VALUES ('Jane Doe', '[email protected]');
MaxScale configuration to configure the Avro router
(direct replication mode)
and expose a listener so gomaxscale
can retrieve the information.
[MaxScale]
threads=1
admin_secure_gui=false
threads=auto
admin_host=0.0.0.0
[server1]
type=server
address=db
port=3306
protocol=MariaDBBackend
[cdc-service]
type=service
router=avrorouter
servers=server1
server_id=1
user=maxuser
password=maxpwd
[cdc-listener]
type=listener
service=cdc-service
protocol=CDC
port=4001
[MariaDB-Monitor]
type=monitor
module=mariadbmon
servers=server1
user=maxuser
password=maxpwd
monitor_interval=5000
To setup a MariaDB database and a MaxScale server we will use docker-compose with the following configuration:
version: '2.4'
services:
db:
container_name: "lab-db"
image: mariadb:10.3.8
volumes:
- ./mariadb-config:/etc/mysql/conf.d
- ./mariadb-init:/docker-entrypoint-initdb.d
environment:
MYSQL_ROOT_PASSWORD: abc123
healthcheck:
test: ["CMD", "mysqladmin", "ping", "--silent"]
dbproxy:
container_name: "lab-maxscale"
image: mariadb/maxscale:6.2
volumes:
- ./maxscale-config/maxscale.cnf:/etc/maxscale.cnf
ports:
- 4001:4001
depends_on:
- db
A local Go file will consume and log the modified items in the database:
package main
import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
"github.com/rafaeljusto/gomaxscale/v2"
)
func main() {
consumer := gomaxscale.NewConsumer("127.0.0.1:4001", "example", "users",
gomaxscale.WithAuth("maxuser", "maxpwd"),
)
err := consumer.Start()
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
fmt.Println("start consuming events")
done := make(chan bool)
go func() {
consumer.Process(func(event gomaxscale.CDCEvent) {
switch e := event.(type) {
case gomaxscale.DDLEvent:
fmt.Printf("ddl event detected on database '%s' and table '%s'\n",
e.Database, e.Table)
case gomaxscale.DMLEvent:
fmt.Printf("dml '%s' event detected\n", e.Type)
}
})
done <- true
}()
signalChanel := make(chan os.Signal, 1)
signal.Notify(signalChanel, syscall.SIGINT, syscall.SIGTERM)
select {
case <-signalChanel:
case <-done:
}
fmt.Println("terminating")
}
First, start all services:
% docker-compose up -d
Then we can start consuming the items:
% go run consumer.go
To see the magic happening you could do some database changes:
% docker-compose exec db mysql -u root -p abc123 -D example \
-e "INSERT INTO users (name, email) VALUES ('James Doe', '[email protected]')"