diff --git a/bigtable/google/cloud/bigtable/retry.py b/bigtable/google/cloud/bigtable/retry.py new file mode 100644 index 000000000000..f20419ce4f8e --- /dev/null +++ b/bigtable/google/cloud/bigtable/retry.py @@ -0,0 +1,169 @@ +"""Provides function wrappers that implement retrying.""" +import random +import time +import six +import sys + +from google.cloud._helpers import _to_bytes +from google.cloud.bigtable._generated import ( + bigtable_pb2 as data_messages_v2_pb2) +from google.gax import config, errors +from grpc import RpcError + + +_MILLIS_PER_SECOND = 1000 + + +class ReadRowsIterator(object): + """Creates an iterator equivalent to a_iter, but that retries on certain + exceptions. + """ + + def __init__(self, client, name, start_key, end_key, filter_, limit, + retry_options, **kwargs): + self.client = client + self.retry_options = retry_options + self.name = name + self.start_key = start_key + self.start_key_closed = True + self.end_key = end_key + self.filter_ = filter_ + self.limit = limit + self.delay_mult = retry_options.backoff_settings.retry_delay_multiplier + self.max_delay_millis = \ + retry_options.backoff_settings.max_retry_delay_millis + self.timeout_mult = \ + retry_options.backoff_settings.rpc_timeout_multiplier + self.max_timeout = \ + (retry_options.backoff_settings.max_rpc_timeout_millis / + _MILLIS_PER_SECOND) + self.total_timeout = \ + (retry_options.backoff_settings.total_timeout_millis / + _MILLIS_PER_SECOND) + self.set_stream() + + def set_start_key(self, start_key): + """ + Sets the row key at which this iterator will begin reading. + """ + self.start_key = start_key + self.start_key_closed = False + + def set_stream(self): + """ + Resets the read stream by making an RPC on the 'ReadRows' endpoint. + """ + req_pb = _create_row_request(self.name, start_key=self.start_key, + start_key_closed=self.start_key_closed, + end_key=self.end_key, + filter_=self.filter_, limit=self.limit) + self.stream = self.client._data_stub.ReadRows(req_pb) + + def next(self, *args, **kwargs): + """ + Read and return the next row from the stream. + Retry on idempotent failure. + """ + delay = self.retry_options.backoff_settings.initial_retry_delay_millis + exc = errors.RetryError('Retry total timeout exceeded before any' + 'response was received') + timeout = (self.retry_options.backoff_settings + .initial_rpc_timeout_millis / + _MILLIS_PER_SECOND) + + now = time.time() + deadline = now + self.total_timeout + while deadline is None or now < deadline: + try: + return six.next(self.stream) + except StopIteration as stop: + raise stop + except RpcError as error: # pylint: disable=broad-except + code = config.exc_to_code(error) + if code not in self.retry_options.retry_codes: + six.reraise(type(error), error) + + # pylint: disable=redefined-variable-type + exc = errors.RetryError( + 'Retry total timeout exceeded with exception', error) + + # Sleep a random number which will, on average, equal the + # expected delay. + to_sleep = random.uniform(0, delay * 2) + time.sleep(to_sleep / _MILLIS_PER_SECOND) + delay = min(delay * self.delay_mult, self.max_delay_millis) + now = time.time() + timeout = min( + timeout * self.timeout_mult, self.max_timeout, + deadline - now) + self.set_stream() + + six.reraise(errors.RetryError, exc, sys.exc_info()[2]) + + def __next__(self, *args, **kwargs): + return self.next(*args, **kwargs) + + +def _create_row_request(table_name, row_key=None, start_key=None, + start_key_closed=True, end_key=None, filter_=None, + limit=None): + """Creates a request to read rows in a table. + + :type table_name: str + :param table_name: The name of the table to read from. + + :type row_key: bytes + :param row_key: (Optional) The key of a specific row to read from. + + :type start_key: bytes + :param start_key: (Optional) The beginning of a range of row keys to + read from. The range will include ``start_key``. If + left empty, will be interpreted as the empty string. + + :type end_key: bytes + :param end_key: (Optional) The end of a range of row keys to read from. + The range will not include ``end_key``. If left empty, + will be interpreted as an infinite string. + + :type filter_: :class:`.RowFilter` + :param filter_: (Optional) The filter to apply to the contents of the + specified row(s). If unset, reads the entire table. + + :type limit: int + :param limit: (Optional) The read will terminate after committing to N + rows' worth of results. The default (zero) is to return + all results. + + :rtype: :class:`data_messages_v2_pb2.ReadRowsRequest` + :returns: The ``ReadRowsRequest`` protobuf corresponding to the inputs. + :raises: :class:`ValueError ` if both + ``row_key`` and one of ``start_key`` and ``end_key`` are set + """ + request_kwargs = {'table_name': table_name} + if (row_key is not None and + (start_key is not None or end_key is not None)): + raise ValueError('Row key and row range cannot be ' + 'set simultaneously') + range_kwargs = {} + if start_key is not None or end_key is not None: + if start_key is not None: + if start_key_closed: + range_kwargs['start_key_closed'] = _to_bytes(start_key) + else: + range_kwargs['start_key_open'] = _to_bytes(start_key) + if end_key is not None: + range_kwargs['end_key_open'] = _to_bytes(end_key) + if filter_ is not None: + request_kwargs['filter'] = filter_.to_pb() + if limit is not None: + request_kwargs['rows_limit'] = limit + + message = data_messages_v2_pb2.ReadRowsRequest(**request_kwargs) + + if row_key is not None: + message.rows.row_keys.append(_to_bytes(row_key)) + + if range_kwargs: + message.rows.row_ranges.add(**range_kwargs) + + return message diff --git a/bigtable/google/cloud/bigtable/row_data.py b/bigtable/google/cloud/bigtable/row_data.py index 78179db25c4e..0849e681b7e6 100644 --- a/bigtable/google/cloud/bigtable/row_data.py +++ b/bigtable/google/cloud/bigtable/row_data.py @@ -274,6 +274,9 @@ def consume_next(self): self._validate_chunk(chunk) + if hasattr(self._response_iterator, 'set_start_key'): + self._response_iterator.set_start_key(chunk.row_key) + if chunk.reset_row: row = self._row = None cell = self._cell = self._previous_cell = None diff --git a/bigtable/google/cloud/bigtable/table.py b/bigtable/google/cloud/bigtable/table.py index 8dbf8c1ce6fb..3ed2d20ea975 100644 --- a/bigtable/google/cloud/bigtable/table.py +++ b/bigtable/google/cloud/bigtable/table.py @@ -17,7 +17,6 @@ import six -from google.cloud._helpers import _to_bytes from google.cloud.bigtable._generated import ( bigtable_pb2 as data_messages_v2_pb2) from google.cloud.bigtable._generated import ( @@ -30,6 +29,26 @@ from google.cloud.bigtable.row import ConditionalRow from google.cloud.bigtable.row import DirectRow from google.cloud.bigtable.row_data import PartialRowsData +from google.gax import RetryOptions, BackoffSettings +from google.cloud.bigtable.retry import ReadRowsIterator, _create_row_request +from grpc import StatusCode + +BACKOFF_SETTINGS = BackoffSettings( + initial_retry_delay_millis=10, + retry_delay_multiplier=1.3, + max_retry_delay_millis=30000, + initial_rpc_timeout_millis=25 * 60 * 1000, + rpc_timeout_multiplier=1.0, + max_rpc_timeout_millis=25 * 60 * 1000, + total_timeout_millis=30 * 60 * 1000 +) + +RETRY_CODES = [ + StatusCode.DEADLINE_EXCEEDED, + StatusCode.ABORTED, + StatusCode.INTERNAL, + StatusCode.UNAVAILABLE +] # Maximum number of mutations in bulk (MutateRowsRequest message): @@ -257,7 +276,7 @@ def read_row(self, row_key, filter_=None): return rows_data.rows[row_key] def read_rows(self, start_key=None, end_key=None, limit=None, - filter_=None): + filter_=None, backoff_settings=None): """Read rows from this table. :type start_key: bytes @@ -284,13 +303,18 @@ def read_rows(self, start_key=None, end_key=None, limit=None, :returns: A :class:`.PartialRowsData` convenience wrapper for consuming the streamed results. """ - request_pb = _create_row_request( - self.name, start_key=start_key, end_key=end_key, filter_=filter_, - limit=limit) client = self._instance._client - response_iterator = client._data_stub.ReadRows(request_pb) - # We expect an iterator of `data_messages_v2_pb2.ReadRowsResponse` - return PartialRowsData(response_iterator) + if backoff_settings is None: + backoff_settings = BACKOFF_SETTINGS + RETRY_OPTIONS = RetryOptions( + retry_codes=RETRY_CODES, + backoff_settings=backoff_settings + ) + + retrying_iterator = ReadRowsIterator(client, self.name, start_key, + end_key, filter_, limit, + RETRY_OPTIONS) + return PartialRowsData(retrying_iterator) def mutate_rows(self, rows): """Mutates multiple rows in bulk. @@ -359,67 +383,6 @@ def sample_row_keys(self): return response_iterator -def _create_row_request(table_name, row_key=None, start_key=None, end_key=None, - filter_=None, limit=None): - """Creates a request to read rows in a table. - - :type table_name: str - :param table_name: The name of the table to read from. - - :type row_key: bytes - :param row_key: (Optional) The key of a specific row to read from. - - :type start_key: bytes - :param start_key: (Optional) The beginning of a range of row keys to - read from. The range will include ``start_key``. If - left empty, will be interpreted as the empty string. - - :type end_key: bytes - :param end_key: (Optional) The end of a range of row keys to read from. - The range will not include ``end_key``. If left empty, - will be interpreted as an infinite string. - - :type filter_: :class:`.RowFilter` - :param filter_: (Optional) The filter to apply to the contents of the - specified row(s). If unset, reads the entire table. - - :type limit: int - :param limit: (Optional) The read will terminate after committing to N - rows' worth of results. The default (zero) is to return - all results. - - :rtype: :class:`data_messages_v2_pb2.ReadRowsRequest` - :returns: The ``ReadRowsRequest`` protobuf corresponding to the inputs. - :raises: :class:`ValueError ` if both - ``row_key`` and one of ``start_key`` and ``end_key`` are set - """ - request_kwargs = {'table_name': table_name} - if (row_key is not None and - (start_key is not None or end_key is not None)): - raise ValueError('Row key and row range cannot be ' - 'set simultaneously') - range_kwargs = {} - if start_key is not None or end_key is not None: - if start_key is not None: - range_kwargs['start_key_closed'] = _to_bytes(start_key) - if end_key is not None: - range_kwargs['end_key_open'] = _to_bytes(end_key) - if filter_ is not None: - request_kwargs['filter'] = filter_.to_pb() - if limit is not None: - request_kwargs['rows_limit'] = limit - - message = data_messages_v2_pb2.ReadRowsRequest(**request_kwargs) - - if row_key is not None: - message.rows.row_keys.append(_to_bytes(row_key)) - - if range_kwargs: - message.rows.row_ranges.add(**range_kwargs) - - return message - - def _mutate_rows_request(table_name, rows): """Creates a request to mutate rows in a table. diff --git a/bigtable/tests/retry_test_script.txt b/bigtable/tests/retry_test_script.txt new file mode 100644 index 000000000000..863662e897ba --- /dev/null +++ b/bigtable/tests/retry_test_script.txt @@ -0,0 +1,38 @@ +# This retry script is processed by the retry server and the client under test. +# Client tests should parse any command beginning with "CLIENT:", send the corresponding RPC +# to the retry server and expect a valid response. +# "EXPECT" commands indicate the call the server is expecting the client to send. +# +# The retry server has one table named "table" that should be used for testing. +# There are three types of commands supported: +# READ +# Expect the corresponding rows to be returned with arbitrary values. +# SCAN ... +# Ranges are expressed as an interval with either open or closed start and end, +# such as [1,3) for "1,2" or (1, 3] for "2,3". +# WRITE +# All writes should succeed eventually. Value payload is ignored. +# The server writes PASS or FAIL on a line by itself to STDOUT depending on the result of the test. +# All other server output should be ignored. + +# Echo same scan back after immediate error +CLIENT: SCAN [r1,r3) r1,r2 +EXPECT: SCAN [r1,r3) +SERVER: ERROR Unavailable +EXPECT: SCAN [r1,r3) +SERVER: READ_RESPONSE r1,r2 + +# Retry scans with open interval starting at the least read row key. +# Instead of using open intervals for retry ranges, '\x00' can be +# appended to the last received row key and sent in a closed interval. +CLIENT: SCAN [r1,r9) r1,r2,r3,r4,r5,r6,r7,r8 +EXPECT: SCAN [r1,r9) +SERVER: READ_RESPONSE r1,r2,r3,r4 +SERVER: ERROR Unavailable +EXPECT: SCAN (r4,r9) +SERVER: ERROR Unavailable +EXPECT: SCAN (r4,r9) +SERVER: READ_RESPONSE r5,r6,r7 +SERVER: ERROR Unavailable +EXPECT: SCAN (r7,r9) +SERVER: READ_RESPONSE r8 diff --git a/bigtable/tests/system.py b/bigtable/tests/system.py index 1fcda808db39..5a5b4324cbbe 100644 --- a/bigtable/tests/system.py +++ b/bigtable/tests/system.py @@ -295,6 +295,84 @@ def test_delete_column_family(self): # Make sure we have successfully deleted it. self.assertEqual(temp_table.list_column_families(), {}) + def test_retry(self): + import subprocess, os, stat, platform + from google.cloud.bigtable.client import Client + from google.cloud.bigtable.instance import Instance + from google.cloud.bigtable.table import Table + + # import for urlopen based on version + try: + # python 3 + from urllib.request import urlopen + except ImportError: + # python 2 + from urllib2 import urlopen + + + TEST_SCRIPT = 'tests/retry_test_script.txt' + SERVER_NAME = 'retry_server' + SERVER_ZIP = SERVER_NAME + ".tar.gz" + + def process_scan(table, range, ids): + range_chunks = range.split(",") + range_open = range_chunks[0].lstrip("[") + range_close = range_chunks[1].rstrip(")") + rows = table.read_rows(range_open, range_close) + rows.consume_all() + + # Download server + MOCK_SERVER_URLS = { + 'Linux': 'https://storage.googleapis.com/cloud-bigtable-test/retries/retry_server_linux.tar.gz', + 'Darwin': 'https://storage.googleapis.com/cloud-bigtable-test/retries/retry_server_mac.tar.gz', + } + + test_platform = platform.system() + if test_platform not in MOCK_SERVER_URLS: + self.skip('Retry server not available for platform {0}.'.format(test_platform)) + + mock_server_download = urlopen(MOCK_SERVER_URLS[test_platform]).read() + mock_server_file = open(SERVER_ZIP, 'wb') + mock_server_file.write(mock_server_download) + + # Unzip server + subprocess.call(['tar', 'zxvf', SERVER_ZIP, '-C', '.']) + + # Connect to server + server = subprocess.Popen( + ['./' + SERVER_NAME, '--script=' + TEST_SCRIPT], + stdin=subprocess.PIPE, stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + + (endpoint, port) = server.stdout.readline().rstrip("\n").split(":") + os.environ["BIGTABLE_EMULATOR_HOST"] = endpoint + ":" + port + client = Client(project="client", admin=True) + instance = Instance("instance", client) + table = instance.table("table") + + # Run test, line by line + with open(TEST_SCRIPT, 'r') as script: + for line in script.readlines(): + if line.startswith("CLIENT:"): + chunks = line.split(" ") + op = chunks[1] + process_scan(table, chunks[2], chunks[3]) + + # Check that the test passed + server.kill() + server_stdout_lines = [] + while True: + line = server.stdout.readline() + if line != '': + server_stdout_lines.append(line) + else: + break + self.assertEqual(server_stdout_lines[-1], "PASS\n") + + # Clean up + os.remove(SERVER_ZIP) + os.remove(SERVER_NAME) class TestDataAPI(unittest.TestCase): diff --git a/bigtable/tests/unit/_testing.py b/bigtable/tests/unit/_testing.py index e67af6a1498c..7587c66c133b 100644 --- a/bigtable/tests/unit/_testing.py +++ b/bigtable/tests/unit/_testing.py @@ -14,7 +14,6 @@ """Mocks used to emulate gRPC generated objects.""" - class _FakeStub(object): """Acts as a gPRC stub.""" @@ -27,6 +26,16 @@ def __getattr__(self, name): # since __getattribute__ will handle them. return _MethodMock(name, self) +class _CustomFakeStub(object): + """Acts as a gRPC stub. Generates a result using an injected callable.""" + def __init__(self, result_callable): + self.result_callable = result_callable + self.method_calls = [] + + def __getattr__(self, name): + # We need not worry about attributes set in constructor + # since __getattribute__ will handle them. + return _CustomMethodMock(name, self) class _MethodMock(object): """Mock for API method attached to a gRPC stub. @@ -42,5 +51,19 @@ def __call__(self, *args, **kwargs): """Sync method meant to mock a gRPC stub request.""" self._stub.method_calls.append((self._name, args, kwargs)) curr_result, self._stub.results = (self._stub.results[0], - self._stub.results[1:]) + self._stub.results[1:]) return curr_result + +class _CustomMethodMock(object): + """ + Same as _MethodMock, but backed by an injected callable. + """ + + def __init__(self, name, stub): + self._name = name + self._stub = stub + + def __call__(self, *args, **kwargs): + """Sync method meant to mock a gRPC stub request.""" + self._stub.method_calls.append((self._name, args, kwargs)) + return self._stub.result_callable() diff --git a/bigtable/tests/unit/test_table.py b/bigtable/tests/unit/test_table.py index 5867e76aff73..d985f7eb2f0f 100644 --- a/bigtable/tests/unit/test_table.py +++ b/bigtable/tests/unit/test_table.py @@ -493,7 +493,8 @@ def test_read_rows(self): from google.cloud._testing import _Monkey from tests.unit._testing import _FakeStub from google.cloud.bigtable.row_data import PartialRowsData - from google.cloud.bigtable import table as MUT + from google.cloud.bigtable import retry as MUT + from google.cloud.bigtable.retry import ReadRowsIterator client = _Client() instance = _Instance(self.INSTANCE_NAME, client=client) @@ -513,20 +514,18 @@ def mock_create_row_request(table_name, **kwargs): # Patch the stub used by the API method. client._data_stub = stub = _FakeStub(response_iterator) - # Create expected_result. - expected_result = PartialRowsData(response_iterator) - - # Perform the method and check the result. start_key = b'start-key' end_key = b'end-key' filter_obj = object() limit = 22 with _Monkey(MUT, _create_row_request=mock_create_row_request): + # Perform the method and check the result. result = table.read_rows( start_key=start_key, end_key=end_key, filter_=filter_obj, limit=limit) - self.assertEqual(result, expected_result) + self.assertIsInstance(result._response_iterator, ReadRowsIterator) + self.assertEqual(result._response_iterator.client, client) self.assertEqual(stub.method_calls, [( 'ReadRows', (request_pb,), @@ -537,9 +536,166 @@ def mock_create_row_request(table_name, **kwargs): 'end_key': end_key, 'filter_': filter_obj, 'limit': limit, + 'start_key_closed': True, } self.assertEqual(mock_created, [(table.name, created_kwargs)]) + def test_read_rows_one_chunk(self): + from google.cloud._testing import _Monkey + from tests.unit._testing import _FakeStub + from google.cloud.bigtable import retry as MUT + from google.cloud.bigtable.retry import ReadRowsIterator + from google.cloud.bigtable.row_data import Cell + from google.cloud.bigtable.row_data import PartialRowsData + + client = _Client() + instance = _Instance(self.INSTANCE_NAME, client=client) + table = self._make_one(self.TABLE_ID, instance) + + # Create request_pb + request_pb = object() # Returned by our mock. + mock_created = [] + + def mock_create_row_request(table_name, **kwargs): + mock_created.append((table_name, kwargs)) + return request_pb + + # Create response_iterator + chunk = _ReadRowsResponseCellChunkPB( + row_key=self.ROW_KEY, + family_name=self.FAMILY_NAME, + qualifier=self.QUALIFIER, + timestamp_micros=self.TIMESTAMP_MICROS, + value=self.VALUE, + commit_row=True, + ) + response_pb = _ReadRowsResponsePB(chunks=[chunk]) + response_iterator = iter([response_pb]) + + # Patch the stub used by the API method. + client._data_stub = stub = _FakeStub(response_iterator) + + start_key = b'start-key' + end_key = b'end-key' + filter_obj = object() + limit = 22 + with _Monkey(MUT, _create_row_request=mock_create_row_request): + # Perform the method and check the result. + result = table.read_rows( + start_key=start_key, end_key=end_key, filter_=filter_obj, + limit=limit) + result.consume_all() + + def test_read_rows_retry_timeout(self): + from google.cloud._testing import _Monkey + from tests.unit._testing import _CustomFakeStub + from google.cloud.bigtable.row_data import PartialRowsData + from google.cloud.bigtable import retry as MUT + from google.cloud.bigtable.retry import ReadRowsIterator + from google.gax import BackoffSettings + from google.gax.errors import RetryError + from grpc import StatusCode, RpcError + import time + + client = _Client() + instance = _Instance(self.INSTANCE_NAME, client=client) + table = self._make_one(self.TABLE_ID, instance) + + # Create request_pb + request_pb = object() # Returned by our mock. + mock_created = [] + + def mock_create_row_request(table_name, **kwargs): + mock_created.append((table_name, kwargs)) + return request_pb + + # Create a slow response iterator to cause a timeout + class MockTimeoutError(RpcError): + def code(self): + return StatusCode.DEADLINE_EXCEEDED + + def _wait_then_raise(): + time.sleep(0.1) + raise MockTimeoutError() + + # Patch the stub used by the API method. The stub should create a new + # slow_iterator every time its queried. + def make_slow_iterator(): + return (_wait_then_raise() for i in range(10)) + client._data_stub = stub = _CustomFakeStub(make_slow_iterator) + + # Set to timeout before RPC completes + test_backoff_settings = BackoffSettings( + initial_retry_delay_millis=10, + retry_delay_multiplier=0.3, + max_retry_delay_millis=30000, + initial_rpc_timeout_millis=1000, + rpc_timeout_multiplier=1.0, + max_rpc_timeout_millis=25 * 60 * 1000, + total_timeout_millis=1000 + ) + + start_key = b'start-key' + end_key = b'end-key' + filter_obj = object() + limit = 22 + with _Monkey(MUT, _create_row_request=mock_create_row_request): + # Verify that a RetryError is thrown on read. + result = table.read_rows( + start_key=start_key, end_key=end_key, filter_=filter_obj, + limit=limit, backoff_settings=test_backoff_settings) + with self.assertRaises(RetryError): + result.consume_next() + + def test_read_rows_non_idempotent_error_throws(self): + from google.cloud._testing import _Monkey + from tests.unit._testing import _CustomFakeStub + from google.cloud.bigtable.row_data import PartialRowsData + from google.cloud.bigtable import retry as MUT + from google.cloud.bigtable.retry import ReadRowsIterator + from google.gax import BackoffSettings + from google.gax.errors import RetryError + from grpc import StatusCode, RpcError + import time + + client = _Client() + instance = _Instance(self.INSTANCE_NAME, client=client) + table = self._make_one(self.TABLE_ID, instance) + + # Create request_pb + request_pb = object() # Returned by our mock. + mock_created = [] + + def mock_create_row_request(table_name, **kwargs): + mock_created.append((table_name, kwargs)) + return request_pb + + # Create response iterator that raises a non-idempotent exception + class MockNonIdempotentError(RpcError): + def code(self): + return StatusCode.RESOURCE_EXHAUSTED + + def _raise(): + raise MockNonIdempotentError() + + # Patch the stub used by the API method. The stub should create a new + # slow_iterator every time its queried. + def make_raising_iterator(): + return (_raise() for i in range(10)) + client._data_stub = stub = _CustomFakeStub(make_raising_iterator) + + start_key = b'start-key' + end_key = b'end-key' + filter_obj = object() + limit = 22 + with _Monkey(MUT, _create_row_request=mock_create_row_request): + # Verify that a RetryError is thrown on read. + result = table.read_rows( + start_key=start_key, end_key=end_key, filter_=filter_obj, + limit=limit) + with self.assertRaises(MockNonIdempotentError): + result.consume_next() + def test_sample_row_keys(self): from tests.unit._testing import _FakeStub @@ -572,12 +728,12 @@ def test_sample_row_keys(self): class Test__create_row_request(unittest.TestCase): def _call_fut(self, table_name, row_key=None, start_key=None, end_key=None, - filter_=None, limit=None): - from google.cloud.bigtable.table import _create_row_request + start_key_closed=True, filter_=None, limit=None): + from google.cloud.bigtable.retry import _create_row_request return _create_row_request( table_name, row_key=row_key, start_key=start_key, end_key=end_key, - filter_=filter_, limit=limit) + start_key_closed=start_key_closed, filter_=filter_, limit=limit) def test_table_name_only(self): table_name = 'table_name' @@ -600,7 +756,7 @@ def test_row_key(self): expected_result.rows.row_keys.append(row_key) self.assertEqual(result, expected_result) - def test_row_range_start_key(self): + def test_row_range_start_key_closed(self): table_name = 'table_name' start_key = b'start_key' result = self._call_fut(table_name, start_key=start_key) @@ -608,6 +764,15 @@ def test_row_range_start_key(self): expected_result.rows.row_ranges.add(start_key_closed=start_key) self.assertEqual(result, expected_result) + def test_row_range_start_key_open(self): + table_name = 'table_name' + start_key = b'start_key' + result = self._call_fut(table_name, start_key=start_key, + start_key_closed=False) + expected_result = _ReadRowsRequestPB(table_name=table_name) + expected_result.rows.row_ranges.add(start_key_open=start_key) + self.assertEqual(result, expected_result) + def test_row_range_end_key(self): table_name = 'table_name' end_key = b'end_key'