diff --git a/go/libraries/doltcore/sqle/binlogreplication/binlog_producer.go b/go/libraries/doltcore/sqle/binlogreplication/binlog_producer.go
index 3fdc5032388..695e08cfe6c 100644
--- a/go/libraries/doltcore/sqle/binlogreplication/binlog_producer.go
+++ b/go/libraries/doltcore/sqle/binlogreplication/binlog_producer.go
@@ -348,12 +348,24 @@ func (b *binlogProducer) createTableMapEvents(ctx *sql.Context, databaseName str
 		if tableName.Name == "" {
 			tableName = tableDelta.FromName
 		}
+
+		isFullRowMetadata, err := isFullRowMetadata(ctx)
+		if err != nil {
+			return nil, nil, err
+		}
+
 		tablesToId[tableName.Name] = tableId
-		tableMap, err := createTableMapFromDoltTable(ctx, databaseName, tableName.Name, tableDelta.ToTable)
+		tableMap, err := createTableMapFromDoltTable(ctx, databaseName,
+			tableName.Name, tableDelta.ToTable, isFullRowMetadata)
+		if err != nil {
+			return nil, nil, err
+		}
+
+		tableMapEvent, err := b.newTableMapEvent(tableId, tableMap)
 		if err != nil {
 			return nil, nil, err
 		}
-		events = append(events, b.newTableMapEvent(tableId, tableMap))
+		events = append(events, tableMapEvent)
 	}
 
 	return events, tablesToId, nil
