Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parse NULL-bitmask in table map event #361

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions pymysqlreplication/row_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -648,8 +648,9 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
self.table_obj = Table(self.column_schemas, self.table_id, self.schema,
self.table, self.columns)

# TODO: get this information instead of trashing data
# n NULL-bitmask, length: (column-length * 8) / 7
# ith column is nullable if (i - 1)th bit is set to True, not nullable otherwise
## Refer to definition of and call to row.event.__is_null() to interpret bitmap corresponding to columns
self.null_bitmask = self.packet.read((self.column_count + 7) / 8)

def get_table(self):
return self.table_obj
Expand Down
91 changes: 87 additions & 4 deletions pymysqlreplication/tests/test_data_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,30 @@ def create_table(self, create_query):

return event

def create_and_get_tablemap_event(self, bit):
"""Create table and return tablemap event

Returns:
Table map event
"""
self.execute(create_query)
self.execute(insert_query)
self.execute("COMMIT")

self.assertIsInstance(self.stream.fetchone(), RotateEvent)
self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent)
#QueryEvent for the Create Table
self.assertIsInstance(self.stream.fetchone(), QueryEvent)

#QueryEvent for the BEGIN
self.assertIsInstance(self.stream.fetchone(), QueryEvent)

event = self.stream.fetchone()

self.assertEqual(event.event_type, TABLE_MAP_EVENT)

return event

def test_decimal(self):
create_query = "CREATE TABLE test (test DECIMAL(2,1))"
insert_query = "INSERT INTO test VALUES(4.2)"
Expand Down Expand Up @@ -669,16 +693,75 @@ def test_status_vars(self):
Note that if you change default db name 'pymysqlreplication_test',
event.mts_accessed_db_names MUST be asserted against the changed db name.

Returns:
binary string parsed from __data_buffer

Raises:
AssertionError: if no
AssertionError: if status variables not set correctly
"""
create_query = "CREATE TABLE test (id INTEGER)"
event = self.create_table(create_query)
self.assertEqual(event.catalog_nz_code, b'std')
self.assertEqual(event.mts_accessed_db_names, [b'pymysqlreplication_test'])

def test_null_bitmask(self)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

def test_null_bitmask(self):

"""Test parse of null-bitmask in table map events

Create table with 16 columns with nullability specified by 'bit_mask' variable
'bit_mask' variable is asserted against null_bitmask attribute in table map event.

Raises:
AssertionError: if null_bitmask isn't set as specified in 'bit_mask' variable
"""

# any 2-byte bitmask in little-endian hex bytes format (b'a\x03')
## b'a\x03' = 1101100001(2)
bit_mask = b'a\x03'

# Prepare create_query
create_query = "CREATE TABLE test"

columns = []
for i in range(16):
# column_definition consists of...
## column name, column type, nullability
column_definition = []

column_name = chr(ord('a') + i)
column_definition.append(column_name)

column_type = "INT"
column_definition.append(column_type)

nullability = "NOT NULL" if not RowsEvent.__is_null(bit_mask, i) else ""
column_definition.append(nullability)

columns.append(" ".join(column_definition))

create_query += f' ({", ".join(columns)})'

# Prepare insert_query
insert_query = "INSERT into test values"

values = []
for i in range(16):
values.append('0')

insert_query += f' ({",".join(values)})')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

insert_query += f' ({",".join(values)})'

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the comment. 👍👍
I will have these fixed in a separate PR.


self.execute(create_query)
self.execute(insert_query)
self.execute("COMMIT")

self.assertIsInstance(self.stream.fetchone(), RotateEvent)
self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent)
#QueryEvent for the Create Table
self.assertIsInstance(self.stream.fetchone(), QueryEvent)

#QueryEvent for the BEGIN
self.assertIsInstance(self.stream.fetchone(), QueryEvent)

event = self.stream.fetchone()

self.assertEqual(event.event_type, TABLE_MAP_EVENT)
self.assertEqual(event.null_bitmask, bit_mask)

if __name__ == "__main__":
unittest.main()