From 4487827cd16bc22f7f115708ec0ad653885e62ff Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Thu, 18 Feb 2016 11:45:18 -0800 Subject: [PATCH] Adding Bigtable RowResponse parsing helpers. --- gcloud/bigtable/row_data.py | 61 ++++++++++++++++++++++ gcloud/bigtable/test_row_data.py | 89 ++++++++++++++++++++++++++++++++ 2 files changed, 150 insertions(+) diff --git a/gcloud/bigtable/row_data.py b/gcloud/bigtable/row_data.py index e8c785c47dd8..5a635f1cf7af 100644 --- a/gcloud/bigtable/row_data.py +++ b/gcloud/bigtable/row_data.py @@ -147,6 +147,67 @@ def clear(self): self._chunks_encountered = False self._cells.clear() + def _handle_commit_row(self, chunk, index, last_chunk_index): + """Handles a ``commit_row`` chunk. + + :type chunk: ``ReadRowsResponse.Chunk`` + :param chunk: The chunk being handled. + + :type index: int + :param index: The current index of the chunk. + + :type last_chunk_index: int + :param last_chunk_index: The index of the last chunk. + + :raises: :class:`ValueError ` if the value of + ``commit_row`` is :data:`False` or if the chunk passed is not + the last chunk in a response. + """ + # NOTE: We assume the caller has checked that the ``ONEOF`` property + # for ``chunk`` is ``commit_row``. + if not chunk.commit_row: + raise ValueError('Received commit_row that was False.') + + if index != last_chunk_index: + raise ValueError('Commit row chunk was not the last chunk') + else: + self._committed = True + + def _handle_reset_row(self, chunk): + """Handles a ``reset_row`` chunk. + + :type chunk: ``ReadRowsResponse.Chunk`` + :param chunk: The chunk being handled. + + :raises: :class:`ValueError ` if the value of + ``reset_row`` is :data:`False` + """ + # NOTE: We assume the caller has checked that the ``ONEOF`` property + # for ``chunk`` is ``reset_row``. + if not chunk.reset_row: + raise ValueError('Received reset_row that was False.') + + self.clear() + + def _handle_row_contents(self, chunk): + """Handles a ``row_contents`` chunk. + + :type chunk: ``ReadRowsResponse.Chunk`` + :param chunk: The chunk being handled. + """ + # NOTE: We assume the caller has checked that the ``ONEOF`` property + # for ``chunk`` is ``row_contents``. + + # chunk.row_contents is ._generated.bigtable_data_pb2.Family + column_family_id = chunk.row_contents.name + column_family_dict = self._cells.setdefault(column_family_id, {}) + for column in chunk.row_contents.columns: + cells = [Cell.from_pb(cell) for cell in column.cells] + + column_name = column.qualifier + column_cells = column_family_dict.setdefault(column_name, []) + column_cells.extend(cells) + class PartialRowsData(object): """Convenience wrapper for consuming a ``ReadRows`` streaming response. diff --git a/gcloud/bigtable/test_row_data.py b/gcloud/bigtable/test_row_data.py index 9eb9f7dbfa13..6b3d21739598 100644 --- a/gcloud/bigtable/test_row_data.py +++ b/gcloud/bigtable/test_row_data.py @@ -207,6 +207,95 @@ def test_clear(self): self.assertFalse(partial_row_data._chunks_encountered) self.assertEqual(partial_row_data.cells, {}) + def test__handle_commit_row(self): + from gcloud.bigtable._generated import ( + bigtable_service_messages_pb2 as messages_pb2) + + partial_row_data = self._makeOne(None) + chunk = messages_pb2.ReadRowsResponse.Chunk(commit_row=True) + + index = last_chunk_index = 1 + self.assertFalse(partial_row_data.committed) + partial_row_data._handle_commit_row(chunk, index, last_chunk_index) + self.assertTrue(partial_row_data.committed) + + def test__handle_commit_row_false(self): + from gcloud.bigtable._generated import ( + bigtable_service_messages_pb2 as messages_pb2) + + partial_row_data = self._makeOne(None) + chunk = messages_pb2.ReadRowsResponse.Chunk(commit_row=False) + + with self.assertRaises(ValueError): + partial_row_data._handle_commit_row(chunk, None, None) + + def test__handle_commit_row_not_last_chunk(self): + from gcloud.bigtable._generated import ( + bigtable_service_messages_pb2 as messages_pb2) + + partial_row_data = self._makeOne(None) + chunk = messages_pb2.ReadRowsResponse.Chunk(commit_row=True) + + with self.assertRaises(ValueError): + index = 0 + last_chunk_index = 1 + self.assertNotEqual(index, last_chunk_index) + partial_row_data._handle_commit_row(chunk, index, last_chunk_index) + + def test__handle_reset_row(self): + from gcloud.bigtable._generated import ( + bigtable_service_messages_pb2 as messages_pb2) + + partial_row_data = self._makeOne(None) + chunk = messages_pb2.ReadRowsResponse.Chunk(reset_row=True) + + # Modify the PartialRowData object so we can check it's been cleared. + partial_row_data._cells = {1: 2} + partial_row_data._committed = True + partial_row_data._handle_reset_row(chunk) + self.assertEqual(partial_row_data.cells, {}) + self.assertFalse(partial_row_data.committed) + + def test__handle_reset_row_failure(self): + from gcloud.bigtable._generated import ( + bigtable_service_messages_pb2 as messages_pb2) + + partial_row_data = self._makeOne(None) + chunk = messages_pb2.ReadRowsResponse.Chunk(reset_row=False) + + with self.assertRaises(ValueError): + partial_row_data._handle_reset_row(chunk) + + def test__handle_row_contents(self): + from gcloud.bigtable._generated import bigtable_data_pb2 as data_pb2 + from gcloud.bigtable._generated import ( + bigtable_service_messages_pb2 as messages_pb2) + from gcloud.bigtable.row_data import Cell + + partial_row_data = self._makeOne(None) + cell1_pb = data_pb2.Cell(timestamp_micros=1, value=b'val1') + cell2_pb = data_pb2.Cell(timestamp_micros=200, value=b'val2') + cell3_pb = data_pb2.Cell(timestamp_micros=300000, value=b'val3') + col1 = b'col1' + col2 = b'col2' + columns = [ + data_pb2.Column(qualifier=col1, cells=[cell1_pb, cell2_pb]), + data_pb2.Column(qualifier=col2, cells=[cell3_pb]), + ] + family_name = u'name' + row_contents = data_pb2.Family(name=family_name, columns=columns) + chunk = messages_pb2.ReadRowsResponse.Chunk(row_contents=row_contents) + + self.assertEqual(partial_row_data.cells, {}) + partial_row_data._handle_row_contents(chunk) + expected_cells = { + family_name: { + col1: [Cell.from_pb(cell1_pb), Cell.from_pb(cell2_pb)], + col2: [Cell.from_pb(cell3_pb)], + } + } + self.assertEqual(partial_row_data.cells, expected_cells) + class TestPartialRowsData(unittest2.TestCase):