diff --git a/gcloud/bigtable/row.py b/gcloud/bigtable/row.py index 1dbd38aa7962..aae048b0c7d6 100644 --- a/gcloud/bigtable/row.py +++ b/gcloud/bigtable/row.py @@ -22,10 +22,10 @@ from gcloud._helpers import _datetime_from_microseconds from gcloud._helpers import _microseconds_from_datetime from gcloud._helpers import _to_bytes -from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) -from gcloud.bigtable._generated import ( - bigtable_service_messages_pb2 as messages_v1_pb2) +from gcloud.bigtable._generated_v2 import ( + data_pb2 as data_v2_pb2) +from gcloud.bigtable._generated_v2 import ( + bigtable_pb2 as messages_v2_pb2) _PACK_I64 = struct.Struct('>q').pack @@ -134,13 +134,13 @@ def _set_cell(self, column_family_id, column, value, timestamp=None, # Truncate to millisecond granularity. timestamp_micros -= (timestamp_micros % 1000) - mutation_val = data_v1_pb2.Mutation.SetCell( + mutation_val = data_v2_pb2.Mutation.SetCell( family_name=column_family_id, column_qualifier=column, timestamp_micros=timestamp_micros, value=value, ) - mutation_pb = data_v1_pb2.Mutation(set_cell=mutation_val) + mutation_pb = data_v2_pb2.Mutation(set_cell=mutation_val) self._get_mutations(state).append(mutation_pb) def _delete(self, state=None): @@ -156,8 +156,8 @@ def _delete(self, state=None): :param state: (Optional) The state that is passed along to :meth:`_get_mutations`. """ - mutation_val = data_v1_pb2.Mutation.DeleteFromRow() - mutation_pb = data_v1_pb2.Mutation(delete_from_row=mutation_val) + mutation_val = data_v2_pb2.Mutation.DeleteFromRow() + mutation_pb = data_v2_pb2.Mutation(delete_from_row=mutation_val) self._get_mutations(state).append(mutation_pb) def _delete_cells(self, column_family_id, columns, time_range=None, @@ -188,10 +188,10 @@ def _delete_cells(self, column_family_id, columns, time_range=None, """ mutations_list = self._get_mutations(state) if columns is self.ALL_COLUMNS: - mutation_val = data_v1_pb2.Mutation.DeleteFromFamily( + mutation_val = data_v2_pb2.Mutation.DeleteFromFamily( family_name=column_family_id, ) - mutation_pb = data_v1_pb2.Mutation(delete_from_family=mutation_val) + mutation_pb = data_v2_pb2.Mutation(delete_from_family=mutation_val) mutations_list.append(mutation_pb) else: delete_kwargs = {} @@ -207,9 +207,9 @@ def _delete_cells(self, column_family_id, columns, time_range=None, family_name=column_family_id, column_qualifier=column, ) - mutation_val = data_v1_pb2.Mutation.DeleteFromColumn( + mutation_val = data_v2_pb2.Mutation.DeleteFromColumn( **delete_kwargs) - mutation_pb = data_v1_pb2.Mutation( + mutation_pb = data_v2_pb2.Mutation( delete_from_column=mutation_val) to_append.append(mutation_pb) @@ -389,7 +389,7 @@ def commit(self): if num_mutations > MAX_MUTATIONS: raise ValueError('%d total mutations exceed the maximum allowable ' '%d.' % (num_mutations, MAX_MUTATIONS)) - request_pb = messages_v1_pb2.MutateRowRequest( + request_pb = messages_v2_pb2.MutateRowRequest( table_name=self._table.name, row_key=self._row_key, mutations=mutations_list, @@ -504,14 +504,14 @@ def commit(self): 'mutations and %d false mutations.' 
% ( MAX_MUTATIONS, num_true_mutations, num_false_mutations)) - request_pb = messages_v1_pb2.CheckAndMutateRowRequest( + request_pb = messages_v2_pb2.CheckAndMutateRowRequest( table_name=self._table.name, row_key=self._row_key, predicate_filter=self._filter.to_pb(), true_mutations=true_mutations, false_mutations=false_mutations, ) - # We expect a `.messages_v1_pb2.CheckAndMutateRowResponse` + # We expect a `.messages_v2_pb2.CheckAndMutateRowResponse` client = self._table._cluster._client resp = client._data_stub.CheckAndMutateRow( request_pb, client.timeout_seconds) @@ -701,7 +701,7 @@ def append_cell_value(self, column_family_id, column, value): """ column = _to_bytes(column) value = _to_bytes(value) - rule_pb = data_v1_pb2.ReadModifyWriteRule( + rule_pb = data_v2_pb2.ReadModifyWriteRule( family_name=column_family_id, column_qualifier=column, append_value=value) @@ -738,7 +738,7 @@ def increment_cell_value(self, column_family_id, column, int_value): will fail. """ column = _to_bytes(column) - rule_pb = data_v1_pb2.ReadModifyWriteRule( + rule_pb = data_v2_pb2.ReadModifyWriteRule( family_name=column_family_id, column_qualifier=column, increment_amount=int_value) @@ -794,12 +794,12 @@ def commit(self): if num_mutations > MAX_MUTATIONS: raise ValueError('%d total append mutations exceed the maximum ' 'allowable %d.' % (num_mutations, MAX_MUTATIONS)) - request_pb = messages_v1_pb2.ReadModifyWriteRowRequest( + request_pb = messages_v2_pb2.ReadModifyWriteRowRequest( table_name=self._table.name, row_key=self._row_key, rules=self._rule_pb_list, ) - # We expect a `.data_v1_pb2.Row` + # We expect a `.data_v2_pb2.Row` client = self._table._cluster._client row_response = client._data_stub.ReadModifyWriteRow( request_pb, client.timeout_seconds) @@ -814,7 +814,7 @@ def commit(self): def _parse_rmw_row_response(row_response): """Parses the response to a ``ReadModifyWriteRow`` request. - :type row_response: :class:`.data_v1_pb2.Row` + :type row_response: :class:`.data_v2_pb2.Row` :param row_response: The response row (with only modified cells) from a ``ReadModifyWriteRow`` request. diff --git a/gcloud/bigtable/row_data.py b/gcloud/bigtable/row_data.py index b6a52405f8b9..e353b8735ba9 100644 --- a/gcloud/bigtable/row_data.py +++ b/gcloud/bigtable/row_data.py @@ -67,6 +67,49 @@ def __ne__(self, other): return not self.__eq__(other) +class PartialCellData(object): + """Representation of partial cell in a Google Cloud Bigtable Table. + + These are expected to be updated directly from a + :class:`._generated.bigtable_service_messages_pb2.ReadRowsResponse` + + :type row_key: bytes + :param row_key: The key for the row holding the (partial) cell. + + :type family_name: str + :param family_name: The family name of the (partial) cell. + + :type qualifier: bytes + :param qualifier: The column qualifier of the (partial) cell. + + :type timestamp_micros: int + :param timestamp_micros: The timestamp (in microsecods) of the + (partial) cell. + + :type labels: list of str + :param labels: labels assigned to the (partial) cell + + :type value: bytes + :param value: The (accumulated) value of the (partial) cell. + """ + def __init__(self, row_key, family_name, qualifier, timestamp_micros, + labels=(), value=b''): + self.row_key = row_key + self.family_name = family_name + self.qualifier = qualifier + self.timestamp_micros = timestamp_micros + self.labels = labels + self.value = value + + def append_value(self, value): + """Append bytes from a new chunk to value. 
+ + :type value: bytes + :param value: bytes to append + """ + self.value += value + + class PartialRowData(object): """Representation of partial row in a Google Cloud Bigtable Table. @@ -80,15 +123,11 @@ class PartialRowData(object): def __init__(self, row_key): self._row_key = row_key self._cells = {} - self._committed = False - self._chunks_encountered = False def __eq__(self, other): if not isinstance(other, self.__class__): return False return (other._row_key == self._row_key and - other._committed == self._committed and - other._chunks_encountered == self._chunks_encountered and other._cells == self._cells) def __ne__(self, other): @@ -132,119 +171,13 @@ def row_key(self): """ return self._row_key - @property - def committed(self): - """Getter for the committed status of the (partial) row. - - :rtype: bool - :returns: The committed status of the (partial) row. - """ - return self._committed - def clear(self): - """Clears all cells that have been added.""" - self._committed = False - self._chunks_encountered = False - self._cells.clear() - - def _handle_commit_row(self, chunk, index, last_chunk_index): - """Handles a ``commit_row`` chunk. - - :type chunk: ``ReadRowsResponse.Chunk`` - :param chunk: The chunk being handled. - - :type index: int - :param index: The current index of the chunk. - - :type last_chunk_index: int - :param last_chunk_index: The index of the last chunk. - - :raises: :class:`ValueError ` if the value of - ``commit_row`` is :data:`False` or if the chunk passed is not - the last chunk in a response. - """ - # NOTE: We assume the caller has checked that the ``ONEOF`` property - # for ``chunk`` is ``commit_row``. - if not chunk.commit_row: - raise ValueError('Received commit_row that was False.') - - if index != last_chunk_index: - raise ValueError('Commit row chunk was not the last chunk') - else: - self._committed = True - - def _handle_reset_row(self, chunk): - """Handles a ``reset_row`` chunk. - - :type chunk: ``ReadRowsResponse.Chunk`` - :param chunk: The chunk being handled. - - :raises: :class:`ValueError ` if the value of - ``reset_row`` is :data:`False` - """ - # NOTE: We assume the caller has checked that the ``ONEOF`` property - # for ``chunk`` is ``reset_row``. - if not chunk.reset_row: - raise ValueError('Received reset_row that was False.') - - self.clear() - - def _handle_row_contents(self, chunk): - """Handles a ``row_contents`` chunk. +class InvalidReadRowsResponse(RuntimeError): + """Exception raised to to invalid response data from back-end.""" - :type chunk: ``ReadRowsResponse.Chunk`` - :param chunk: The chunk being handled. - """ - # NOTE: We assume the caller has checked that the ``ONEOF`` property - # for ``chunk`` is ``row_contents``. - - # chunk.row_contents is ._generated.bigtable_data_pb2.Family - column_family_id = chunk.row_contents.name - column_family_dict = self._cells.setdefault(column_family_id, {}) - for column in chunk.row_contents.columns: - cells = [Cell.from_pb(cell) for cell in column.cells] - - column_name = column.qualifier - column_cells = column_family_dict.setdefault(column_name, []) - column_cells.extend(cells) - - def update_from_read_rows(self, read_rows_response_pb): - """Updates the current row from a ``ReadRows`` response. - - :type read_rows_response_pb: - :class:`._generated.bigtable_service_messages_pb2.ReadRowsResponse` - :param read_rows_response_pb: A response streamed back as part of a - ``ReadRows`` request. 
- - :raises: :class:`ValueError ` if the current - partial row has already been committed, if the row key on the - response doesn't match the current one or if there is a chunk - encountered with an unexpected ``ONEOF`` protobuf property. - """ - if self._committed: - raise ValueError('The row has been committed') - - if read_rows_response_pb.row_key != self.row_key: - raise ValueError('Response row key (%r) does not match current ' - 'one (%r).' % (read_rows_response_pb.row_key, - self.row_key)) - - last_chunk_index = len(read_rows_response_pb.chunks) - 1 - for index, chunk in enumerate(read_rows_response_pb.chunks): - chunk_property = chunk.WhichOneof('chunk') - if chunk_property == 'row_contents': - self._handle_row_contents(chunk) - elif chunk_property == 'reset_row': - self._handle_reset_row(chunk) - elif chunk_property == 'commit_row': - self._handle_commit_row(chunk, index, last_chunk_index) - else: - # NOTE: This includes chunk_property == None since we always - # want a value to be set - raise ValueError('Unexpected chunk property: %s' % ( - chunk_property,)) - self._chunks_encountered = True +class InvalidChunk(RuntimeError): + """Exception raised to to invalid chunk data from back-end.""" class PartialRowsData(object): @@ -255,11 +188,27 @@ class PartialRowsData(object): :param response_iterator: A streaming iterator returned from a ``ReadRows`` request. """ + START = "Start" # No responses yet processed. + NEW_ROW = "New row" # No cells yet complete for row + ROW_IN_PROGRESS = "Row in progress" # Some cells complete for row + CELL_IN_PROGRESS = "Cell in progress" # Incomplete cell for row def __init__(self, response_iterator): - # We expect an iterator of `data_messages_pb2.ReadRowsResponse` self._response_iterator = response_iterator + # Fully-processed rows, keyed by `row_key` self._rows = {} + # Counter for responses pulled from iterator + self._counter = 0 + # Maybe cached from previous response + self._last_scanned_row_key = None + # In-progress row, unset until first response, after commit/reset + self._row = None + # Last complete row, unset until first commit + self._previous_row = None + # In-progress cell, unset until first response, after completion + self._cell = None + # Last complete cell, unset until first completion, after new row + self._previous_cell = None def __eq__(self, other): if not isinstance(other, self.__class__): @@ -269,12 +218,32 @@ def __eq__(self, other): def __ne__(self, other): return not self.__eq__(other) + @property + def state(self): + """State machine state. + + :rtype: str + :returns: name of state corresponding to currrent row / chunk + processing. + """ + if self._last_scanned_row_key is None: + return self.START + if self._row is None: + assert self._cell is None + assert self._previous_cell is None + return self.NEW_ROW + if self._cell is not None: + return self.CELL_IN_PROGRESS + if self._previous_cell is not None: + return self.ROW_IN_PROGRESS + return self.NEW_ROW # row added, no chunk yet processed + @property def rows(self): """Property returning all rows accumulated from the stream. :rtype: dict - :returns: Dictionary of :class:`PartialRowData`. + :returns: row_key -> :class:`PartialRowData`. """ # NOTE: To avoid duplicating large objects, this is just the # mutable private data. @@ -285,21 +254,55 @@ def cancel(self): self._response_iterator.cancel() def consume_next(self): - """Consumes the next ``ReadRowsResponse`` from the stream. 
- - Parses the response and stores it as a :class:`PartialRowData` - in a dictionary owned by this object. + """Consume the next ``ReadRowsResponse`` from the stream. - :raises: :class:`StopIteration ` if the - response iterator has no more responses to stream. + Parse the response and its chunks into a new/existing row in + :attr:`_rows` """ - read_rows_response = self._response_iterator.next() - row_key = read_rows_response.row_key - partial_row = self._rows.get(row_key) - if partial_row is None: - partial_row = self._rows[row_key] = PartialRowData(row_key) - # NOTE: This is not atomic in the case of failures. - partial_row.update_from_read_rows(read_rows_response) + response = six.next(self._response_iterator) + self._counter += 1 + + if self._last_scanned_row_key is None: # first response + if response.last_scanned_row_key: + raise InvalidReadRowsResponse() + + self._last_scanned_row_key = response.last_scanned_row_key + + row = self._row + cell = self._cell + + for chunk in response.chunks: + + self._validate_chunk(chunk) + + if chunk.reset_row: + row = self._row = None + cell = self._cell = self._previous_cell = None + continue + + if row is None: + row = self._row = PartialRowData(chunk.row_key) + + if cell is None: + cell = self._cell = PartialCellData( + chunk.row_key, + chunk.family_name.value, + chunk.qualifier.value, + chunk.timestamp_micros, + chunk.labels, + chunk.value) + self._copy_from_previous(cell) + else: + cell.append_value(chunk.value) + + if chunk.commit_row: + self._save_current_row() + row = cell = None + continue + + if chunk.value_size == 0: + self._save_current_cell() + cell = None def consume_all(self, max_loops=None): """Consume the streamed responses until there are no more. @@ -322,100 +325,6 @@ def consume_all(self, max_loops=None): except StopIteration: break - -class InvalidReadRowsResponse(RuntimeError): - """Exception raised to to invalid response data from back-end.""" - - -class InvalidChunk(RuntimeError): - """Exception raised to to invalid chunk data from back-end.""" - - -def _raise_if(predicate, *args): - """Helper for validation methods.""" - if predicate: - raise InvalidChunk(*args) - - -class PartialCellV2(object): - """Data for a not-yet-complete cell.""" - - def __init__(self, row_key, family_name, qualifier, timestamp_micros, - labels=(), value=b''): - self.row_key = row_key - self.family_name = family_name - self.qualifier = qualifier - self.timestamp_micros = timestamp_micros - self.labels = labels - self.value = value - - def append_value(self, value): - """Append bytes from a new chunk to value. - - :type value: bytes - :param value: bytes to append - """ - self.value += value - - -class PartialRowsDataV2(object): - """Handle state involved in consuming a ``ReadRows`` streaming response. - - :type response_iterator: - :class:`grpc.framework.alpha._reexport._CancellableIterator` returning - :class:`gcloud.bigtable._generated_v2.bigtable_pb2.ReadRowsResponse` - :param response_iterator: - A streaming iterator returned from a ``ReadRows`` request. 
- """ - # State names - START = "Start" - NEW_ROW = "New row" - ROW_IN_PROGRESS = "Row in progress" - CELL_IN_PROGRESS = "Cell in progress" - - def __init__(self, response_iterator): - self._response_iterator = response_iterator - # Fully-processed rows, keyed by `row_key` - self._rows = {} - # Counter for responses pulled from iterator - self._counter = 0 - # Maybe cached from previous response - self._last_scanned_row_key = None - # In-progress row, unset until first response, after commit/reset - self._row = None - # Last complete row, unset until first commit - self._previous_row = None - # In-progress cell, unset until first response, after completion - self._cell = None - # Last complete cell, unset until first completion, after new row - self._previous_cell = None - - @property - def state(self): - """Name of state machine state.""" - if self._last_scanned_row_key is None: - return self.START - if self._row is None: - assert self._cell is None - assert self._previous_cell is None - return self.NEW_ROW - if self._cell is not None: - return self.CELL_IN_PROGRESS - if self._previous_cell is not None: - return self.ROW_IN_PROGRESS - return self.NEW_ROW # row added, no chunk yet processed - - @property - def rows(self): - """Property returning all rows accumulated from the stream. - - :rtype: dict - :returns: Dictionary of :class:`PartialRowData`. - """ - # NOTE: To avoid duplicating large objects, this is just the - # mutable private data. - return self._rows - @staticmethod def _validate_chunk_status(chunk): """Helper for :meth:`_validate_chunk_row_in_progress`, etc.""" @@ -526,53 +435,8 @@ def _save_current_row(self): self._row, self._previous_row = None, self._row self._previous_cell = None - def consume_next(self): - """Consume the next ``ReadRowsResponse`` from the stream. 
- - Parse the response and its chunks into a new/existing row in - :attr:`_rows` - """ - response = self._response_iterator.next() - self._counter += 1 - - if self._last_scanned_row_key is None: # first response - if response.last_scanned_row_key: - raise InvalidReadRowsResponse() - - self._last_scanned_row_key = response.last_scanned_row_key - - row = self._row - cell = self._cell - - for chunk in response.chunks: - - self._validate_chunk(chunk) - - if chunk.reset_row: - row = self._row = None - cell = self._cell = self._previous_cell = None - continue - - if row is None: - row = self._row = PartialRowData(chunk.row_key) - - if cell is None: - cell = self._cell = PartialCellV2( - chunk.row_key, - chunk.family_name.value, - chunk.qualifier.value, - chunk.timestamp_micros, - chunk.labels, - chunk.value) - self._copy_from_previous(cell) - else: - cell.append_value(chunk.value) - - if chunk.commit_row: - self._save_current_row() - row = cell = None - continue - if chunk.value_size == 0: - self._save_current_cell() - cell = None +def _raise_if(predicate, *args): + """Helper for validation methods.""" + if predicate: + raise InvalidChunk(*args) diff --git a/gcloud/bigtable/row_filters.py b/gcloud/bigtable/row_filters.py index 2b11a06bfdd9..f76615ba5ea8 100644 --- a/gcloud/bigtable/row_filters.py +++ b/gcloud/bigtable/row_filters.py @@ -17,8 +17,8 @@ from gcloud._helpers import _microseconds_from_datetime from gcloud._helpers import _to_bytes -from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) +from gcloud.bigtable._generated_v2 import ( + data_pb2 as data_v2_pb2) class RowFilter(object): @@ -66,10 +66,10 @@ class SinkFilter(_BoolFilter): def to_pb(self): """Converts the row filter to a protobuf. - :rtype: :class:`.data_v1_pb2.RowFilter` + :rtype: :class:`.data_v2_pb2.RowFilter` :returns: The converted current object. """ - return data_v1_pb2.RowFilter(sink=self.flag) + return data_v2_pb2.RowFilter(sink=self.flag) class PassAllFilter(_BoolFilter): @@ -84,10 +84,10 @@ class PassAllFilter(_BoolFilter): def to_pb(self): """Converts the row filter to a protobuf. - :rtype: :class:`.data_v1_pb2.RowFilter` + :rtype: :class:`.data_v2_pb2.RowFilter` :returns: The converted current object. """ - return data_v1_pb2.RowFilter(pass_all_filter=self.flag) + return data_v2_pb2.RowFilter(pass_all_filter=self.flag) class BlockAllFilter(_BoolFilter): @@ -101,10 +101,10 @@ class BlockAllFilter(_BoolFilter): def to_pb(self): """Converts the row filter to a protobuf. - :rtype: :class:`.data_v1_pb2.RowFilter` + :rtype: :class:`.data_v2_pb2.RowFilter` :returns: The converted current object. """ - return data_v1_pb2.RowFilter(block_all_filter=self.flag) + return data_v2_pb2.RowFilter(block_all_filter=self.flag) class _RegexFilter(RowFilter): @@ -154,10 +154,10 @@ class RowKeyRegexFilter(_RegexFilter): def to_pb(self): """Converts the row filter to a protobuf. - :rtype: :class:`.data_v1_pb2.RowFilter` + :rtype: :class:`.data_v2_pb2.RowFilter` :returns: The converted current object. """ - return data_v1_pb2.RowFilter(row_key_regex_filter=self.regex) + return data_v2_pb2.RowFilter(row_key_regex_filter=self.regex) class RowSampleFilter(RowFilter): @@ -179,10 +179,10 @@ def __eq__(self, other): def to_pb(self): """Converts the row filter to a protobuf. - :rtype: :class:`.data_v1_pb2.RowFilter` + :rtype: :class:`.data_v2_pb2.RowFilter` :returns: The converted current object. 
""" - return data_v1_pb2.RowFilter(row_sample_filter=self.sample) + return data_v2_pb2.RowFilter(row_sample_filter=self.sample) class FamilyNameRegexFilter(_RegexFilter): @@ -203,10 +203,10 @@ class FamilyNameRegexFilter(_RegexFilter): def to_pb(self): """Converts the row filter to a protobuf. - :rtype: :class:`.data_v1_pb2.RowFilter` + :rtype: :class:`.data_v2_pb2.RowFilter` :returns: The converted current object. """ - return data_v1_pb2.RowFilter(family_name_regex_filter=self.regex) + return data_v2_pb2.RowFilter(family_name_regex_filter=self.regex) class ColumnQualifierRegexFilter(_RegexFilter): @@ -233,10 +233,10 @@ class ColumnQualifierRegexFilter(_RegexFilter): def to_pb(self): """Converts the row filter to a protobuf. - :rtype: :class:`.data_v1_pb2.RowFilter` + :rtype: :class:`.data_v2_pb2.RowFilter` :returns: The converted current object. """ - return data_v1_pb2.RowFilter(column_qualifier_regex_filter=self.regex) + return data_v2_pb2.RowFilter(column_qualifier_regex_filter=self.regex) class TimestampRange(object): @@ -267,7 +267,7 @@ def __ne__(self, other): def to_pb(self): """Converts the :class:`TimestampRange` to a protobuf. - :rtype: :class:`.data_v1_pb2.TimestampRange` + :rtype: :class:`.data_v2_pb2.TimestampRange` :returns: The converted current object. """ timestamp_range_kwargs = {} @@ -277,7 +277,7 @@ def to_pb(self): if self.end is not None: timestamp_range_kwargs['end_timestamp_micros'] = ( _microseconds_from_datetime(self.end)) - return data_v1_pb2.TimestampRange(**timestamp_range_kwargs) + return data_v2_pb2.TimestampRange(**timestamp_range_kwargs) class TimestampRangeFilter(RowFilter): @@ -301,10 +301,10 @@ def to_pb(self): First converts the ``range_`` on the current object to a protobuf and then uses it in the ``timestamp_range_filter`` field. - :rtype: :class:`.data_v1_pb2.RowFilter` + :rtype: :class:`.data_v2_pb2.RowFilter` :returns: The converted current object. """ - return data_v1_pb2.RowFilter( + return data_v2_pb2.RowFilter( timestamp_range_filter=self.range_.to_pb()) @@ -377,28 +377,28 @@ def __eq__(self, other): def to_pb(self): """Converts the row filter to a protobuf. - First converts to a :class:`.data_v1_pb2.ColumnRange` and then uses it + First converts to a :class:`.data_v2_pb2.ColumnRange` and then uses it in the ``column_range_filter`` field. - :rtype: :class:`.data_v1_pb2.RowFilter` + :rtype: :class:`.data_v2_pb2.RowFilter` :returns: The converted current object. """ column_range_kwargs = {'family_name': self.column_family_id} if self.start_column is not None: if self.inclusive_start: - key = 'start_qualifier_inclusive' + key = 'start_qualifier_closed' else: - key = 'start_qualifier_exclusive' + key = 'start_qualifier_open' column_range_kwargs[key] = _to_bytes(self.start_column) if self.end_column is not None: if self.inclusive_end: - key = 'end_qualifier_inclusive' + key = 'end_qualifier_closed' else: - key = 'end_qualifier_exclusive' + key = 'end_qualifier_open' column_range_kwargs[key] = _to_bytes(self.end_column) - column_range = data_v1_pb2.ColumnRange(**column_range_kwargs) - return data_v1_pb2.RowFilter(column_range_filter=column_range) + column_range = data_v2_pb2.ColumnRange(**column_range_kwargs) + return data_v2_pb2.RowFilter(column_range_filter=column_range) class ValueRegexFilter(_RegexFilter): @@ -425,10 +425,10 @@ class ValueRegexFilter(_RegexFilter): def to_pb(self): """Converts the row filter to a protobuf. - :rtype: :class:`.data_v1_pb2.RowFilter` + :rtype: :class:`.data_v2_pb2.RowFilter` :returns: The converted current object. 
""" - return data_v1_pb2.RowFilter(value_regex_filter=self.regex) + return data_v2_pb2.RowFilter(value_regex_filter=self.regex) class ValueRangeFilter(RowFilter): @@ -494,28 +494,28 @@ def __eq__(self, other): def to_pb(self): """Converts the row filter to a protobuf. - First converts to a :class:`.data_v1_pb2.ValueRange` and then uses + First converts to a :class:`.data_v2_pb2.ValueRange` and then uses it to create a row filter protobuf. - :rtype: :class:`.data_v1_pb2.RowFilter` + :rtype: :class:`.data_v2_pb2.RowFilter` :returns: The converted current object. """ value_range_kwargs = {} if self.start_value is not None: if self.inclusive_start: - key = 'start_value_inclusive' + key = 'start_value_closed' else: - key = 'start_value_exclusive' + key = 'start_value_open' value_range_kwargs[key] = _to_bytes(self.start_value) if self.end_value is not None: if self.inclusive_end: - key = 'end_value_inclusive' + key = 'end_value_closed' else: - key = 'end_value_exclusive' + key = 'end_value_open' value_range_kwargs[key] = _to_bytes(self.end_value) - value_range = data_v1_pb2.ValueRange(**value_range_kwargs) - return data_v1_pb2.RowFilter(value_range_filter=value_range) + value_range = data_v2_pb2.ValueRange(**value_range_kwargs) + return data_v2_pb2.RowFilter(value_range_filter=value_range) class _CellCountFilter(RowFilter): @@ -547,10 +547,10 @@ class CellsRowOffsetFilter(_CellCountFilter): def to_pb(self): """Converts the row filter to a protobuf. - :rtype: :class:`.data_v1_pb2.RowFilter` + :rtype: :class:`.data_v2_pb2.RowFilter` :returns: The converted current object. """ - return data_v1_pb2.RowFilter( + return data_v2_pb2.RowFilter( cells_per_row_offset_filter=self.num_cells) @@ -564,10 +564,10 @@ class CellsRowLimitFilter(_CellCountFilter): def to_pb(self): """Converts the row filter to a protobuf. - :rtype: :class:`.data_v1_pb2.RowFilter` + :rtype: :class:`.data_v2_pb2.RowFilter` :returns: The converted current object. """ - return data_v1_pb2.RowFilter(cells_per_row_limit_filter=self.num_cells) + return data_v2_pb2.RowFilter(cells_per_row_limit_filter=self.num_cells) class CellsColumnLimitFilter(_CellCountFilter): @@ -582,10 +582,10 @@ class CellsColumnLimitFilter(_CellCountFilter): def to_pb(self): """Converts the row filter to a protobuf. - :rtype: :class:`.data_v1_pb2.RowFilter` + :rtype: :class:`.data_v2_pb2.RowFilter` :returns: The converted current object. """ - return data_v1_pb2.RowFilter( + return data_v2_pb2.RowFilter( cells_per_column_limit_filter=self.num_cells) @@ -601,10 +601,10 @@ class StripValueTransformerFilter(_BoolFilter): def to_pb(self): """Converts the row filter to a protobuf. - :rtype: :class:`.data_v1_pb2.RowFilter` + :rtype: :class:`.data_v2_pb2.RowFilter` :returns: The converted current object. """ - return data_v1_pb2.RowFilter(strip_value_transformer=self.flag) + return data_v2_pb2.RowFilter(strip_value_transformer=self.flag) class ApplyLabelFilter(RowFilter): @@ -637,10 +637,10 @@ def __eq__(self, other): def to_pb(self): """Converts the row filter to a protobuf. - :rtype: :class:`.data_v1_pb2.RowFilter` + :rtype: :class:`.data_v2_pb2.RowFilter` :returns: The converted current object. """ - return data_v1_pb2.RowFilter(apply_label_transformer=self.label) + return data_v2_pb2.RowFilter(apply_label_transformer=self.label) class _FilterCombination(RowFilter): @@ -679,12 +679,12 @@ class RowFilterChain(_FilterCombination): def to_pb(self): """Converts the row filter to a protobuf. 
- :rtype: :class:`.data_v1_pb2.RowFilter` + :rtype: :class:`.data_v2_pb2.RowFilter` :returns: The converted current object. """ - chain = data_v1_pb2.RowFilter.Chain( + chain = data_v2_pb2.RowFilter.Chain( filters=[row_filter.to_pb() for row_filter in self.filters]) - return data_v1_pb2.RowFilter(chain=chain) + return data_v2_pb2.RowFilter(chain=chain) class RowFilterUnion(_FilterCombination): @@ -703,12 +703,12 @@ class RowFilterUnion(_FilterCombination): def to_pb(self): """Converts the row filter to a protobuf. - :rtype: :class:`.data_v1_pb2.RowFilter` + :rtype: :class:`.data_v2_pb2.RowFilter` :returns: The converted current object. """ - interleave = data_v1_pb2.RowFilter.Interleave( + interleave = data_v2_pb2.RowFilter.Interleave( filters=[row_filter.to_pb() for row_filter in self.filters]) - return data_v1_pb2.RowFilter(interleave=interleave) + return data_v2_pb2.RowFilter(interleave=interleave) class ConditionalRowFilter(RowFilter): @@ -756,7 +756,7 @@ def __eq__(self, other): def to_pb(self): """Converts the row filter to a protobuf. - :rtype: :class:`.data_v1_pb2.RowFilter` + :rtype: :class:`.data_v2_pb2.RowFilter` :returns: The converted current object. """ condition_kwargs = {'predicate_filter': self.base_filter.to_pb()} @@ -764,5 +764,5 @@ def to_pb(self): condition_kwargs['true_filter'] = self.true_filter.to_pb() if self.false_filter is not None: condition_kwargs['false_filter'] = self.false_filter.to_pb() - condition = data_v1_pb2.RowFilter.Condition(**condition_kwargs) - return data_v1_pb2.RowFilter(condition=condition) + condition = data_v2_pb2.RowFilter.Condition(**condition_kwargs) + return data_v2_pb2.RowFilter(condition=condition) diff --git a/gcloud/bigtable/table.py b/gcloud/bigtable/table.py index 155b5123c67f..83182d9f2a04 100644 --- a/gcloud/bigtable/table.py +++ b/gcloud/bigtable/table.py @@ -14,20 +14,16 @@ """User friendly container for Google Cloud Bigtable Table.""" - from gcloud._helpers import _to_bytes -from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) -from gcloud.bigtable._generated import ( - bigtable_table_service_messages_pb2 as messages_v1_pb2) -from gcloud.bigtable._generated import ( - bigtable_service_messages_pb2 as data_messages_v1_pb2) +from gcloud.bigtable._generated_v2 import ( + bigtable_pb2 as data_messages_v2_pb2) +from gcloud.bigtable._generated_v2 import ( + bigtable_table_admin_pb2 as table_admin_messages_v2_pb2) from gcloud.bigtable.column_family import _gc_rule_from_pb from gcloud.bigtable.column_family import ColumnFamily from gcloud.bigtable.row import AppendRow from gcloud.bigtable.row import ConditionalRow from gcloud.bigtable.row import DirectRow -from gcloud.bigtable.row_data import PartialRowData from gcloud.bigtable.row_data import PartialRowsData @@ -168,8 +164,12 @@ def create(self, initial_split_keys=None): created, spanning the key ranges: ``[, s1)``, ``[s1, s2)``, ``[s2, )``. 
""" - request_pb = messages_v1_pb2.CreateTableRequest( - initial_split_keys=initial_split_keys or [], + split_pb = table_admin_messages_v2_pb2.CreateTableRequest.Split + if initial_split_keys is not None: + initial_split_keys = [ + split_pb(key=key) for key in initial_split_keys] + request_pb = table_admin_messages_v2_pb2.CreateTableRequest( + initial_splits=initial_split_keys or [], name=self._cluster.name, table_id=self.table_id, ) @@ -179,7 +179,8 @@ def create(self, initial_split_keys=None): def delete(self): """Delete this table.""" - request_pb = messages_v1_pb2.DeleteTableRequest(name=self.name) + request_pb = table_admin_messages_v2_pb2.DeleteTableRequest( + name=self.name) client = self._cluster._client # We expect a `google.protobuf.empty_pb2.Empty` client._table_stub.DeleteTable(request_pb, client.timeout_seconds) @@ -195,7 +196,8 @@ def list_column_families(self): family name from the response does not agree with the computed name from the column family ID. """ - request_pb = messages_v1_pb2.GetTableRequest(name=self.name) + request_pb = table_admin_messages_v2_pb2.GetTableRequest( + name=self.name) client = self._cluster._client # We expect a `._generated.bigtable_table_data_pb2.Table` table_pb = client._table_stub.GetTable(request_pb, @@ -206,10 +208,6 @@ def list_column_families(self): gc_rule = _gc_rule_from_pb(value_pb.gc_rule) column_family = self.column_family(column_family_id, gc_rule=gc_rule) - if column_family.name != value_pb.name: - raise ValueError('Column family name %s does not agree with ' - 'name from request: %s.' % ( - column_family.name, value_pb.name)) result[column_family_id] = column_family return result @@ -234,21 +232,18 @@ def read_row(self, row_key, filter_=None): client = self._cluster._client response_iterator = client._data_stub.ReadRows(request_pb, client.timeout_seconds) - # We expect an iterator of `data_messages_v1_pb2.ReadRowsResponse` - result = PartialRowData(row_key) - for read_rows_response in response_iterator: - result.update_from_read_rows(read_rows_response) + rows_data = PartialRowsData(response_iterator) + rows_data.consume_all() + if rows_data.state != rows_data.NEW_ROW: + raise ValueError('The row remains partial / is not committed.') - # Make sure the result actually contains data. - if not result._chunks_encountered: + if len(rows_data.rows) == 0: return None - # Make sure the result was committed by the back-end. - if not result.committed: - raise ValueError('The row remains partial / is not committed.') - return result - def read_rows(self, start_key=None, end_key=None, - allow_row_interleaving=None, limit=None, filter_=None): + return rows_data.rows[row_key] + + def read_rows(self, start_key=None, end_key=None, limit=None, + filter_=None): """Read rows from this table. :type start_key: bytes @@ -261,26 +256,10 @@ def read_rows(self, start_key=None, end_key=None, The range will not include ``end_key``. If left empty, will be interpreted as an infinite string. - :type allow_row_interleaving: bool - :param allow_row_interleaving: (Optional) By default, rows are read - sequentially, producing results which - are guaranteed to arrive in increasing - row order. Setting - ``allow_row_interleaving`` to - :data:`True` allows multiple rows to be - interleaved in the response stream, - which increases throughput but breaks - this guarantee, and may force the - client to use more memory to buffer - partially-received rows. - :type limit: int :param limit: (Optional) The read will terminate after committing to N rows' worth of results. 
The default (zero) is to return - all results. Note that if ``allow_row_interleaving`` is - set to :data:`True`, partial results may be returned for - more than N rows. However, only N ``commit_row`` chunks - will be sent. + all results. :type filter_: :class:`.RowFilter` :param filter_: (Optional) The filter to apply to the contents of the @@ -293,11 +272,11 @@ def read_rows(self, start_key=None, end_key=None, """ request_pb = _create_row_request( self.name, start_key=start_key, end_key=end_key, filter_=filter_, - allow_row_interleaving=allow_row_interleaving, limit=limit) + limit=limit) client = self._cluster._client response_iterator = client._data_stub.ReadRows(request_pb, client.timeout_seconds) - # We expect an iterator of `data_messages_v1_pb2.ReadRowsResponse` + # We expect an iterator of `data_messages_v2_pb2.ReadRowsResponse` return PartialRowsData(response_iterator) def sample_row_keys(self): @@ -331,7 +310,7 @@ def sample_row_keys(self): or by casting to a :class:`list` and can be cancelled by calling ``cancel()``. """ - request_pb = data_messages_v1_pb2.SampleRowKeysRequest( + request_pb = data_messages_v2_pb2.SampleRowKeysRequest( table_name=self.name) client = self._cluster._client response_iterator = client._data_stub.SampleRowKeys( @@ -340,7 +319,7 @@ def sample_row_keys(self): def _create_row_request(table_name, row_key=None, start_key=None, end_key=None, - filter_=None, allow_row_interleaving=None, limit=None): + filter_=None, limit=None): """Creates a request to read rows in a table. :type table_name: str @@ -363,28 +342,12 @@ def _create_row_request(table_name, row_key=None, start_key=None, end_key=None, :param filter_: (Optional) The filter to apply to the contents of the specified row(s). If unset, reads the entire table. - :type allow_row_interleaving: bool - :param allow_row_interleaving: (Optional) By default, rows are read - sequentially, producing results which are - guaranteed to arrive in increasing row - order. Setting - ``allow_row_interleaving`` to - :data:`True` allows multiple rows to be - interleaved in the response stream, - which increases throughput but breaks - this guarantee, and may force the - client to use more memory to buffer - partially-received rows. - :type limit: int :param limit: (Optional) The read will terminate after committing to N rows' worth of results. The default (zero) is to return - all results. Note that if ``allow_row_interleaving`` is - set to :data:`True`, partial results may be returned for - more than N rows. However, only N ``commit_row`` chunks - will be sent. + all results. - :rtype: :class:`data_messages_v1_pb2.ReadRowsRequest` + :rtype: :class:`data_messages_v2_pb2.ReadRowsRequest` :returns: The ``ReadRowsRequest`` protobuf corresponding to the inputs. 
:raises: :class:`ValueError ` if both ``row_key`` and one of ``start_key`` and ``end_key`` are set @@ -394,21 +357,23 @@ def _create_row_request(table_name, row_key=None, start_key=None, end_key=None, (start_key is not None or end_key is not None)): raise ValueError('Row key and row range cannot be ' 'set simultaneously') - if row_key is not None: - request_kwargs['row_key'] = _to_bytes(row_key) + range_kwargs = {} if start_key is not None or end_key is not None: - range_kwargs = {} if start_key is not None: - range_kwargs['start_key'] = _to_bytes(start_key) + range_kwargs['start_key_closed'] = _to_bytes(start_key) if end_key is not None: - range_kwargs['end_key'] = _to_bytes(end_key) - row_range = data_v1_pb2.RowRange(**range_kwargs) - request_kwargs['row_range'] = row_range + range_kwargs['end_key_open'] = _to_bytes(end_key) if filter_ is not None: request_kwargs['filter'] = filter_.to_pb() - if allow_row_interleaving is not None: - request_kwargs['allow_row_interleaving'] = allow_row_interleaving if limit is not None: - request_kwargs['num_rows_limit'] = limit + request_kwargs['rows_limit'] = limit + + message = data_messages_v2_pb2.ReadRowsRequest(**request_kwargs) + + if row_key is not None: + message.rows.row_keys.append(_to_bytes(row_key)) + + if range_kwargs: + message.rows.row_ranges.add(**range_kwargs) - return data_messages_v1_pb2.ReadRowsRequest(**request_kwargs) + return message diff --git a/gcloud/bigtable/test_row.py b/gcloud/bigtable/test_row.py index e2336d7520f3..2cc7630758d2 100644 --- a/gcloud/bigtable/test_row.py +++ b/gcloud/bigtable/test_row.py @@ -75,9 +75,6 @@ def _set_cell_helper(self, column=None, column_bytes=None, timestamp_micros=-1): import six import struct - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) - row_key = b'row_key' column_family_id = u'column_family_id' if column is None: @@ -90,8 +87,8 @@ def _set_cell_helper(self, column=None, column_bytes=None, if isinstance(value, six.integer_types): value = struct.pack('>q', value) - expected_pb = data_v1_pb2.Mutation( - set_cell=data_v1_pb2.Mutation.SetCell( + expected_pb = _MutationPB( + set_cell=_MutationSetCellPB( family_name=column_family_id, column_qualifier=column_bytes or column, timestamp_micros=timestamp_micros, @@ -135,16 +132,13 @@ def test_set_cell_with_non_null_timestamp(self): timestamp_micros=millis_granularity) def test_delete(self): - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) - row_key = b'row_key' row = self._makeOne(row_key, object()) self.assertEqual(row._pb_mutations, []) row.delete() - expected_pb = data_v1_pb2.Mutation( - delete_from_row=data_v1_pb2.Mutation.DeleteFromRow(), + expected_pb = _MutationPB( + delete_from_row=_MutationDeleteFromRowPB(), ) self.assertEqual(row._pb_mutations, [expected_pb]) @@ -195,9 +189,6 @@ def test_delete_cells_non_iterable(self): row.delete_cells(column_family_id, columns) def test_delete_cells_all_columns(self): - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) - row_key = b'row_key' column_family_id = u'column_family_id' table = object() @@ -207,8 +198,8 @@ def test_delete_cells_all_columns(self): self.assertEqual(row._pb_mutations, []) row.delete_cells(column_family_id, klass.ALL_COLUMNS) - expected_pb = data_v1_pb2.Mutation( - delete_from_family=data_v1_pb2.Mutation.DeleteFromFamily( + expected_pb = _MutationPB( + delete_from_family=_MutationDeleteFromFamilyPB( family_name=column_family_id, ), ) @@ -226,9 +217,6 @@ def test_delete_cells_no_columns(self): 
self.assertEqual(row._pb_mutations, []) def _delete_cells_helper(self, time_range=None): - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) - row_key = b'row_key' column = b'column' column_family_id = u'column_family_id' @@ -239,8 +227,8 @@ def _delete_cells_helper(self, time_range=None): self.assertEqual(row._pb_mutations, []) row.delete_cells(column_family_id, columns, time_range=time_range) - expected_pb = data_v1_pb2.Mutation( - delete_from_column=data_v1_pb2.Mutation.DeleteFromColumn( + expected_pb = _MutationPB( + delete_from_column=_MutationDeleteFromColumnPB( family_name=column_family_id, column_qualifier=column, ), @@ -279,9 +267,6 @@ def test_delete_cells_with_bad_column(self): self.assertEqual(row._pb_mutations, []) def test_delete_cells_with_string_columns(self): - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) - row_key = b'row_key' column_family_id = u'column_family_id' column1 = u'column1' @@ -295,14 +280,14 @@ def test_delete_cells_with_string_columns(self): self.assertEqual(row._pb_mutations, []) row.delete_cells(column_family_id, columns) - expected_pb1 = data_v1_pb2.Mutation( - delete_from_column=data_v1_pb2.Mutation.DeleteFromColumn( + expected_pb1 = _MutationPB( + delete_from_column=_MutationDeleteFromColumnPB( family_name=column_family_id, column_qualifier=column1_bytes, ), ) - expected_pb2 = data_v1_pb2.Mutation( - delete_from_column=data_v1_pb2.Mutation.DeleteFromColumn( + expected_pb2 = _MutationPB( + delete_from_column=_MutationDeleteFromColumnPB( family_name=column_family_id, column_qualifier=column2_bytes, ), @@ -311,10 +296,6 @@ def test_delete_cells_with_string_columns(self): def test_commit(self): from google.protobuf import empty_pb2 - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) - from gcloud.bigtable._generated import ( - bigtable_service_messages_pb2 as messages_v1_pb2) from gcloud.bigtable._testing import _FakeStub row_key = b'row_key' @@ -328,15 +309,15 @@ def test_commit(self): # Create request_pb value = b'bytes-value' - mutation = data_v1_pb2.Mutation( - set_cell=data_v1_pb2.Mutation.SetCell( + mutation = _MutationPB( + set_cell=_MutationSetCellPB( family_name=column_family_id, column_qualifier=column, timestamp_micros=-1, # Default value. value=value, ), ) - request_pb = messages_v1_pb2.MutateRowRequest( + request_pb = _MutateRowRequestPB( table_name=table_name, row_key=row_key, mutations=[mutation], @@ -427,10 +408,6 @@ def test__get_mutations(self): self.assertTrue(false_mutations is row._get_mutations(None)) def test_commit(self): - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) - from gcloud.bigtable._generated import ( - bigtable_service_messages_pb2 as messages_v1_pb2) from gcloud.bigtable._testing import _FakeStub from gcloud.bigtable.row_filters import RowSampleFilter @@ -449,29 +426,29 @@ def test_commit(self): # Create request_pb value1 = b'bytes-value' - mutation1 = data_v1_pb2.Mutation( - set_cell=data_v1_pb2.Mutation.SetCell( + mutation1 = _MutationPB( + set_cell=_MutationSetCellPB( family_name=column_family_id1, column_qualifier=column1, timestamp_micros=-1, # Default value. 
value=value1, ), ) - mutation2 = data_v1_pb2.Mutation( - delete_from_row=data_v1_pb2.Mutation.DeleteFromRow(), + mutation2 = _MutationPB( + delete_from_row=_MutationDeleteFromRowPB(), ) - mutation3 = data_v1_pb2.Mutation( - delete_from_column=data_v1_pb2.Mutation.DeleteFromColumn( + mutation3 = _MutationPB( + delete_from_column=_MutationDeleteFromColumnPB( family_name=column_family_id2, column_qualifier=column2, ), ) - mutation4 = data_v1_pb2.Mutation( - delete_from_family=data_v1_pb2.Mutation.DeleteFromFamily( + mutation4 = _MutationPB( + delete_from_family=_MutationDeleteFromFamilyPB( family_name=column_family_id3, ), ) - request_pb = messages_v1_pb2.CheckAndMutateRowRequest( + request_pb = _CheckAndMutateRowRequestPB( table_name=table_name, row_key=row_key, predicate_filter=row_filter.to_pb(), @@ -481,7 +458,7 @@ def test_commit(self): # Create response_pb predicate_matched = True - response_pb = messages_v1_pb2.CheckAndMutateRowResponse( + response_pb = _CheckAndMutateRowResponsePB( predicate_matched=predicate_matched) # Patch the stub used by the API method. @@ -567,9 +544,6 @@ def test_clear(self): self.assertEqual(row._rule_pb_list, []) def test_append_cell_value(self): - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) - table = object() row_key = b'row_key' row = self._makeOne(row_key, table) @@ -579,15 +553,12 @@ def test_append_cell_value(self): column_family_id = u'column_family_id' value = b'bytes-val' row.append_cell_value(column_family_id, column, value) - expected_pb = data_v1_pb2.ReadModifyWriteRule( + expected_pb = _ReadModifyWriteRulePB( family_name=column_family_id, column_qualifier=column, append_value=value) self.assertEqual(row._rule_pb_list, [expected_pb]) def test_increment_cell_value(self): - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) - table = object() row_key = b'row_key' row = self._makeOne(row_key, table) @@ -597,17 +568,13 @@ def test_increment_cell_value(self): column_family_id = u'column_family_id' int_value = 281330 row.increment_cell_value(column_family_id, column, int_value) - expected_pb = data_v1_pb2.ReadModifyWriteRule( + expected_pb = _ReadModifyWriteRulePB( family_name=column_family_id, column_qualifier=column, increment_amount=int_value) self.assertEqual(row._rule_pb_list, [expected_pb]) def test_commit(self): from gcloud._testing import _Monkey - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) - from gcloud.bigtable._generated import ( - bigtable_service_messages_pb2 as messages_v1_pb2) from gcloud.bigtable._testing import _FakeStub from gcloud.bigtable import row as MUT @@ -623,11 +590,11 @@ def test_commit(self): # Create request_pb value = b'bytes-value' # We will call row.append_cell_value(COLUMN_FAMILY_ID, COLUMN, value). 
- request_pb = messages_v1_pb2.ReadModifyWriteRowRequest( + request_pb = _ReadModifyWriteRowRequestPB( table_name=table_name, row_key=row_key, rules=[ - data_v1_pb2.ReadModifyWriteRule( + _ReadModifyWriteRulePB( family_name=column_family_id, column_qualifier=column, append_value=value, @@ -703,9 +670,6 @@ def _callFUT(self, row_response): def test_it(self): from gcloud._helpers import _datetime_from_microseconds - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) - col_fam1 = u'col-fam-id' col_fam2 = u'col-fam-id2' col_name1 = b'col-name1' @@ -734,28 +698,28 @@ def test_it(self): ], }, } - sample_input = data_v1_pb2.Row( + sample_input = _RowPB( families=[ - data_v1_pb2.Family( + _FamilyPB( name=col_fam1, columns=[ - data_v1_pb2.Column( + _ColumnPB( qualifier=col_name1, cells=[ - data_v1_pb2.Cell( + _CellPB( value=cell_val1, timestamp_micros=microseconds, ), - data_v1_pb2.Cell( + _CellPB( value=cell_val2, timestamp_micros=microseconds, ), ], ), - data_v1_pb2.Column( + _ColumnPB( qualifier=col_name2, cells=[ - data_v1_pb2.Cell( + _CellPB( value=cell_val3, timestamp_micros=microseconds, ), @@ -763,13 +727,13 @@ def test_it(self): ), ], ), - data_v1_pb2.Family( + _FamilyPB( name=col_fam2, columns=[ - data_v1_pb2.Column( + _ColumnPB( qualifier=col_name3, cells=[ - data_v1_pb2.Cell( + _CellPB( value=cell_val4, timestamp_micros=microseconds, ), @@ -790,9 +754,6 @@ def _callFUT(self, family_pb): def test_it(self): from gcloud._helpers import _datetime_from_microseconds - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) - col_fam1 = u'col-fam-id' col_name1 = b'col-name1' col_name2 = b'col-name2' @@ -812,26 +773,26 @@ def test_it(self): ], } expected_output = (col_fam1, expected_dict) - sample_input = data_v1_pb2.Family( + sample_input = _FamilyPB( name=col_fam1, columns=[ - data_v1_pb2.Column( + _ColumnPB( qualifier=col_name1, cells=[ - data_v1_pb2.Cell( + _CellPB( value=cell_val1, timestamp_micros=microseconds, ), - data_v1_pb2.Cell( + _CellPB( value=cell_val2, timestamp_micros=microseconds, ), ], ), - data_v1_pb2.Column( + _ColumnPB( qualifier=col_name2, cells=[ - data_v1_pb2.Cell( + _CellPB( value=cell_val3, timestamp_micros=microseconds, ), @@ -842,6 +803,90 @@ def test_it(self): self.assertEqual(expected_output, self._callFUT(sample_input)) +def _CheckAndMutateRowRequestPB(*args, **kw): + from gcloud.bigtable._generated_v2 import ( + bigtable_pb2 as messages_v2_pb2) + return messages_v2_pb2.CheckAndMutateRowRequest(*args, **kw) + + +def _CheckAndMutateRowResponsePB(*args, **kw): + from gcloud.bigtable._generated_v2 import ( + bigtable_pb2 as messages_v2_pb2) + return messages_v2_pb2.CheckAndMutateRowResponse(*args, **kw) + + +def _MutateRowRequestPB(*args, **kw): + from gcloud.bigtable._generated_v2 import ( + bigtable_pb2 as messages_v2_pb2) + return messages_v2_pb2.MutateRowRequest(*args, **kw) + + +def _ReadModifyWriteRowRequestPB(*args, **kw): + from gcloud.bigtable._generated_v2 import ( + bigtable_pb2 as messages_v2_pb2) + return messages_v2_pb2.ReadModifyWriteRowRequest(*args, **kw) + + +def _CellPB(*args, **kw): + from gcloud.bigtable._generated_v2 import ( + data_pb2 as data_v2_pb2) + return data_v2_pb2.Cell(*args, **kw) + + +def _ColumnPB(*args, **kw): + from gcloud.bigtable._generated_v2 import ( + data_pb2 as data_v2_pb2) + return data_v2_pb2.Column(*args, **kw) + + +def _FamilyPB(*args, **kw): + from gcloud.bigtable._generated_v2 import ( + data_pb2 as data_v2_pb2) + return data_v2_pb2.Family(*args, **kw) + + +def 
_MutationPB(*args, **kw): + from gcloud.bigtable._generated_v2 import ( + data_pb2 as data_v2_pb2) + return data_v2_pb2.Mutation(*args, **kw) + + +def _MutationSetCellPB(*args, **kw): + from gcloud.bigtable._generated_v2 import ( + data_pb2 as data_v2_pb2) + return data_v2_pb2.Mutation.SetCell(*args, **kw) + + +def _MutationDeleteFromColumnPB(*args, **kw): + from gcloud.bigtable._generated_v2 import ( + data_pb2 as data_v2_pb2) + return data_v2_pb2.Mutation.DeleteFromColumn(*args, **kw) + + +def _MutationDeleteFromFamilyPB(*args, **kw): + from gcloud.bigtable._generated_v2 import ( + data_pb2 as data_v2_pb2) + return data_v2_pb2.Mutation.DeleteFromFamily(*args, **kw) + + +def _MutationDeleteFromRowPB(*args, **kw): + from gcloud.bigtable._generated_v2 import ( + data_pb2 as data_v2_pb2) + return data_v2_pb2.Mutation.DeleteFromRow(*args, **kw) + + +def _RowPB(*args, **kw): + from gcloud.bigtable._generated_v2 import ( + data_pb2 as data_v2_pb2) + return data_v2_pb2.Row(*args, **kw) + + +def _ReadModifyWriteRulePB(*args, **kw): + from gcloud.bigtable._generated_v2 import ( + data_pb2 as data_v2_pb2) + return data_v2_pb2.ReadModifyWriteRule(*args, **kw) + + class _Client(object): data_stub = None diff --git a/gcloud/bigtable/test_row_data.py b/gcloud/bigtable/test_row_data.py index 2c3c9ba260f5..6fae4d18c40b 100644 --- a/gcloud/bigtable/test_row_data.py +++ b/gcloud/bigtable/test_row_data.py @@ -105,8 +105,6 @@ def test_constructor(self): partial_row_data = self._makeOne(row_key) self.assertTrue(partial_row_data._row_key is row_key) self.assertEqual(partial_row_data._cells, {}) - self.assertFalse(partial_row_data._committed) - self.assertFalse(partial_row_data._chunks_encountered) def test___eq__(self): row_key = object() @@ -133,13 +131,6 @@ def test___ne__(self): partial_row_data2 = self._makeOne(row_key2) self.assertNotEqual(partial_row_data1, partial_row_data2) - def test___ne__committed(self): - row_key = object() - partial_row_data1 = self._makeOne(row_key) - partial_row_data1._committed = object() - partial_row_data2 = self._makeOne(row_key) - self.assertNotEqual(partial_row_data1, partial_row_data2) - def test___ne__cells(self): row_key = object() partial_row_data1 = self._makeOne(row_key) @@ -190,202 +181,6 @@ def test_row_key_getter(self): partial_row_data = self._makeOne(row_key) self.assertTrue(partial_row_data.row_key is row_key) - def test_committed_getter(self): - partial_row_data = self._makeOne(None) - partial_row_data._committed = value = object() - self.assertTrue(partial_row_data.committed is value) - - def test_clear(self): - partial_row_data = self._makeOne(None) - cells = {1: 2} - partial_row_data._cells = cells - self.assertEqual(partial_row_data.cells, cells) - partial_row_data._committed = True - partial_row_data._chunks_encountered = True - partial_row_data.clear() - self.assertFalse(partial_row_data.committed) - self.assertFalse(partial_row_data._chunks_encountered) - self.assertEqual(partial_row_data.cells, {}) - - def test__handle_commit_row(self): - from gcloud.bigtable._generated import ( - bigtable_service_messages_pb2 as messages_v1_pb2) - - partial_row_data = self._makeOne(None) - chunk = messages_v1_pb2.ReadRowsResponse.Chunk(commit_row=True) - - index = last_chunk_index = 1 - self.assertFalse(partial_row_data.committed) - partial_row_data._handle_commit_row(chunk, index, last_chunk_index) - self.assertTrue(partial_row_data.committed) - - def test__handle_commit_row_false(self): - from gcloud.bigtable._generated import ( - bigtable_service_messages_pb2 as 
messages_v1_pb2) - - partial_row_data = self._makeOne(None) - chunk = messages_v1_pb2.ReadRowsResponse.Chunk(commit_row=False) - - with self.assertRaises(ValueError): - partial_row_data._handle_commit_row(chunk, None, None) - - def test__handle_commit_row_not_last_chunk(self): - from gcloud.bigtable._generated import ( - bigtable_service_messages_pb2 as messages_v1_pb2) - - partial_row_data = self._makeOne(None) - chunk = messages_v1_pb2.ReadRowsResponse.Chunk(commit_row=True) - - with self.assertRaises(ValueError): - index = 0 - last_chunk_index = 1 - self.assertNotEqual(index, last_chunk_index) - partial_row_data._handle_commit_row( - chunk, index, last_chunk_index) - - def test__handle_reset_row(self): - from gcloud.bigtable._generated import ( - bigtable_service_messages_pb2 as messages_v1_pb2) - - partial_row_data = self._makeOne(None) - chunk = messages_v1_pb2.ReadRowsResponse.Chunk(reset_row=True) - - # Modify the PartialRowData object so we can check it's been cleared. - partial_row_data._cells = {1: 2} - partial_row_data._committed = True - partial_row_data._handle_reset_row(chunk) - self.assertEqual(partial_row_data.cells, {}) - self.assertFalse(partial_row_data.committed) - - def test__handle_reset_row_failure(self): - from gcloud.bigtable._generated import ( - bigtable_service_messages_pb2 as messages_v1_pb2) - - partial_row_data = self._makeOne(None) - chunk = messages_v1_pb2.ReadRowsResponse.Chunk(reset_row=False) - - with self.assertRaises(ValueError): - partial_row_data._handle_reset_row(chunk) - - def test__handle_row_contents(self): - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) - from gcloud.bigtable._generated import ( - bigtable_service_messages_pb2 as messages_v1_pb2) - from gcloud.bigtable.row_data import Cell - - partial_row_data = self._makeOne(None) - cell1_pb = data_v1_pb2.Cell(timestamp_micros=1, value=b'val1') - cell2_pb = data_v1_pb2.Cell(timestamp_micros=200, value=b'val2') - cell3_pb = data_v1_pb2.Cell(timestamp_micros=300000, value=b'val3') - col1 = b'col1' - col2 = b'col2' - columns = [ - data_v1_pb2.Column(qualifier=col1, cells=[cell1_pb, cell2_pb]), - data_v1_pb2.Column(qualifier=col2, cells=[cell3_pb]), - ] - family_name = u'name' - row_contents = data_v1_pb2.Family(name=family_name, columns=columns) - chunk = messages_v1_pb2.ReadRowsResponse.Chunk( - row_contents=row_contents) - - self.assertEqual(partial_row_data.cells, {}) - partial_row_data._handle_row_contents(chunk) - expected_cells = { - family_name: { - col1: [Cell.from_pb(cell1_pb), Cell.from_pb(cell2_pb)], - col2: [Cell.from_pb(cell3_pb)], - } - } - self.assertEqual(partial_row_data.cells, expected_cells) - - def test_update_from_read_rows(self): - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) - from gcloud.bigtable._generated import ( - bigtable_service_messages_pb2 as messages_v1_pb2) - - row_key = b'row-key' - partial_row_data = self._makeOne(row_key) - - # Set-up chunk1, some data that will be reset by chunk2. - ignored_family_name = u'ignore-name' - row_contents = data_v1_pb2.Family(name=ignored_family_name) - chunk1 = messages_v1_pb2.ReadRowsResponse.Chunk( - row_contents=row_contents) - - # Set-up chunk2, a reset row. - chunk2 = messages_v1_pb2.ReadRowsResponse.Chunk(reset_row=True) - - # Set-up chunk3, a column family with no columns. - family_name = u'name' - row_contents = data_v1_pb2.Family(name=family_name) - chunk3 = messages_v1_pb2.ReadRowsResponse.Chunk( - row_contents=row_contents) - - # Set-up chunk4, a commit row. 
- chunk4 = messages_v1_pb2.ReadRowsResponse.Chunk(commit_row=True) - - # Prepare request and make sure PartialRowData is empty before. - read_rows_response_pb = messages_v1_pb2.ReadRowsResponse( - row_key=row_key, chunks=[chunk1, chunk2, chunk3, chunk4]) - self.assertEqual(partial_row_data.cells, {}) - self.assertFalse(partial_row_data.committed) - self.assertFalse(partial_row_data._chunks_encountered) - - # Parse the response and make sure the cells took place. - partial_row_data.update_from_read_rows(read_rows_response_pb) - self.assertEqual(partial_row_data.cells, {family_name: {}}) - self.assertFalse(ignored_family_name in partial_row_data.cells) - self.assertTrue(partial_row_data.committed) - self.assertTrue(partial_row_data._chunks_encountered) - - def test_update_from_read_rows_while_committed(self): - partial_row_data = self._makeOne(None) - partial_row_data._committed = True - self.assertFalse(partial_row_data._chunks_encountered) - - with self.assertRaises(ValueError): - partial_row_data.update_from_read_rows(None) - - self.assertFalse(partial_row_data._chunks_encountered) - - def test_update_from_read_rows_row_key_disagree(self): - from gcloud.bigtable._generated import ( - bigtable_service_messages_pb2 as messages_v1_pb2) - - row_key1 = b'row-key1' - row_key2 = b'row-key2' - partial_row_data = self._makeOne(row_key1) - self.assertFalse(partial_row_data._chunks_encountered) - - self.assertNotEqual(row_key1, row_key2) - read_rows_response_pb = messages_v1_pb2.ReadRowsResponse( - row_key=row_key2) - with self.assertRaises(ValueError): - partial_row_data.update_from_read_rows(read_rows_response_pb) - - self.assertFalse(partial_row_data._chunks_encountered) - - def test_update_from_read_rows_empty_chunk(self): - from gcloud.bigtable._generated import ( - bigtable_service_messages_pb2 as messages_v1_pb2) - - row_key = b'row-key' - partial_row_data = self._makeOne(row_key) - self.assertFalse(partial_row_data._chunks_encountered) - - chunk = messages_v1_pb2.ReadRowsResponse.Chunk() - read_rows_response_pb = messages_v1_pb2.ReadRowsResponse( - row_key=row_key, chunks=[chunk]) - - # This makes it an "empty" chunk. 
- self.assertEqual(chunk.WhichOneof('chunk'), None) - with self.assertRaises(ValueError): - partial_row_data.update_from_read_rows(read_rows_response_pb) - - self.assertFalse(partial_row_data._chunks_encountered) - class TestPartialRowsData(unittest2.TestCase): @@ -444,6 +239,16 @@ def test___ne__(self): partial_rows_data2 = self._makeOne(response_iterator2) self.assertNotEqual(partial_rows_data1, partial_rows_data2) + def test_state_start(self): + prd = self._makeOne([]) + self.assertEqual(prd.state, prd.START) + + def test_state_new_row_w_row(self): + prd = self._makeOne([]) + prd._last_scanned_row_key = '' + prd._row = object() + self.assertEqual(prd.state, prd.NEW_ROW) + def test_rows_getter(self): partial_rows_data = self._makeOne(None) partial_rows_data._rows = value = object() @@ -456,43 +261,7 @@ def test_cancel(self): partial_rows_data.cancel() self.assertEqual(response_iterator.cancel_calls, 1) - def test_consume_next(self): - from gcloud.bigtable._generated import ( - bigtable_service_messages_pb2 as messages_v1_pb2) - from gcloud.bigtable.row_data import PartialRowData - - row_key = b'row-key' - value_pb = messages_v1_pb2.ReadRowsResponse(row_key=row_key) - response_iterator = _MockCancellableIterator(value_pb) - partial_rows_data = self._makeOne(response_iterator) - self.assertEqual(partial_rows_data.rows, {}) - partial_rows_data.consume_next() - expected_rows = {row_key: PartialRowData(row_key)} - self.assertEqual(partial_rows_data.rows, expected_rows) - - def test_consume_next_row_exists(self): - from gcloud.bigtable._generated import ( - bigtable_service_messages_pb2 as messages_v1_pb2) - from gcloud.bigtable.row_data import PartialRowData - - row_key = b'row-key' - chunk = messages_v1_pb2.ReadRowsResponse.Chunk(commit_row=True) - value_pb = messages_v1_pb2.ReadRowsResponse( - row_key=row_key, chunks=[chunk]) - response_iterator = _MockCancellableIterator(value_pb) - partial_rows_data = self._makeOne(response_iterator) - existing_values = PartialRowData(row_key) - partial_rows_data._rows[row_key] = existing_values - self.assertFalse(existing_values.committed) - partial_rows_data.consume_next() - self.assertTrue(existing_values.committed) - self.assertEqual(existing_values.cells, {}) - - def test_consume_next_empty_iter(self): - response_iterator = _MockCancellableIterator() - partial_rows_data = self._makeOne(response_iterator) - with self.assertRaises(StopIteration): - partial_rows_data.consume_next() + # 'consume_nest' tested via 'TestPartialRowsData_JSON_acceptance_tests' def test_consume_all(self): klass = self._getDoNothingClass() @@ -518,41 +287,6 @@ def test_consume_all_with_max_loops(self): self.assertEqual( list(response_iterator.iter_values), [value2, value3]) - -class TestPartialRowsDataV2(unittest2.TestCase): - - _json_tests = None - - def _getTargetClass(self): - from gcloud.bigtable.row_data import PartialRowsDataV2 - return PartialRowsDataV2 - - def _makeOne(self, *args, **kwargs): - return self._getTargetClass()(*args, **kwargs) - - def _load_json_test(self, test_name): - import os - if self.__class__._json_tests is None: - dirname = os.path.dirname(__file__) - filename = os.path.join(dirname, 'read-rows-acceptance-test.json') - raw = _parse_readrows_acceptance_tests(filename) - tests = self.__class__._json_tests = {} - for (name, chunks, results) in raw: - tests[name] = chunks, results - return self.__class__._json_tests[test_name] - - # Not part of the JSON acceptance tests. 
- - def test_state_start(self): - prd = self._makeOne([]) - self.assertEqual(prd.state, prd.START) - - def test_state_new_row_w_row(self): - prd = self._makeOne([]) - prd._last_scanned_row_key = '' - prd._row = object() - self.assertEqual(prd.state, prd.NEW_ROW) - def test__copy_from_current_unset(self): prd = self._makeOne([]) chunks = _generate_cell_chunks(['']) @@ -571,7 +305,7 @@ def test__copy_from_current_blank(self): TIMESTAMP_MICROS = 100 LABELS = ['L1', 'L2'] prd = self._makeOne([]) - prd._cell = _PartialCellV2() + prd._cell = _PartialCellData() chunks = _generate_cell_chunks(['']) chunk = chunks[0] chunk.row_key = ROW_KEY @@ -588,7 +322,7 @@ def test__copy_from_current_blank(self): def test__copy_from_previous_unset(self): prd = self._makeOne([]) - cell = _PartialCellV2() + cell = _PartialCellData() prd._copy_from_previous(cell) self.assertEqual(cell.row_key, '') self.assertEqual(cell.family_name, u'') @@ -603,14 +337,14 @@ def test__copy_from_previous_blank(self): TIMESTAMP_MICROS = 100 LABELS = ['L1', 'L2'] prd = self._makeOne([]) - cell = _PartialCellV2( + cell = _PartialCellData( row_key=ROW_KEY, family_name=FAMILY_NAME, qualifier=QUALIFIER, timestamp_micros=TIMESTAMP_MICROS, labels=LABELS, ) - prd._previous_cell = _PartialCellV2() + prd._previous_cell = _PartialCellData() prd._copy_from_previous(cell) self.assertEqual(cell.row_key, ROW_KEY) self.assertEqual(cell.family_name, FAMILY_NAME) @@ -625,14 +359,14 @@ def test__copy_from_previous_filled(self): TIMESTAMP_MICROS = 100 LABELS = ['L1', 'L2'] prd = self._makeOne([]) - prd._previous_cell = _PartialCellV2( + prd._previous_cell = _PartialCellData( row_key=ROW_KEY, family_name=FAMILY_NAME, qualifier=QUALIFIER, timestamp_micros=TIMESTAMP_MICROS, labels=LABELS, ) - cell = _PartialCellV2() + cell = _PartialCellData() prd._copy_from_previous(cell) self.assertEqual(cell.row_key, ROW_KEY) self.assertEqual(cell.family_name, FAMILY_NAME) @@ -687,6 +421,29 @@ def test_invalid_empty_second_chunk(self): with self.assertRaises(InvalidChunk): prd.consume_next() + +class TestPartialRowsData_JSON_acceptance_tests(unittest2.TestCase): + + _json_tests = None + + def _getTargetClass(self): + from gcloud.bigtable.row_data import PartialRowsData + return PartialRowsData + + def _makeOne(self, *args, **kwargs): + return self._getTargetClass()(*args, **kwargs) + + def _load_json_test(self, test_name): + import os + if self.__class__._json_tests is None: + dirname = os.path.dirname(__file__) + filename = os.path.join(dirname, 'read-rows-acceptance-test.json') + raw = _parse_readrows_acceptance_tests(filename) + tests = self.__class__._json_tests = {} + for (name, chunks, results) in raw: + tests[name] = chunks, results + return self.__class__._json_tests[test_name] + # JSON Error cases: invalid chunks def _fail_during_consume(self, testcase_name): @@ -910,6 +667,9 @@ def cancel(self): def next(self): return next(self.iter_values) + def __next__(self): # pragma: NO COVER Py3k + return self.next() + class _Dummy(object): @@ -917,7 +677,7 @@ def __init__(self, **kw): self.__dict__.update(kw) -class _PartialCellV2(object): +class _PartialCellData(object): row_key = '' family_name = u'' diff --git a/gcloud/bigtable/test_row_filters.py b/gcloud/bigtable/test_row_filters.py index 768ffb79bd32..594a4fe47c2b 100644 --- a/gcloud/bigtable/test_row_filters.py +++ b/gcloud/bigtable/test_row_filters.py @@ -60,13 +60,10 @@ def _makeOne(self, *args, **kwargs): return self._getTargetClass()(*args, **kwargs) def test_to_pb(self): - from gcloud.bigtable._generated 
import ( - bigtable_data_pb2 as data_v1_pb2) - flag = True row_filter = self._makeOne(flag) pb_val = row_filter.to_pb() - expected_pb = data_v1_pb2.RowFilter(sink=flag) + expected_pb = _RowFilterPB(sink=flag) self.assertEqual(pb_val, expected_pb) @@ -80,13 +77,10 @@ def _makeOne(self, *args, **kwargs): return self._getTargetClass()(*args, **kwargs) def test_to_pb(self): - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) - flag = True row_filter = self._makeOne(flag) pb_val = row_filter.to_pb() - expected_pb = data_v1_pb2.RowFilter(pass_all_filter=flag) + expected_pb = _RowFilterPB(pass_all_filter=flag) self.assertEqual(pb_val, expected_pb) @@ -100,13 +94,10 @@ def _makeOne(self, *args, **kwargs): return self._getTargetClass()(*args, **kwargs) def test_to_pb(self): - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) - flag = True row_filter = self._makeOne(flag) pb_val = row_filter.to_pb() - expected_pb = data_v1_pb2.RowFilter(block_all_filter=flag) + expected_pb = _RowFilterPB(block_all_filter=flag) self.assertEqual(pb_val, expected_pb) @@ -159,13 +150,10 @@ def _makeOne(self, *args, **kwargs): return self._getTargetClass()(*args, **kwargs) def test_to_pb(self): - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) - regex = b'row-key-regex' row_filter = self._makeOne(regex) pb_val = row_filter.to_pb() - expected_pb = data_v1_pb2.RowFilter(row_key_regex_filter=regex) + expected_pb = _RowFilterPB(row_key_regex_filter=regex) self.assertEqual(pb_val, expected_pb) @@ -196,13 +184,10 @@ def test___eq__same_value(self): self.assertEqual(row_filter1, row_filter2) def test_to_pb(self): - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) - sample = 0.25 row_filter = self._makeOne(sample) pb_val = row_filter.to_pb() - expected_pb = data_v1_pb2.RowFilter(row_sample_filter=sample) + expected_pb = _RowFilterPB(row_sample_filter=sample) self.assertEqual(pb_val, expected_pb) @@ -216,13 +201,10 @@ def _makeOne(self, *args, **kwargs): return self._getTargetClass()(*args, **kwargs) def test_to_pb(self): - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) - regex = u'family-regex' row_filter = self._makeOne(regex) pb_val = row_filter.to_pb() - expected_pb = data_v1_pb2.RowFilter(family_name_regex_filter=regex) + expected_pb = _RowFilterPB(family_name_regex_filter=regex) self.assertEqual(pb_val, expected_pb) @@ -236,13 +218,10 @@ def _makeOne(self, *args, **kwargs): return self._getTargetClass()(*args, **kwargs) def test_to_pb(self): - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) - regex = b'column-regex' row_filter = self._makeOne(regex) pb_val = row_filter.to_pb() - expected_pb = data_v1_pb2.RowFilter( + expected_pb = _RowFilterPB( column_qualifier_regex_filter=regex) self.assertEqual(pb_val, expected_pb) @@ -288,9 +267,6 @@ def test___ne__same_value(self): def _to_pb_helper(self, start_micros=None, end_micros=None): import datetime from gcloud._helpers import _EPOCH - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) - pb_kwargs = {} start = None @@ -303,7 +279,7 @@ def _to_pb_helper(self, start_micros=None, end_micros=None): pb_kwargs['end_timestamp_micros'] = end_micros time_range = self._makeOne(start=start, end=end) - expected_pb = data_v1_pb2.TimestampRange(**pb_kwargs) + expected_pb = _TimestampRangePB(**pb_kwargs) self.assertEqual(time_range.to_pb(), expected_pb) def test_to_pb(self): @@ -351,15 +327,13 @@ def 
test___eq__same_value(self): self.assertEqual(row_filter1, row_filter2) def test_to_pb(self): - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) from gcloud.bigtable.row_filters import TimestampRange range_ = TimestampRange() row_filter = self._makeOne(range_) pb_val = row_filter.to_pb() - expected_pb = data_v1_pb2.RowFilter( - timestamp_range_filter=data_v1_pb2.TimestampRange()) + expected_pb = _RowFilterPB( + timestamp_range_filter=_TimestampRangePB()) self.assertEqual(pb_val, expected_pb) @@ -434,71 +408,56 @@ def test___eq__type_differ(self): self.assertNotEqual(row_filter1, row_filter2) def test_to_pb(self): - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) - column_family_id = u'column-family-id' row_filter = self._makeOne(column_family_id) - col_range_pb = data_v1_pb2.ColumnRange(family_name=column_family_id) - expected_pb = data_v1_pb2.RowFilter(column_range_filter=col_range_pb) + col_range_pb = _ColumnRangePB(family_name=column_family_id) + expected_pb = _RowFilterPB(column_range_filter=col_range_pb) self.assertEqual(row_filter.to_pb(), expected_pb) def test_to_pb_inclusive_start(self): - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) - column_family_id = u'column-family-id' column = b'column' row_filter = self._makeOne(column_family_id, start_column=column) - col_range_pb = data_v1_pb2.ColumnRange( + col_range_pb = _ColumnRangePB( family_name=column_family_id, - start_qualifier_inclusive=column, + start_qualifier_closed=column, ) - expected_pb = data_v1_pb2.RowFilter(column_range_filter=col_range_pb) + expected_pb = _RowFilterPB(column_range_filter=col_range_pb) self.assertEqual(row_filter.to_pb(), expected_pb) def test_to_pb_exclusive_start(self): - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) - column_family_id = u'column-family-id' column = b'column' row_filter = self._makeOne(column_family_id, start_column=column, inclusive_start=False) - col_range_pb = data_v1_pb2.ColumnRange( + col_range_pb = _ColumnRangePB( family_name=column_family_id, - start_qualifier_exclusive=column, + start_qualifier_open=column, ) - expected_pb = data_v1_pb2.RowFilter(column_range_filter=col_range_pb) + expected_pb = _RowFilterPB(column_range_filter=col_range_pb) self.assertEqual(row_filter.to_pb(), expected_pb) def test_to_pb_inclusive_end(self): - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) - column_family_id = u'column-family-id' column = b'column' row_filter = self._makeOne(column_family_id, end_column=column) - col_range_pb = data_v1_pb2.ColumnRange( + col_range_pb = _ColumnRangePB( family_name=column_family_id, - end_qualifier_inclusive=column, + end_qualifier_closed=column, ) - expected_pb = data_v1_pb2.RowFilter(column_range_filter=col_range_pb) + expected_pb = _RowFilterPB(column_range_filter=col_range_pb) self.assertEqual(row_filter.to_pb(), expected_pb) def test_to_pb_exclusive_end(self): - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) - column_family_id = u'column-family-id' column = b'column' row_filter = self._makeOne(column_family_id, end_column=column, inclusive_end=False) - col_range_pb = data_v1_pb2.ColumnRange( + col_range_pb = _ColumnRangePB( family_name=column_family_id, - end_qualifier_exclusive=column, + end_qualifier_open=column, ) - expected_pb = data_v1_pb2.RowFilter(column_range_filter=col_range_pb) + expected_pb = _RowFilterPB(column_range_filter=col_range_pb) 
self.assertEqual(row_filter.to_pb(), expected_pb) @@ -512,13 +471,10 @@ def _makeOne(self, *args, **kwargs): return self._getTargetClass()(*args, **kwargs) def test_to_pb(self): - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) - regex = b'value-regex' row_filter = self._makeOne(regex) pb_val = row_filter.to_pb() - expected_pb = data_v1_pb2.RowFilter(value_regex_filter=regex) + expected_pb = _RowFilterPB(value_regex_filter=regex) self.assertEqual(pb_val, expected_pb) @@ -579,52 +535,37 @@ def test___eq__type_differ(self): self.assertNotEqual(row_filter1, row_filter2) def test_to_pb(self): - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) - row_filter = self._makeOne() - expected_pb = data_v1_pb2.RowFilter( - value_range_filter=data_v1_pb2.ValueRange()) + expected_pb = _RowFilterPB( + value_range_filter=_ValueRangePB()) self.assertEqual(row_filter.to_pb(), expected_pb) def test_to_pb_inclusive_start(self): - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) - value = b'some-value' row_filter = self._makeOne(start_value=value) - val_range_pb = data_v1_pb2.ValueRange(start_value_inclusive=value) - expected_pb = data_v1_pb2.RowFilter(value_range_filter=val_range_pb) + val_range_pb = _ValueRangePB(start_value_closed=value) + expected_pb = _RowFilterPB(value_range_filter=val_range_pb) self.assertEqual(row_filter.to_pb(), expected_pb) def test_to_pb_exclusive_start(self): - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) - value = b'some-value' row_filter = self._makeOne(start_value=value, inclusive_start=False) - val_range_pb = data_v1_pb2.ValueRange(start_value_exclusive=value) - expected_pb = data_v1_pb2.RowFilter(value_range_filter=val_range_pb) + val_range_pb = _ValueRangePB(start_value_open=value) + expected_pb = _RowFilterPB(value_range_filter=val_range_pb) self.assertEqual(row_filter.to_pb(), expected_pb) def test_to_pb_inclusive_end(self): - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) - value = b'some-value' row_filter = self._makeOne(end_value=value) - val_range_pb = data_v1_pb2.ValueRange(end_value_inclusive=value) - expected_pb = data_v1_pb2.RowFilter(value_range_filter=val_range_pb) + val_range_pb = _ValueRangePB(end_value_closed=value) + expected_pb = _RowFilterPB(value_range_filter=val_range_pb) self.assertEqual(row_filter.to_pb(), expected_pb) def test_to_pb_exclusive_end(self): - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) - value = b'some-value' row_filter = self._makeOne(end_value=value, inclusive_end=False) - val_range_pb = data_v1_pb2.ValueRange(end_value_exclusive=value) - expected_pb = data_v1_pb2.RowFilter(value_range_filter=val_range_pb) + val_range_pb = _ValueRangePB(end_value_open=value) + expected_pb = _RowFilterPB(value_range_filter=val_range_pb) self.assertEqual(row_filter.to_pb(), expected_pb) @@ -672,13 +613,10 @@ def _makeOne(self, *args, **kwargs): return self._getTargetClass()(*args, **kwargs) def test_to_pb(self): - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) - num_cells = 76 row_filter = self._makeOne(num_cells) pb_val = row_filter.to_pb() - expected_pb = data_v1_pb2.RowFilter( + expected_pb = _RowFilterPB( cells_per_row_offset_filter=num_cells) self.assertEqual(pb_val, expected_pb) @@ -693,13 +631,10 @@ def _makeOne(self, *args, **kwargs): return self._getTargetClass()(*args, **kwargs) def test_to_pb(self): - from gcloud.bigtable._generated import ( 
- bigtable_data_pb2 as data_v1_pb2) - num_cells = 189 row_filter = self._makeOne(num_cells) pb_val = row_filter.to_pb() - expected_pb = data_v1_pb2.RowFilter( + expected_pb = _RowFilterPB( cells_per_row_limit_filter=num_cells) self.assertEqual(pb_val, expected_pb) @@ -714,13 +649,10 @@ def _makeOne(self, *args, **kwargs): return self._getTargetClass()(*args, **kwargs) def test_to_pb(self): - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) - num_cells = 10 row_filter = self._makeOne(num_cells) pb_val = row_filter.to_pb() - expected_pb = data_v1_pb2.RowFilter( + expected_pb = _RowFilterPB( cells_per_column_limit_filter=num_cells) self.assertEqual(pb_val, expected_pb) @@ -735,13 +667,10 @@ def _makeOne(self, *args, **kwargs): return self._getTargetClass()(*args, **kwargs) def test_to_pb(self): - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) - flag = True row_filter = self._makeOne(flag) pb_val = row_filter.to_pb() - expected_pb = data_v1_pb2.RowFilter(strip_value_transformer=flag) + expected_pb = _RowFilterPB(strip_value_transformer=flag) self.assertEqual(pb_val, expected_pb) @@ -772,13 +701,10 @@ def test___eq__same_value(self): self.assertEqual(row_filter1, row_filter2) def test_to_pb(self): - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) - label = u'label' row_filter = self._makeOne(label) pb_val = row_filter.to_pb() - expected_pb = data_v1_pb2.RowFilter(apply_label_transformer=label) + expected_pb = _RowFilterPB(apply_label_transformer=label) self.assertEqual(pb_val, expected_pb) @@ -823,8 +749,6 @@ def _makeOne(self, *args, **kwargs): return self._getTargetClass()(*args, **kwargs) def test_to_pb(self): - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) from gcloud.bigtable.row_filters import RowSampleFilter from gcloud.bigtable.row_filters import StripValueTransformerFilter @@ -837,16 +761,14 @@ def test_to_pb(self): row_filter3 = self._makeOne(filters=[row_filter1, row_filter2]) filter_pb = row_filter3.to_pb() - expected_pb = data_v1_pb2.RowFilter( - chain=data_v1_pb2.RowFilter.Chain( + expected_pb = _RowFilterPB( + chain=_RowFilterChainPB( filters=[row_filter1_pb, row_filter2_pb], ), ) self.assertEqual(filter_pb, expected_pb) def test_to_pb_nested(self): - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) from gcloud.bigtable.row_filters import CellsRowLimitFilter from gcloud.bigtable.row_filters import RowSampleFilter from gcloud.bigtable.row_filters import StripValueTransformerFilter @@ -863,8 +785,8 @@ def test_to_pb_nested(self): row_filter5 = self._makeOne(filters=[row_filter3, row_filter4]) filter_pb = row_filter5.to_pb() - expected_pb = data_v1_pb2.RowFilter( - chain=data_v1_pb2.RowFilter.Chain( + expected_pb = _RowFilterPB( + chain=_RowFilterChainPB( filters=[row_filter3_pb, row_filter4_pb], ), ) @@ -881,8 +803,6 @@ def _makeOne(self, *args, **kwargs): return self._getTargetClass()(*args, **kwargs) def test_to_pb(self): - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) from gcloud.bigtable.row_filters import RowSampleFilter from gcloud.bigtable.row_filters import StripValueTransformerFilter @@ -895,16 +815,14 @@ def test_to_pb(self): row_filter3 = self._makeOne(filters=[row_filter1, row_filter2]) filter_pb = row_filter3.to_pb() - expected_pb = data_v1_pb2.RowFilter( - interleave=data_v1_pb2.RowFilter.Interleave( + expected_pb = _RowFilterPB( + interleave=_RowFilterInterleavePB( filters=[row_filter1_pb, 
row_filter2_pb], ), ) self.assertEqual(filter_pb, expected_pb) def test_to_pb_nested(self): - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) from gcloud.bigtable.row_filters import CellsRowLimitFilter from gcloud.bigtable.row_filters import RowSampleFilter from gcloud.bigtable.row_filters import StripValueTransformerFilter @@ -921,8 +839,8 @@ def test_to_pb_nested(self): row_filter5 = self._makeOne(filters=[row_filter3, row_filter4]) filter_pb = row_filter5.to_pb() - expected_pb = data_v1_pb2.RowFilter( - interleave=data_v1_pb2.RowFilter.Interleave( + expected_pb = _RowFilterPB( + interleave=_RowFilterInterleavePB( filters=[row_filter3_pb, row_filter4_pb], ), ) @@ -972,8 +890,6 @@ def test___eq__type_differ(self): self.assertNotEqual(cond_filter1, cond_filter2) def test_to_pb(self): - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) from gcloud.bigtable.row_filters import CellsRowOffsetFilter from gcloud.bigtable.row_filters import RowSampleFilter from gcloud.bigtable.row_filters import StripValueTransformerFilter @@ -991,8 +907,8 @@ def test_to_pb(self): false_filter=row_filter3) filter_pb = row_filter4.to_pb() - expected_pb = data_v1_pb2.RowFilter( - condition=data_v1_pb2.RowFilter.Condition( + expected_pb = _RowFilterPB( + condition=_RowFilterConditionPB( predicate_filter=row_filter1_pb, true_filter=row_filter2_pb, false_filter=row_filter3_pb, @@ -1001,8 +917,6 @@ def test_to_pb(self): self.assertEqual(filter_pb, expected_pb) def test_to_pb_true_only(self): - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) from gcloud.bigtable.row_filters import RowSampleFilter from gcloud.bigtable.row_filters import StripValueTransformerFilter @@ -1015,8 +929,8 @@ def test_to_pb_true_only(self): row_filter3 = self._makeOne(row_filter1, true_filter=row_filter2) filter_pb = row_filter3.to_pb() - expected_pb = data_v1_pb2.RowFilter( - condition=data_v1_pb2.RowFilter.Condition( + expected_pb = _RowFilterPB( + condition=_RowFilterConditionPB( predicate_filter=row_filter1_pb, true_filter=row_filter2_pb, ), @@ -1024,8 +938,6 @@ def test_to_pb_true_only(self): self.assertEqual(filter_pb, expected_pb) def test_to_pb_false_only(self): - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) from gcloud.bigtable.row_filters import RowSampleFilter from gcloud.bigtable.row_filters import StripValueTransformerFilter @@ -1038,10 +950,52 @@ def test_to_pb_false_only(self): row_filter3 = self._makeOne(row_filter1, false_filter=row_filter2) filter_pb = row_filter3.to_pb() - expected_pb = data_v1_pb2.RowFilter( - condition=data_v1_pb2.RowFilter.Condition( + expected_pb = _RowFilterPB( + condition=_RowFilterConditionPB( predicate_filter=row_filter1_pb, false_filter=row_filter2_pb, ), ) self.assertEqual(filter_pb, expected_pb) + + +def _ColumnRangePB(*args, **kw): + from gcloud.bigtable._generated_v2 import ( + data_pb2 as data_v2_pb2) + return data_v2_pb2.ColumnRange(*args, **kw) + + +def _RowFilterPB(*args, **kw): + from gcloud.bigtable._generated_v2 import ( + data_pb2 as data_v2_pb2) + return data_v2_pb2.RowFilter(*args, **kw) + + +def _RowFilterChainPB(*args, **kw): + from gcloud.bigtable._generated_v2 import ( + data_pb2 as data_v2_pb2) + return data_v2_pb2.RowFilter.Chain(*args, **kw) + + +def _RowFilterConditionPB(*args, **kw): + from gcloud.bigtable._generated_v2 import ( + data_pb2 as data_v2_pb2) + return data_v2_pb2.RowFilter.Condition(*args, **kw) + + +def _RowFilterInterleavePB(*args, **kw): + from 
gcloud.bigtable._generated_v2 import ( + data_pb2 as data_v2_pb2) + return data_v2_pb2.RowFilter.Interleave(*args, **kw) + + +def _TimestampRangePB(*args, **kw): + from gcloud.bigtable._generated_v2 import ( + data_pb2 as data_v2_pb2) + return data_v2_pb2.TimestampRange(*args, **kw) + + +def _ValueRangePB(*args, **kw): + from gcloud.bigtable._generated_v2 import ( + data_pb2 as data_v2_pb2) + return data_v2_pb2.ValueRange(*args, **kw) diff --git a/gcloud/bigtable/test_table.py b/gcloud/bigtable/test_table.py index 0f015777aadf..a6339329dfca 100644 --- a/gcloud/bigtable/test_table.py +++ b/gcloud/bigtable/test_table.py @@ -18,6 +18,12 @@ class TestTable(unittest2.TestCase): + ROW_KEY = b'row-key' + FAMILY_NAME = u'family' + QUALIFIER = b'qualifier' + TIMESTAMP_MICROS = 100 + VALUE = b'value' + def _getTargetClass(self): from gcloud.bigtable.table import Table return Table @@ -125,10 +131,7 @@ def test___ne__(self): self.assertNotEqual(table1, table2) def _create_test_helper(self, initial_split_keys): - from gcloud.bigtable._generated import ( - bigtable_table_data_pb2 as data_v1_pb2) - from gcloud.bigtable._generated import ( - bigtable_table_service_messages_pb2 as messages_v1_pb2) + from gcloud._helpers import _to_bytes from gcloud.bigtable._testing import _FakeStub project_id = 'project-id' @@ -144,14 +147,17 @@ def _create_test_helper(self, initial_split_keys): table = self._makeOne(table_id, cluster) # Create request_pb - request_pb = messages_v1_pb2.CreateTableRequest( - initial_split_keys=initial_split_keys, + splits_pb = [ + _CreateTableRequestSplitPB(key=_to_bytes(key)) + for key in initial_split_keys or ()] + request_pb = _CreateTableRequestPB( + initial_splits=splits_pb, name=cluster_name, table_id=table_id, ) # Create response_pb - response_pb = data_v1_pb2.Table() + response_pb = _TablePB() # Patch the stub used by the API method. 
client._table_stub = stub = _FakeStub(response_pb) @@ -173,14 +179,10 @@ def test_create(self): self._create_test_helper(initial_split_keys) def test_create_with_split_keys(self): - initial_split_keys = ['s1', 's2'] + initial_split_keys = [b's1', b's2'] self._create_test_helper(initial_split_keys) - def _list_column_families_helper(self, column_family_name=None): - from gcloud.bigtable._generated import ( - bigtable_table_data_pb2 as data_v1_pb2) - from gcloud.bigtable._generated import ( - bigtable_table_service_messages_pb2 as messages_v1_pb2) + def _list_column_families_helper(self): from gcloud.bigtable._testing import _FakeStub project_id = 'project-id' @@ -197,15 +199,12 @@ def _list_column_families_helper(self, column_family_name=None): # Create request_pb table_name = cluster_name + '/tables/' + table_id - request_pb = messages_v1_pb2.GetTableRequest(name=table_name) + request_pb = _GetTableRequestPB(name=table_name) # Create response_pb column_family_id = 'foo' - if column_family_name is None: - column_family_name = (table_name + '/columnFamilies/' + - column_family_id) - column_family = data_v1_pb2.ColumnFamily(name=column_family_name) - response_pb = data_v1_pb2.Table( + column_family = _ColumnFamilyPB() + response_pb = _TablePB( column_families={column_family_id: column_family}, ) @@ -229,16 +228,8 @@ def _list_column_families_helper(self, column_family_name=None): def test_list_column_families(self): self._list_column_families_helper() - def test_list_column_families_failure(self): - column_family_name = 'not-the-right-format' - with self.assertRaises(ValueError): - self._list_column_families_helper( - column_family_name=column_family_name) - def test_delete(self): from google.protobuf import empty_pb2 - from gcloud.bigtable._generated import ( - bigtable_table_service_messages_pb2 as messages_v1_pb2) from gcloud.bigtable._testing import _FakeStub project_id = 'project-id' @@ -255,7 +246,7 @@ def test_delete(self): # Create request_pb table_name = cluster_name + '/tables/' + table_id - request_pb = messages_v1_pb2.DeleteTableRequest(name=table_name) + request_pb = _DeleteTableRequestPB(name=table_name) # Create response_pb response_pb = empty_pb2.Empty() @@ -275,12 +266,9 @@ def test_delete(self): {}, )]) - def _read_row_helper(self, chunks): + def _read_row_helper(self, chunks, expected_result): from gcloud._testing import _Monkey - from gcloud.bigtable._generated import ( - bigtable_service_messages_pb2 as messages_v1_pb2) from gcloud.bigtable._testing import _FakeStub - from gcloud.bigtable.row_data import PartialRowData from gcloud.bigtable import table as MUT project_id = 'project-id' @@ -303,26 +291,16 @@ def mock_create_row_request(table_name, row_key, filter_): return request_pb # Create response_iterator - row_key = b'row-key' - response_pb = messages_v1_pb2.ReadRowsResponse( - row_key=row_key, chunks=chunks) - response_iterator = [response_pb] + response_pb = _ReadRowsResponsePB(chunks=chunks) + response_iterator = iter([response_pb]) # Patch the stub used by the API method. client._data_stub = stub = _FakeStub(response_iterator) - # Create expected_result. - if chunks: - expected_result = PartialRowData(row_key) - expected_result._committed = True - expected_result._chunks_encountered = True - else: - expected_result = None - # Perform the method and check the result. 
filter_obj = object() with _Monkey(MUT, _create_row_request=mock_create_row_request): - result = table.read_row(row_key, filter_=filter_obj) + result = table.read_row(self.ROW_KEY, filter_=filter_obj) self.assertEqual(result, expected_result) self.assertEqual(stub.method_calls, [( @@ -330,29 +308,44 @@ def mock_create_row_request(table_name, row_key, filter_): (request_pb, timeout_seconds), {}, )]) - self.assertEqual(mock_created, [(table.name, row_key, filter_obj)]) + self.assertEqual(mock_created, + [(table.name, self.ROW_KEY, filter_obj)]) + + def test_read_empty_row(self): + chunks = [] + self._read_row_helper(chunks, None) def test_read_row(self): - from gcloud.bigtable._generated import ( - bigtable_service_messages_pb2 as messages_v1_pb2) + from gcloud.bigtable.row_data import Cell + from gcloud.bigtable.row_data import PartialRowData - chunk = messages_v1_pb2.ReadRowsResponse.Chunk(commit_row=True) + chunk = _ReadRowsResponseCellChunkPB( + row_key=self.ROW_KEY, + family_name=self.FAMILY_NAME, + qualifier=self.QUALIFIER, + timestamp_micros=self.TIMESTAMP_MICROS, + value=self.VALUE, + commit_row=True, + ) chunks = [chunk] - self._read_row_helper(chunks) - - def test_read_empty_row(self): - chunks = [] - self._read_row_helper(chunks) + expected_result = PartialRowData(row_key=self.ROW_KEY) + family = expected_result._cells.setdefault(self.FAMILY_NAME, {}) + column = family.setdefault(self.QUALIFIER, []) + column.append(Cell.from_pb(chunk)) + self._read_row_helper(chunks, expected_result) def test_read_row_still_partial(self): - from gcloud.bigtable._generated import ( - bigtable_service_messages_pb2 as messages_v1_pb2) - - # There is never a "commit row". - chunk = messages_v1_pb2.ReadRowsResponse.Chunk(reset_row=True) + chunk = _ReadRowsResponseCellChunkPB( + row_key=self.ROW_KEY, + family_name=self.FAMILY_NAME, + qualifier=self.QUALIFIER, + timestamp_micros=self.TIMESTAMP_MICROS, + value=self.VALUE, + ) + # No "commit row". 
chunks = [chunk] with self.assertRaises(ValueError): - self._read_row_helper(chunks) + self._read_row_helper(chunks, None) def test_read_rows(self): from gcloud._testing import _Monkey @@ -392,12 +385,11 @@ def mock_create_row_request(table_name, **kwargs): start_key = b'start-key' end_key = b'end-key' filter_obj = object() - allow_row_interleaving = True limit = 22 with _Monkey(MUT, _create_row_request=mock_create_row_request): result = table.read_rows( start_key=start_key, end_key=end_key, filter_=filter_obj, - allow_row_interleaving=allow_row_interleaving, limit=limit) + limit=limit) self.assertEqual(result, expected_result) self.assertEqual(stub.method_calls, [( @@ -409,14 +401,11 @@ def mock_create_row_request(table_name, **kwargs): 'start_key': start_key, 'end_key': end_key, 'filter_': filter_obj, - 'allow_row_interleaving': allow_row_interleaving, 'limit': limit, } self.assertEqual(mock_created, [(table.name, created_kwargs)]) def test_sample_row_keys(self): - from gcloud.bigtable._generated import ( - bigtable_service_messages_pb2 as messages_v1_pb2) from gcloud.bigtable._testing import _FakeStub project_id = 'project-id' @@ -433,7 +422,7 @@ def test_sample_row_keys(self): # Create request_pb table_name = cluster_name + '/tables/' + table_id - request_pb = messages_v1_pb2.SampleRowKeysRequest( + request_pb = _SampleRowKeysRequestPB( table_name=table_name) # Create response_iterator @@ -458,20 +447,16 @@ def test_sample_row_keys(self): class Test__create_row_request(unittest2.TestCase): def _callFUT(self, table_name, row_key=None, start_key=None, end_key=None, - filter_=None, allow_row_interleaving=None, limit=None): + filter_=None, limit=None): from gcloud.bigtable.table import _create_row_request return _create_row_request( table_name, row_key=row_key, start_key=start_key, end_key=end_key, - filter_=filter_, allow_row_interleaving=allow_row_interleaving, - limit=limit) + filter_=filter_, limit=limit) def test_table_name_only(self): - from gcloud.bigtable._generated import ( - bigtable_service_messages_pb2 as messages_v1_pb2) - table_name = 'table_name' result = self._callFUT(table_name) - expected_result = messages_v1_pb2.ReadRowsRequest( + expected_result = _ReadRowsRequestPB( table_name=table_name) self.assertEqual(result, expected_result) @@ -480,108 +465,129 @@ def test_row_key_row_range_conflict(self): self._callFUT(None, row_key=object(), end_key=object()) def test_row_key(self): - from gcloud.bigtable._generated import ( - bigtable_service_messages_pb2 as messages_v1_pb2) - table_name = 'table_name' row_key = b'row_key' result = self._callFUT(table_name, row_key=row_key) - expected_result = messages_v1_pb2.ReadRowsRequest( + expected_result = _ReadRowsRequestPB( table_name=table_name, - row_key=row_key, ) + expected_result.rows.row_keys.append(row_key) self.assertEqual(result, expected_result) def test_row_range_start_key(self): - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) - from gcloud.bigtable._generated import ( - bigtable_service_messages_pb2 as messages_v1_pb2) - table_name = 'table_name' start_key = b'start_key' result = self._callFUT(table_name, start_key=start_key) - expected_result = messages_v1_pb2.ReadRowsRequest( - table_name=table_name, - row_range=data_v1_pb2.RowRange(start_key=start_key), - ) + expected_result = _ReadRowsRequestPB(table_name=table_name) + expected_result.rows.row_ranges.add(start_key_closed=start_key) self.assertEqual(result, expected_result) def test_row_range_end_key(self): - from gcloud.bigtable._generated 
import ( - bigtable_data_pb2 as data_v1_pb2) - from gcloud.bigtable._generated import ( - bigtable_service_messages_pb2 as messages_v1_pb2) - table_name = 'table_name' end_key = b'end_key' result = self._callFUT(table_name, end_key=end_key) - expected_result = messages_v1_pb2.ReadRowsRequest( - table_name=table_name, - row_range=data_v1_pb2.RowRange(end_key=end_key), - ) + expected_result = _ReadRowsRequestPB(table_name=table_name) + expected_result.rows.row_ranges.add(end_key_open=end_key) self.assertEqual(result, expected_result) def test_row_range_both_keys(self): - from gcloud.bigtable._generated import ( - bigtable_data_pb2 as data_v1_pb2) - from gcloud.bigtable._generated import ( - bigtable_service_messages_pb2 as messages_v1_pb2) - table_name = 'table_name' start_key = b'start_key' end_key = b'end_key' result = self._callFUT(table_name, start_key=start_key, end_key=end_key) - expected_result = messages_v1_pb2.ReadRowsRequest( - table_name=table_name, - row_range=data_v1_pb2.RowRange( - start_key=start_key, end_key=end_key), - ) + expected_result = _ReadRowsRequestPB(table_name=table_name) + expected_result.rows.row_ranges.add( + start_key_closed=start_key, end_key_open=end_key) self.assertEqual(result, expected_result) def test_with_filter(self): - from gcloud.bigtable._generated import ( - bigtable_service_messages_pb2 as messages_v1_pb2) from gcloud.bigtable.row_filters import RowSampleFilter - table_name = 'table_name' row_filter = RowSampleFilter(0.33) result = self._callFUT(table_name, filter_=row_filter) - expected_result = messages_v1_pb2.ReadRowsRequest( + expected_result = _ReadRowsRequestPB( table_name=table_name, filter=row_filter.to_pb(), ) self.assertEqual(result, expected_result) - def test_with_allow_row_interleaving(self): - from gcloud.bigtable._generated import ( - bigtable_service_messages_pb2 as messages_v1_pb2) - - table_name = 'table_name' - allow_row_interleaving = True - result = self._callFUT(table_name, - allow_row_interleaving=allow_row_interleaving) - expected_result = messages_v1_pb2.ReadRowsRequest( - table_name=table_name, - allow_row_interleaving=allow_row_interleaving, - ) - self.assertEqual(result, expected_result) - def test_with_limit(self): - from gcloud.bigtable._generated import ( - bigtable_service_messages_pb2 as messages_v1_pb2) - table_name = 'table_name' limit = 1337 result = self._callFUT(table_name, limit=limit) - expected_result = messages_v1_pb2.ReadRowsRequest( + expected_result = _ReadRowsRequestPB( table_name=table_name, - num_rows_limit=limit, + rows_limit=limit, ) self.assertEqual(result, expected_result) +def _CreateTableRequestPB(*args, **kw): + from gcloud.bigtable._generated_v2 import ( + bigtable_table_admin_pb2 as table_admin_v2_pb2) + return table_admin_v2_pb2.CreateTableRequest(*args, **kw) + + +def _CreateTableRequestSplitPB(*args, **kw): + from gcloud.bigtable._generated_v2 import ( + bigtable_table_admin_pb2 as table_admin_v2_pb2) + return table_admin_v2_pb2.CreateTableRequest.Split(*args, **kw) + + +def _DeleteTableRequestPB(*args, **kw): + from gcloud.bigtable._generated_v2 import ( + bigtable_table_admin_pb2 as table_admin_v2_pb2) + return table_admin_v2_pb2.DeleteTableRequest(*args, **kw) + + +def _GetTableRequestPB(*args, **kw): + from gcloud.bigtable._generated_v2 import ( + bigtable_table_admin_pb2 as table_admin_v2_pb2) + return table_admin_v2_pb2.GetTableRequest(*args, **kw) + + +def _ReadRowsRequestPB(*args, **kw): + from gcloud.bigtable._generated_v2 import ( + bigtable_pb2 as messages_v2_pb2) + return 
messages_v2_pb2.ReadRowsRequest(*args, **kw)
+
+
+def _ReadRowsResponseCellChunkPB(*args, **kw):
+    from gcloud.bigtable._generated_v2 import (
+        bigtable_pb2 as messages_v2_pb2)
+    family_name = kw.pop('family_name')
+    qualifier = kw.pop('qualifier')
+    message = messages_v2_pb2.ReadRowsResponse.CellChunk(*args, **kw)
+    message.family_name.value = family_name
+    message.qualifier.value = qualifier
+    return message
+
+
+def _ReadRowsResponsePB(*args, **kw):
+    from gcloud.bigtable._generated_v2 import (
+        bigtable_pb2 as messages_v2_pb2)
+    return messages_v2_pb2.ReadRowsResponse(*args, **kw)
+
+
+def _SampleRowKeysRequestPB(*args, **kw):
+    from gcloud.bigtable._generated_v2 import (
+        bigtable_pb2 as messages_v2_pb2)
+    return messages_v2_pb2.SampleRowKeysRequest(*args, **kw)
+
+
+def _TablePB(*args, **kw):
+    from gcloud.bigtable._generated_v2 import (
+        table_pb2 as table_v2_pb2)
+    return table_v2_pb2.Table(*args, **kw)
+
+
+def _ColumnFamilyPB(*args, **kw):
+    from gcloud.bigtable._generated_v2 import (
+        table_pb2 as table_v2_pb2)
+    return table_v2_pb2.ColumnFamily(*args, **kw)
+
+
 class _Client(object):
 
     data_stub = None
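The factory helpers above capture the main shape changes from the v1 to the v2 protos that these tests now assert against. Below is a minimal illustrative sketch of those shapes — not part of the patch — assuming the gcloud.bigtable._generated_v2 modules are importable exactly as they are in the tests.

# Illustrative sketch only; assumes the `_generated_v2` protobuf modules
# are importable as in the tests above.
from gcloud.bigtable._generated_v2 import (
    bigtable_pb2 as messages_v2_pb2)

# v2 ReadRowsRequest: row keys and ranges live under `rows`, the limit
# field is `rows_limit` (v1 used `row_key`, `row_range` and
# `num_rows_limit`), and range bounds are spelled `start_key_closed`
# / `end_key_open` rather than inclusive/exclusive.
request = messages_v2_pb2.ReadRowsRequest(
    table_name='table_name', rows_limit=1337)
request.rows.row_keys.append(b'row_key')
request.rows.row_ranges.add(
    start_key_closed=b'start_key', end_key_open=b'end_key')

# v2 ReadRowsResponse.CellChunk wraps `family_name` and `qualifier` in
# wrapper messages, so their `.value` is set after construction.
chunk = messages_v2_pb2.ReadRowsResponse.CellChunk(
    row_key=b'row-key', timestamp_micros=100, value=b'value',
    commit_row=True)
chunk.family_name.value = u'family'
chunk.qualifier.value = b'qualifier'

The wrapper fields on CellChunk are why _ReadRowsResponseCellChunkPB sets family_name.value and qualifier.value after constructing the message instead of passing them to the constructor.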