Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 76 additions & 14 deletions go/libraries/doltcore/sqle/binlogreplication/binlog_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,12 +348,24 @@ func (b *binlogProducer) createTableMapEvents(ctx *sql.Context, databaseName str
if tableName.Name == "" {
tableName = tableDelta.FromName
}

isFullRowMetadata, err := isFullRowMetadata(ctx)
if err != nil {
return nil, nil, err
}

tablesToId[tableName.Name] = tableId
tableMap, err := createTableMapFromDoltTable(ctx, databaseName, tableName.Name, tableDelta.ToTable)
tableMap, err := createTableMapFromDoltTable(ctx, databaseName,
tableName.Name, tableDelta.ToTable, isFullRowMetadata)
if err != nil {
return nil, nil, err
}

tableMapEvent, err := b.newTableMapEvent(tableId, tableMap)
if err != nil {
return nil, nil, err
}
events = append(events, b.newTableMapEvent(tableId, tableMap))
events = append(events, tableMapEvent)
}

return events, tablesToId, nil
Expand Down Expand Up @@ -548,12 +560,8 @@ func (b *binlogProducer) newXIDEvent() mysql.BinlogEvent {

// newTableMapEvent returns a new TableMap BinlogEvent for the specified |tableId| and |tableMap|, and updates the
// stream's log position.
//
// The underlying mysql.NewTableMapEvent call can return an error (e.g. when serializing
// optional table metadata), so this wrapper now propagates it instead of discarding it.
func (b *binlogProducer) newTableMapEvent(tableId uint64, tableMap *mysql.TableMap) (mysql.BinlogEvent, error) {
	return mysql.NewTableMapEvent(*b.binlogFormat, b.binlogEventMeta, tableId, tableMap)
}

// newWriteRowsEvent returns a new WriteRows BinlogEvent for the specified |tableId| and |rows|, and updates the
Expand Down Expand Up @@ -594,6 +602,22 @@ func isDatabaseFilteredOut(ctx *sql.Context, dbName string) bool {
return false
}

// isFullRowMetadata returns true if the system is configured to return full metadata as part of
// binlog table descriptions. Full row metadata is configured by setting @@binlog_row_metadata
// to FULL.
func isFullRowMetadata(ctx *sql.Context) (bool, error) {
	val, err := ctx.GetSessionVariable(ctx, "binlog_row_metadata")
	if err != nil {
		return false, err
	}

	// Any non-string value (or any value other than "FULL", case-insensitively)
	// means full row metadata is not enabled.
	stringVal, ok := val.(string)
	return ok && strings.EqualFold(stringVal, "FULL"), nil
}

// extractRowCountAndDiffType uses |sch| and |diff| to determine how many changed rows this
// diff represents (returned as the |rowCount| param) and what type of change it represents
// (returned as the |diffType| param). For tables with primary keys, this function will always
Expand Down Expand Up @@ -659,19 +683,47 @@ func extractRowCountAndDiffType(sch schema.Schema, diff tree.Diff) (rowCount uin
}
}

