From 30d57fb99995b06eae94589b8cc60c5ab70efba4 Mon Sep 17 00:00:00 2001 From: dongwook-chan Date: Sat, 16 Oct 2021 15:21:00 +0900 Subject: [PATCH] Parse NULL-bitmask in table map event [Fields in table map event](https://dev.mysql.com/doc/internals/en/table-map-event.html) [buffer layout for table map event](https://github.com/mysql/mysql-server/blob/beb865a960b9a8a16cf999c323e46c5b0c67f21f/libbinlogevents/include/rows_event.h#L633-L639) [null bitmask in mysql-server](https://github.com/mysql/mysql-server/blob/beb865a960b9a8a16cf999c323e46c5b0c67f21f/libbinlogevents/src/rows_event.cpp#L93) --- pymysqlreplication/row_event.py | 5 +- pymysqlreplication/tests/test_data_type.py | 91 +++++++++++++++++++++- 2 files changed, 90 insertions(+), 6 deletions(-) diff --git a/pymysqlreplication/row_event.py b/pymysqlreplication/row_event.py index d616f32a..a2e83f3e 100644 --- a/pymysqlreplication/row_event.py +++ b/pymysqlreplication/row_event.py @@ -648,8 +648,9 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) self.table_obj = Table(self.column_schemas, self.table_id, self.schema, self.table, self.columns) - # TODO: get this information instead of trashing data - # n NULL-bitmask, length: (column-length * 8) / 7 + # ith column is nullable if (i - 1)th bit is set to True, not nullable otherwise + ## Refer to definition of and call to row.event.__is_null() to interpret bitmap corresponding to columns + self.null_bitmask = self.packet.read((self.column_count + 7) / 8) def get_table(self): return self.table_obj diff --git a/pymysqlreplication/tests/test_data_type.py b/pymysqlreplication/tests/test_data_type.py index 37873444..2fca96f0 100644 --- a/pymysqlreplication/tests/test_data_type.py +++ b/pymysqlreplication/tests/test_data_type.py @@ -77,6 +77,30 @@ def create_table(self, create_query): return event + def create_and_get_tablemap_event(self, bit): + """Create table and return tablemap event + + Returns: + Table map event + """ + self.execute(create_query) + self.execute(insert_query) + self.execute("COMMIT") + + self.assertIsInstance(self.stream.fetchone(), RotateEvent) + self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent) + #QueryEvent for the Create Table + self.assertIsInstance(self.stream.fetchone(), QueryEvent) + + #QueryEvent for the BEGIN + self.assertIsInstance(self.stream.fetchone(), QueryEvent) + + event = self.stream.fetchone() + + self.assertEqual(event.event_type, TABLE_MAP_EVENT) + + return event + def test_decimal(self): create_query = "CREATE TABLE test (test DECIMAL(2,1))" insert_query = "INSERT INTO test VALUES(4.2)" @@ -669,16 +693,75 @@ def test_status_vars(self): Note that if you change default db name 'pymysqlreplication_test', event.mts_accessed_db_names MUST be asserted against the changed db name. - Returns: - binary string parsed from __data_buffer - Raises: - AssertionError: if no + AssertionError: if status variables not set correctly """ create_query = "CREATE TABLE test (id INTEGER)" event = self.create_table(create_query) self.assertEqual(event.catalog_nz_code, b'std') self.assertEqual(event.mts_accessed_db_names, [b'pymysqlreplication_test']) + def test_null_bitmask(self) + """Test parse of null-bitmask in table map events + + Create table with 16 columns with nullability specified by 'bit_mask' variable + 'bit_mask' variable is asserted against null_bitmask attribute in table map event. + + Raises: + AssertionError: if null_bitmask isn't set as specified in 'bit_mask' variable + """ + + # any 2-byte bitmask in little-endian hex bytes format (b'a\x03') + ## b'a\x03' = 1101100001(2) + bit_mask = b'a\x03' + + # Prepare create_query + create_query = "CREATE TABLE test" + + columns = [] + for i in range(16): + # column_definition consists of... + ## column name, column type, nullability + column_definition = [] + + column_name = chr(ord('a') + i) + column_definition.append(column_name) + + column_type = "INT" + column_definition.append(column_type) + + nullability = "NOT NULL" if not RowsEvent.__is_null(bit_mask, i) else "" + column_definition.append(nullability) + + columns.append(" ".join(column_definition)) + + create_query += f' ({", ".join(columns)})' + + # Prepare insert_query + insert_query = "INSERT into test values" + + values = [] + for i in range(16): + values.append('0') + + insert_query += f' ({",".join(values)})') + + self.execute(create_query) + self.execute(insert_query) + self.execute("COMMIT") + + self.assertIsInstance(self.stream.fetchone(), RotateEvent) + self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent) + #QueryEvent for the Create Table + self.assertIsInstance(self.stream.fetchone(), QueryEvent) + + #QueryEvent for the BEGIN + self.assertIsInstance(self.stream.fetchone(), QueryEvent) + + event = self.stream.fetchone() + + self.assertEqual(event.event_type, TABLE_MAP_EVENT) + self.assertEqual(event.null_bitmask, bit_mask) + if __name__ == "__main__": unittest.main()