Skip to content

Commit

Permalink
Parse NULL-bitmask in table map event (#361)
Browse files Browse the repository at this point in the history
  • Loading branch information
dongwook-chan authored Oct 18, 2021
1 parent cf314b2 commit f028364
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 6 deletions.
5 changes: 3 additions & 2 deletions pymysqlreplication/row_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -648,8 +648,9 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
self.table_obj = Table(self.column_schemas, self.table_id, self.schema,
self.table, self.columns)

# TODO: get this information instead of trashing data
# n NULL-bitmask, length: (column-length * 8) / 7
# ith column is nullable if (i - 1)th bit is set to True, not nullable otherwise
## Refer to definition of and call to row.event.__is_null() to interpret bitmap corresponding to columns
self.null_bitmask = self.packet.read((self.column_count + 7) / 8)

def get_table(self):
return self.table_obj
Expand Down
91 changes: 87 additions & 4 deletions pymysqlreplication/tests/test_data_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,30 @@ def create_table(self, create_query):

return event

def create_and_get_tablemap_event(self, bit):
"""Create table and return tablemap event
Returns:
Table map event
"""
self.execute(create_query)
self.execute(insert_query)
self.execute("COMMIT")

self.assertIsInstance(self.stream.fetchone(), RotateEvent)
self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent)
#QueryEvent for the Create Table
self.assertIsInstance(self.stream.fetchone(), QueryEvent)

#QueryEvent for the BEGIN
self.assertIsInstance(self.stream.fetchone(), QueryEvent)

event = self.stream.fetchone()

self.assertEqual(event.event_type, TABLE_MAP_EVENT)

return event

def test_decimal(self):
create_query = "CREATE TABLE test (test DECIMAL(2,1))"
insert_query = "INSERT INTO test VALUES(4.2)"
Expand Down Expand Up @@ -669,16 +693,75 @@ def test_status_vars(self):
Note that if you change default db name 'pymysqlreplication_test',
event.mts_accessed_db_names MUST be asserted against the changed db name.
Returns:
binary string parsed from __data_buffer
Raises:
AssertionError: if no
AssertionError: if status variables not set correctly
"""
create_query = "CREATE TABLE test (id INTEGER)"
event = self.create_table(create_query)
self.assertEqual(event.catalog_nz_code, b'std')
self.assertEqual(event.mts_accessed_db_names, [b'pymysqlreplication_test'])

def test_null_bitmask(self)
"""Test parse of null-bitmask in table map events
Create table with 16 columns with nullability specified by 'bit_mask' variable
'bit_mask' variable is asserted against null_bitmask attribute in table map event.
Raises:
AssertionError: if null_bitmask isn't set as specified in 'bit_mask' variable
"""

# any 2-byte bitmask in little-endian hex bytes format (b'a\x03')
## b'a\x03' = 1101100001(2)
bit_mask = b'a\x03'

# Prepare create_query
create_query = "CREATE TABLE test"

columns = []
for i in range(16):
# column_definition consists of...
## column name, column type, nullability
column_definition = []

column_name = chr(ord('a') + i)
column_definition.append(column_name)

column_type = "INT"
column_definition.append(column_type)

nullability = "NOT NULL" if not RowsEvent.__is_null(bit_mask, i) else ""
column_definition.append(nullability)

columns.append(" ".join(column_definition))

create_query += f' ({", ".join(columns)})'

# Prepare insert_query
insert_query = "INSERT into test values"

values = []
for i in range(16):
values.append('0')

insert_query += f' ({",".join(values)})')

self.execute(create_query)
self.execute(insert_query)
self.execute("COMMIT")

self.assertIsInstance(self.stream.fetchone(), RotateEvent)
self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent)
#QueryEvent for the Create Table
self.assertIsInstance(self.stream.fetchone(), QueryEvent)

#QueryEvent for the BEGIN
self.assertIsInstance(self.stream.fetchone(), QueryEvent)

event = self.stream.fetchone()

self.assertEqual(event.event_type, TABLE_MAP_EVENT)
self.assertEqual(event.null_bitmask, bit_mask)

if __name__ == "__main__":
unittest.main()

0 comments on commit f028364

Please sign in to comment.