// createTableMapFromDoltTable creates a binlog TableMap for the given Dolt table.
func createTableMapFromDoltTable(ctx *sql.Context, databaseName, tableName string, table *doltdb.Table) (*mysql.TableMap, error) {
schema, err := table.GetSchema(ctx)
// createTableMapFromDoltTable creates a binlog TableMap for the given Dolt table. If
// |includeOptionalMetadata| is set to true, then additional, optional metadata such as
// column names and column collations will also be included in the TableMap.
func createTableMapFromDoltTable(ctx *sql.Context, databaseName, tableName string, table *doltdb.Table, includeOptionalMetadata bool) (*mysql.TableMap, error) {
sch, err := table.GetSchema(ctx)
if err != nil {
return nil, err
}

columns := schema.GetAllCols().GetColumns()
columns := sch.GetAllCols().GetColumns()
types := make([]byte, len(columns))
metadata := make([]uint16, len(columns))
canBeNullMap := mysql.NewServerBitmap(len(columns))

var columnNames []string
var columnCollationIds []uint64
var enumValues [][]string
var setValues [][]string
var enumAndSetCollationIds []uint64
if includeOptionalMetadata {
columnNames = make([]string, len(columns))
columnCollationIds = make([]uint64, len(columns))
}

for i, col := range columns {
if includeOptionalMetadata {
columnNames[i] = col.Name
columnCollationIds[i] = uint64(schema.Collation_Unspecified)
if stringType, ok := col.TypeInfo.ToSqlType().(sql.StringType); ok {
columnCollationIds[i] = uint64(stringType.Collation())
}

if enumType, ok := col.TypeInfo.ToSqlType().(sql.EnumType); ok {
enumValues = append(enumValues, enumType.Values())
enumAndSetCollationIds = append(enumAndSetCollationIds, uint64(enumType.Collation()))
} else if setType, ok := col.TypeInfo.ToSqlType().(sql.SetType); ok {
setValues = append(setValues, setType.Values())
enumAndSetCollationIds = append(enumAndSetCollationIds, uint64(setType.Collation()))
}
}

metadata[i] = 0
typ := col.TypeInfo.ToSqlType()

Expand All @@ -687,14 +739,24 @@ func createTableMapFromDoltTable(ctx *sql.Context, databaseName, tableName strin
}
}

return &mysql.TableMap{
tableMap := &mysql.TableMap{
Flags: 0x0000,
Database: databaseName,
Name: tableName,
Types: types,
CanBeNull: canBeNullMap,
Metadata: metadata,
}, nil
}

if includeOptionalMetadata {
tableMap.OptionalColumnNames = columnNames
tableMap.OptionalEnumValues = enumValues
tableMap.OptionalSetValues = setValues
tableMap.OptionalColumnCollations = columnCollationIds
tableMap.OptionalEnumAndSetCollations = enumAndSetCollationIds
}

return tableMap, nil
}

// createBinlogFormat returns a new BinlogFormat that describes the format of this binlog stream, which will always
Expand Down
3 changes: 2 additions & 1 deletion integration-tests/mysql-client-tests/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,14 @@ RUN make

FROM python:3.14-slim-bookworm AS python_clients_build
RUN apt-get update && apt-get install -y binutils libmariadb-dev gcc && rm -rf /var/lib/apt/lists/*
RUN pip install --no-cache-dir mysql-connector-python==8.0.33 PyMySQL==1.0.2 sqlalchemy==1.4.46 mariadb pyinstaller
RUN pip install --no-cache-dir mysql-connector-python==8.0.33 PyMySQL==1.0.2 sqlalchemy==1.4.46 mariadb pyinstaller mysql-replication
COPY dolt/integration-tests/mysql-client-tests/python/ /build/python/
WORKDIR /build/python/
RUN pyinstaller --onefile pymysql-test.py
RUN pyinstaller --onefile --collect-all mysql.connector sqlalchemy-test.py
RUN pyinstaller --onefile --collect-all mysql.connector mysql-connector-test.py
RUN pyinstaller --onefile mariadb-connector-test.py
RUN pyinstaller --onefile python-replication-test.py

FROM elixir:1.18.3-slim AS elixir_clients_build
RUN apt-get update && apt-get install -y ca-certificates xz-utils curl && rm -rf /var/lib/apt/lists/*
Expand Down
20 changes: 20 additions & 0 deletions integration-tests/mysql-client-tests/mysql-client-tests.bats
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,26 @@ teardown() {
/build/bin/python/sqlalchemy-test $USER $PORT $REPO_NAME
}

# Verifies that a MySQL replication client (python-mysql-replication) can connect to a
# Dolt sql-server and stream binlog events. The binlog settings must be persisted before
# the server starts, so the server launched by setup() is stopped, configured, and restarted.
@test "python replication client" {
# Stop the server that setup launched
kill $SERVER_PID

# Configure binlog replication settings
dolt sql -q "SET @@PERSIST.log_bin=1;"
dolt sql -q "SET @@PERSIST.gtid_mode=ON;"
dolt sql -q "SET @@PERSIST.enforce_gtid_consistency=ON;"
dolt sql -q "SET @@PERSIST.binlog_format='ROW';"
dolt sql -q "SET @@PERSIST.binlog_row_image='FULL';"
dolt sql -q "SET @@PERSIST.binlog_row_metadata='FULL';"

# Restart the SQL server
dolt sql-server --host 0.0.0.0 --port=$PORT --loglevel=trace &
SERVER_PID=$!
# NOTE(review): a fixed 1s sleep can be flaky on slow CI runners; consider
# polling the port until the server accepts connections — TODO confirm
sleep 1 # Give the server a chance to start

/build/bin/python/python-replication-test $USER $PORT $REPO_NAME
}

@test "java mysql-connector-j" {
java -jar /build/bin/java/mysql-connector-test.jar $USER $PORT $REPO_NAME
}
Expand Down
131 changes: 131 additions & 0 deletions integration-tests/mysql-client-tests/python/python-replication-test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import pymysql
import sys
import time
from typing import List, Optional, Iterable, Type, TypeVar
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.event import (
RotateEvent,
FormatDescriptionEvent,
GtidEvent,
QueryEvent,
HeartbeatLogEvent,
)
from pymysqlreplication.row_event import WriteRowsEvent

T = TypeVar("T")

def main():
    """Seed test data over a regular SQL connection, then attach to the server's
    binlog replication stream and verify the expected events are produced."""
    user = sys.argv[1]
    port = int(sys.argv[2])
    db = sys.argv[3]

    # Create a table and insert a row so the binlog contains a QueryEvent
    # and a WriteRowsEvent for the assertions below.
    conn = pymysql.connect(host="127.0.0.1", port=port, user=user, db=db)
    with conn.cursor() as cursor:
        cursor.execute("CREATE TABLE t (pk int primary key, c1 varchar(100));")
        cursor.execute("INSERT INTO t VALUES (1, 'foobarbazbar');")

    # Attach to the replication stream the same way a replica would.
    mysql_settings = {'host': '127.0.0.1', 'port': port, 'user': user, 'passwd': ''}
    stream = BinLogStreamReader(connection_settings=mysql_settings, blocking=True, server_id=100, auto_position='8e66e4f4-955a-4844-909a-33d79f78ddba:1')

    # Collect the first 10 non-heartbeat events; read_events enforces a timeout
    # and closes the stream when done.
    events = read_events(stream, n=10, timeout_s=15.0)

    # Dump each event to stdout so a failing test run is easy to debug.
    for event in events:
        event.dump()

    assert_correct_events(events)
    # read_events already closed the stream; this extra close is assumed to be
    # a harmless no-op safeguard — TODO confirm BinLogStreamReader.close is idempotent.
    stream.close()
    sys.exit(0)


def assert_correct_events(events):
    """
    Checks the events for expected events and values and raises an AssertionError if unexpected results are found.
    """
    # A RotateEvent should announce binlog-main.000001 as the next binlog file.
    rotate = require_event(events, RotateEvent)
    next_file = getattr(rotate, "next_binlog", None)
    if not next_file:
        # Some library versions expose the filename under a different attribute name.
        next_file = getattr(rotate, "next_binlog_file", None)
    assert next_file == "binlog-main.000001", f"RotateEvent next binlog expected binlog-main.000001, got {next_file!r}"

    # A FormatDescriptionEvent should report binlog version 4. The library
    # appears to store binlog_version as a 1-tuple, hence the (4,) comparison.
    fde = require_event(events, FormatDescriptionEvent)
    assert getattr(fde, "binlog_version", None) == (4,), f"binlog_version expected 4, got {getattr(fde,'binlog_version',None)}"

    # The CREATE TABLE statement should arrive as a QueryEvent.
    query_event = require_event(events, QueryEvent)
    query = getattr(query_event, "query", None)
    if not query:
        query = getattr(query_event, "query_text", None)
    assert query is not None, "QueryEvent query field was None"
    if isinstance(query, bytes):
        query = query.decode("utf-8", errors="replace")
    assert "CREATE TABLE" in query, f"Query did not contain CREATE TABLE: {query!r}"
    assert "`t`" in query or " t " in query, f"Query did not appear to create table t: {query!r}"

    # The inserted row should arrive as a WriteRowsEvent whose values are keyed
    # by column name — this only works when @@binlog_row_metadata=FULL is honored.
    write_rows = require_event(events, WriteRowsEvent)
    first_row = write_rows.rows[0]
    if isinstance(first_row, dict):
        values = first_row.get("values")
    else:
        values = getattr(first_row, "values", None)
    assert isinstance(values, dict), f"Expected row values dict, got {type(values).__name__}"
    assert "pk" in values, f"Missing column 'pk'. Got keys: {sorted(values.keys())}"
    assert values["pk"] == 1, f"pk expected 1, got {values['pk']!r}"
    assert "c1" in values, f"Missing column 'c1'. Got keys: {sorted(values.keys())}"
    assert values["c1"] == "foobarbazbar", f"c1 expected 'foobarbazbar', got {values['c1']!r}"

def read_events(stream, n: int = 10, timeout_s: float = 30.0) -> List[object]:
    """
    Read up to `n` non-heartbeat events from `stream`, or stop after `timeout_s`.
    Returns the list of collected events (possibly fewer than n).

    Note: the deadline is only checked between events, so a blocking stream that
    delivers nothing at all can still hold this call past the timeout.
    """
    collected: List[object] = []
    deadline = time.monotonic() + timeout_s

    try:
        for event in stream:
            # Check the deadline before processing so we don't loop forever.
            if time.monotonic() >= deadline:
                break

            # Heartbeats carry no replication data; skip them.
            if isinstance(event, HeartbeatLogEvent):
                continue

            collected.append(event)
            if len(collected) >= n:
                break
    finally:
        # Always shut down the stream's network/background reading so we don't
        # leak connections or hang on exit.
        try:
            stream.close()
        except Exception:
            pass  # best-effort close; a failure here shouldn't mask results

    return collected

def find_event(events: Iterable[object], event_type: Type[T]) -> Optional[T]:
    """
    Return the first event in `events` that is an instance of `event_type`,
    or None if no such event exists.
    """
    return next((event for event in events if isinstance(event, event_type)), None)

def require_event(events: Iterable[object], event_type: Type[T]) -> T:
    """
    Return the first event in `events` that is an instance of `event_type`,
    or raise an AssertionError if none is found.
    """
    match = find_event(events, event_type)
    if match is None:
        raise AssertionError(f"Expected event of type {event_type.__name__}, but none was found")
    return match


main()
Loading