Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions gcloud/bigtable/row_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,44 @@ def _handle_row_contents(self, chunk):
column_cells = column_family_dict.setdefault(column_name, [])
column_cells.extend(cells)

def update_from_read_rows(self, read_rows_response_pb):
"""Updates the current row from a ``ReadRows`` response.

:type read_rows_response_pb:
:class:`._generated.bigtable_service_messages_pb2.ReadRowsResponse`
:param read_rows_response_pb: A response streamed back as part of a
``ReadRows`` request.

:raises: :class:`ValueError <exceptions.ValueError>` if the current
partial row has already been committed, if the row key on the
response doesn't match the current one or if there is a chunk
encountered with an unexpected ``ONEOF`` protobuf property.
"""
if self._committed:
raise ValueError('The row has been committed')

if read_rows_response_pb.row_key != self.row_key:
raise ValueError('Response row key (%r) does not match current '
'one (%r).' % (read_rows_response_pb.row_key,
self.row_key))

last_chunk_index = len(read_rows_response_pb.chunks) - 1
for index, chunk in enumerate(read_rows_response_pb.chunks):
chunk_property = chunk.WhichOneof('chunk')
if chunk_property == 'row_contents':
self._handle_row_contents(chunk)
elif chunk_property == 'reset_row':
self._handle_reset_row(chunk)
elif chunk_property == 'commit_row':
self._handle_commit_row(chunk, index, last_chunk_index)
else:
# NOTE: This includes chunk_property == None since we always
# want a value to be set
raise ValueError('Unexpected chunk property: %s' % (
chunk_property,))

self._chunks_encountered = True

This comment was marked as spam.

This comment was marked as spam.



class PartialRowsData(object):
"""Convenience wrapper for consuming a ``ReadRows`` streaming response.
Expand Down
83 changes: 83 additions & 0 deletions gcloud/bigtable/test_row_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,89 @@ def test__handle_row_contents(self):
}
self.assertEqual(partial_row_data.cells, expected_cells)

def test_update_from_read_rows(self):
from gcloud.bigtable._generated import bigtable_data_pb2 as data_pb2
from gcloud.bigtable._generated import (
bigtable_service_messages_pb2 as messages_pb2)

row_key = b'row-key'
partial_row_data = self._makeOne(row_key)

# Set-up chunk1, some data that will be reset by chunk2.
ignored_family_name = u'ignore-name'
row_contents = data_pb2.Family(name=ignored_family_name)
chunk1 = messages_pb2.ReadRowsResponse.Chunk(row_contents=row_contents)

# Set-up chunk2, a reset row.
chunk2 = messages_pb2.ReadRowsResponse.Chunk(reset_row=True)

# Set-up chunk3, a column family with no columns.
family_name = u'name'
row_contents = data_pb2.Family(name=family_name)
chunk3 = messages_pb2.ReadRowsResponse.Chunk(row_contents=row_contents)

# Set-up chunk4, a commit row.
chunk4 = messages_pb2.ReadRowsResponse.Chunk(commit_row=True)

# Prepare request and make sure PartialRowData is empty before.
read_rows_response_pb = messages_pb2.ReadRowsResponse(
row_key=row_key, chunks=[chunk1, chunk2, chunk3, chunk4])
self.assertEqual(partial_row_data.cells, {})
self.assertFalse(partial_row_data.committed)
self.assertFalse(partial_row_data._chunks_encountered)

# Parse the response and make sure the cells took place.
partial_row_data.update_from_read_rows(read_rows_response_pb)
self.assertEqual(partial_row_data.cells, {family_name: {}})
self.assertFalse(ignored_family_name in partial_row_data.cells)
self.assertTrue(partial_row_data.committed)
self.assertTrue(partial_row_data._chunks_encountered)

def test_update_from_read_rows_while_committed(self):
partial_row_data = self._makeOne(None)
partial_row_data._committed = True
self.assertFalse(partial_row_data._chunks_encountered)

with self.assertRaises(ValueError):
partial_row_data.update_from_read_rows(None)

self.assertFalse(partial_row_data._chunks_encountered)

def test_update_from_read_rows_row_key_disagree(self):
from gcloud.bigtable._generated import (
bigtable_service_messages_pb2 as messages_pb2)

row_key1 = b'row-key1'
row_key2 = b'row-key2'
partial_row_data = self._makeOne(row_key1)
self.assertFalse(partial_row_data._chunks_encountered)

self.assertNotEqual(row_key1, row_key2)
read_rows_response_pb = messages_pb2.ReadRowsResponse(row_key=row_key2)
with self.assertRaises(ValueError):
partial_row_data.update_from_read_rows(read_rows_response_pb)

self.assertFalse(partial_row_data._chunks_encountered)

def test_update_from_read_rows_empty_chunk(self):
from gcloud.bigtable._generated import (
bigtable_service_messages_pb2 as messages_pb2)

row_key = b'row-key'
partial_row_data = self._makeOne(row_key)
self.assertFalse(partial_row_data._chunks_encountered)

chunk = messages_pb2.ReadRowsResponse.Chunk()
read_rows_response_pb = messages_pb2.ReadRowsResponse(
row_key=row_key, chunks=[chunk])

# This makes it an "empty" chunk.
self.assertEqual(chunk.WhichOneof('chunk'), None)
with self.assertRaises(ValueError):
partial_row_data.update_from_read_rows(read_rows_response_pb)

self.assertFalse(partial_row_data._chunks_encountered)


class TestPartialRowsData(unittest2.TestCase):

Expand Down