@@ -548,12 +560,8 @@ func (b *binlogProducer) newXIDEvent() mysql.BinlogEvent {
 
 // newTableMapEvent returns a new TableMap BinlogEvent for the specified |tableId| and |tableMap|, and updates the
 // stream's log position.
-func (b *binlogProducer) newTableMapEvent(tableId uint64, tableMap *mysql.TableMap) mysql.BinlogEvent {
-	// TODO: The new error introduced in this function signature is only used when optional table metadata
-	//       is specified (e.g. column names). Dolt doesn't support populating this yet, so there's no
-	//       need to look at the return error yet. That will be added in an upcoming PR.
-	event, _ := mysql.NewTableMapEvent(*b.binlogFormat, b.binlogEventMeta, tableId, tableMap)
-	return event
+func (b *binlogProducer) newTableMapEvent(tableId uint64, tableMap *mysql.TableMap) (mysql.BinlogEvent, error) {
+	return mysql.NewTableMapEvent(*b.binlogFormat, b.binlogEventMeta, tableId, tableMap)
 }
 
 // newWriteRowsEvent returns a new WriteRows BinlogEvent for the specified |tableId| and |rows|, and updates the
@@ -594,6 +602,22 @@ func isDatabaseFilteredOut(ctx *sql.Context, dbName string) bool {
 	return false
 }
 
+// isFullRowMetadata returns true if the system is configured to return full metadata as part of
+// binlog table descriptions. Full row metadata is configured by setting @@binlog_row_metadata
+// to FULL.
+func isFullRowMetadata(ctx *sql.Context) (bool, error) {
+	val, err := ctx.GetSessionVariable(ctx, "binlog_row_metadata")
+	if err != nil {
+		return false, err
+	}
+
+	if stringVal, ok := val.(string); ok {
+		return strings.EqualFold(stringVal, "FULL"), nil
+	} else {
+		return false, nil
+	}
+}
+
 // extractRowCountAndDiffType uses |sch| and |diff| to determine how many changed rows this
 // diff represents (returned as the |rowCount| param) and what type of change it represents
 // (returned as the |diffType| param). For tables with primary keys, this function will always
@@ -659,19 +683,47 @@ func extractRowCountAndDiffType(sch schema.Schema, diff tree.Diff) (rowCount uin
 	}
 }
 
-// createTableMapFromDoltTable creates a binlog TableMap for the given Dolt table.
-func createTableMapFromDoltTable(ctx *sql.Context, databaseName, tableName string, table *doltdb.Table) (*mysql.TableMap, error) {
-	schema, err := table.GetSchema(ctx)
+// createTableMapFromDoltTable creates a binlog TableMap for the given Dolt table. If
+// |includeOptionalMetadata| is set to true, then additional, optional metadata such as
+// column names and column collations will also be included in the TableMap.
+func createTableMapFromDoltTable(ctx *sql.Context, databaseName, tableName string, table *doltdb.Table, includeOptionalMetadata bool) (*mysql.TableMap, error) {
+	sch, err := table.GetSchema(ctx)
 	if err != nil {
 		return nil, err
 	}
 
-	columns := schema.GetAllCols().GetColumns()
+	columns := sch.GetAllCols().GetColumns()
 	types := make([]byte, len(columns))
 	metadata := make([]uint16, len(columns))
 	canBeNullMap := mysql.NewServerBitmap(len(columns))
 
+	var columnNames []string
+	var columnCollationIds []uint64
+	var enumValues [][]string
+	var setValues [][]string
+	var enumAndSetCollationIds []uint64
+	if includeOptionalMetadata {
+		columnNames = make([]string, len(columns))
+		columnCollationIds = make([]uint64, len(columns))
+	}
+
 	for i, col := range columns {
+		if includeOptionalMetadata {
+			columnNames[i] = col.Name
+			columnCollationIds[i] = uint64(schema.Collation_Unspecified)
+			if stringType, ok := col.TypeInfo.ToSqlType().(sql.StringType); ok {
+				columnCollationIds[i] = uint64(stringType.Collation())
+			}
+
+			if enumType, ok := col.TypeInfo.ToSqlType().(sql.EnumType); ok {
+				enumValues = append(enumValues, enumType.Values())
+				enumAndSetCollationIds = append(enumAndSetCollationIds, uint64(enumType.Collation()))
+			} else if setType, ok := col.TypeInfo.ToSqlType().(sql.SetType); ok {
+				setValues = append(setValues, setType.Values())
+				enumAndSetCollationIds = append(enumAndSetCollationIds, uint64(setType.Collation()))
+			}
+		}
+
 		metadata[i] = 0
 		typ := col.TypeInfo.ToSqlType()
 
@@ -687,14 +739,24 @@ func createTableMapFromDoltTable(ctx *sql.Context, databaseName, tableName strin
 		}
 	}
 
-	return &mysql.TableMap{
+	tableMap := &mysql.TableMap{
 		Flags:     0x0000,
 		Database:  databaseName,
 		Name:      tableName,
 		Types:     types,
 		CanBeNull: canBeNullMap,
 		Metadata:  metadata,
-	}, nil
+	}
+
+	if includeOptionalMetadata {
+		tableMap.OptionalColumnNames = columnNames
+		tableMap.OptionalEnumValues = enumValues
+		tableMap.OptionalSetValues = setValues
+		tableMap.OptionalColumnCollations = columnCollationIds
+		tableMap.OptionalEnumAndSetCollations = enumAndSetCollationIds
+	}
+
+	return tableMap, nil
 }
 
 // createBinlogFormat returns a new BinlogFormat that describes the format of this binlog stream, which will always
diff --git a/integration-tests/mysql-client-tests/Dockerfile b/integration-tests/mysql-client-tests/Dockerfile
index 8e47ae4f438..94bdf721dc4 100644
--- a/integration-tests/mysql-client-tests/Dockerfile
+++ b/integration-tests/mysql-client-tests/Dockerfile
@@ -72,13 +72,14 @@ RUN make
 
 FROM python:3.14-slim-bookworm AS python_clients_build
 RUN apt-get update && apt-get install -y binutils libmariadb-dev gcc && rm -rf /var/lib/apt/lists/*
-RUN pip install --no-cache-dir mysql-connector-python==8.0.33 PyMySQL==1.0.2 sqlalchemy==1.4.46 mariadb pyinstaller
+RUN pip install --no-cache-dir mysql-connector-python==8.0.33 PyMySQL==1.0.2 sqlalchemy==1.4.46 mariadb pyinstaller mysql-replication
 COPY dolt/integration-tests/mysql-client-tests/python/ /build/python/
 WORKDIR /build/python/
 RUN pyinstaller --onefile pymysql-test.py
 RUN pyinstaller --onefile --collect-all mysql.connector sqlalchemy-test.py
 RUN pyinstaller --onefile --collect-all mysql.connector mysql-connector-test.py
 RUN pyinstaller --onefile mariadb-connector-test.py
+RUN pyinstaller --onefile python-replication-test.py
 
 FROM elixir:1.18.3-slim AS elixir_clients_build
 RUN apt-get update && apt-get install -y ca-certificates xz-utils curl && rm -rf /var/lib/apt/lists/*
diff --git a/integration-tests/mysql-client-tests/mysql-client-tests.bats b/integration-tests/mysql-client-tests/mysql-client-tests.bats
index 3e0c5b61773..1fbd1d4d0da 100644
--- a/integration-tests/mysql-client-tests/mysql-client-tests.bats
+++ b/integration-tests/mysql-client-tests/mysql-client-tests.bats
@@ -49,6 +49,26 @@ teardown() {
     /build/bin/python/sqlalchemy-test $USER $PORT $REPO_NAME
 }
 
+@test "python replication client" {
+    # Stop the server that setup launched
+    kill $SERVER_PID
+
+    # Configure binlog replication settings
+    dolt sql -q "SET @@PERSIST.log_bin=1;"
+    dolt sql -q "SET @@PERSIST.gtid_mode=ON;"
+    dolt sql -q "SET @@PERSIST.enforce_gtid_consistency=ON;"
+    dolt sql -q "SET @@PERSIST.binlog_format='ROW';"
+    dolt sql -q "SET @@PERSIST.binlog_row_image='FULL';"
+    dolt sql -q "SET @@PERSIST.binlog_row_metadata='FULL';"
+
+    # Restart the SQL server
+    dolt sql-server --host 0.0.0.0 --port=$PORT --loglevel=trace &
+    SERVER_PID=$!
+    sleep 1 # Give the server a chance to start
+
+    /build/bin/python/python-replication-test $USER $PORT $REPO_NAME
+}
+
 @test "java mysql-connector-j" {
     java -jar /build/bin/java/mysql-connector-test.jar $USER $PORT $REPO_NAME
 }
diff --git a/integration-tests/mysql-client-tests/python/python-replication-test.py b/integration-tests/mysql-client-tests/python/python-replication-test.py
new file mode 100644
index 00000000000..8b38f67005f
--- /dev/null
+++ b/integration-tests/mysql-client-tests/python/python-replication-test.py
@@ -0,0 +1,131 @@
+import pymysql
+import sys
+import time
+from typing import List, Optional, Iterable, Type, TypeVar
+from pymysqlreplication import BinLogStreamReader
+from pymysqlreplication.event import (
+    RotateEvent,
+    FormatDescriptionEvent,
+    GtidEvent,
+    QueryEvent,
+    HeartbeatLogEvent,
+)
+from pymysqlreplication.row_event import WriteRowsEvent
+
+T = TypeVar("T")
+
+def main():
+    user = sys.argv[1]
+    port = int(sys.argv[2])
+    db = sys.argv[3]
+
+    # Connect to the running SQL server to create some test data
+    connection = pymysql.connect(host="127.0.0.1",
+                                 port=port,
+                                 user=user,
+                                 db=db)
+    with connection.cursor() as cursor:
+        cursor.execute("CREATE TABLE t (pk int primary key, c1 varchar(100));")
+        cursor.execute("INSERT INTO t VALUES (1, 'foobarbazbar');")
+
+    # Connect to a replication event stream
+    mysql_settings = {'host': '127.0.0.1', 'port': port, 'user': user, 'passwd': ''}
+    stream = BinLogStreamReader(connection_settings=mysql_settings, blocking=True, server_id=100, auto_position='8e66e4f4-955a-4844-909a-33d79f78ddba:1')
+
+    # Grab the first 10 events
+    events = read_events(stream, n=10, timeout_s=15.0)
+
+    # To help debugging a failed test, print out the event data
+    for event in events:
+        event.dump()
+
+    assert_correct_events(events)
+    stream.close()
+    sys.exit(0)
+
+
+def assert_correct_events(events):
+    """
+    Checks the events for expected events and values and raises an AssertionError if unexpected results are found.
+    """
+    # Assert that a RotateEvent is present and indicates that binlog-main.000001 is the next file
+    rotateEvent = require_event(events, RotateEvent)
+    next_file = getattr(rotateEvent, "next_binlog", None) or getattr(rotateEvent, "next_binlog_file", None)
+    assert next_file == "binlog-main.000001", f"RotateEvent next binlog expected binlog-main.000001, got {next_file!r}"
+
+    # Assert that a FormatDescriptionEvent is present and contains the expected binlog version number
+    formatDescriptionEvent = require_event(events, FormatDescriptionEvent)
+    assert getattr(formatDescriptionEvent, "binlog_version", None) == (4,), f"binlog_version expected 4, got {getattr(formatDescriptionEvent,'binlog_version',None)}"
+
+    # Assert that the QueryEvent for the CREATE TABLE statement is present
+    queryEvent = require_event(events, QueryEvent)
+    query = getattr(queryEvent, "query", None) or getattr(queryEvent, "query_text", None)
+    assert query is not None, "QueryEvent query field was None"
+    if isinstance(query, bytes):
+        query = query.decode("utf-8", errors="replace")
+    assert "CREATE TABLE" in query, f"Query did not contain CREATE TABLE: {query!r}"
+    assert "`t`" in query or " t " in query, f"Query did not appear to create table t: {query!r}"
+
+    # Assert that a WriteRowsEvent is present with the correct row values, and that column names
+    # can be used to access the fields (e.g. @@binlog_row_metadata=FULL is enabled and working).
+    writeRowsEvent = require_event(events, WriteRowsEvent)
+    row0 = writeRowsEvent.rows[0]
+    values = row0.get("values") if isinstance(row0, dict) else getattr(row0, "values", None)
+    assert isinstance(values, dict), f"Expected row values dict, got {type(values).__name__}"
+    assert "pk" in values, f"Missing column 'pk'. Got keys: {sorted(values.keys())}"
+    assert values["pk"] == 1, f"pk expected 1, got {values['pk']!r}"
+    assert "c1" in values, f"Missing column 'c1'. Got keys: {sorted(values.keys())}"
+    assert values["c1"] == "foobarbazbar", f"c1 expected 'foobarbazbar', got {values['c1']!r}"
+
+def read_events(stream, n: int = 10, timeout_s: float = 30.0) -> List[object]:
+    """
+    Read up to `n` non-heartbeat events from `stream`, or stop after `timeout_s`.
+    Returns the list of collected events (possibly fewer than n).
+    """
+    events: List[object] = []
+    deadline = time.monotonic() + timeout_s
+
+    try:
+        for ev in stream:
+            # Timeout check first so we don't hang forever
+            if time.monotonic() >= deadline:
+                break
+
+            # Skip heartbeats (optional: count them, log them, etc.)
+            if isinstance(ev, HeartbeatLogEvent):
+                continue
+
+            events.append(ev)
+            if len(events) >= n:
+                break
+    finally:
+        # Important: stop network/background reading to avoid hangs/leaks
+        try:
+            stream.close()
+        except Exception:
+            pass
+
+    return events
+
+def find_event(events: Iterable[object], event_type: Type[T]) -> Optional[T]:
+    """
+    Return the first event in `events` that is an instance of `event_type`,
+    or None if no such event exists.
+    """
+    for event in events:
+        if isinstance(event, event_type):
+            return event
+    return None
+
+def require_event(events: Iterable[object], event_type: Type[T]) -> T:
+    """
+    Return the first event in `events` that is an instance of `event_type`,
+    or raise an AssertionError if none is found.
+ """ + event = find_event(events, event_type) + if event is None: + raise AssertionError(f"Expected event of type {event_type.__name__}, but none was found") + return event + + +main() \ No newline at end of file