From 9c219d44d4a096bdd5482c5125777218bf9d78bc Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Fri, 5 Nov 2021 17:02:03 +0100 Subject: [PATCH 01/29] remove obsolete classes from bigtable_dataset_kernel --- .../core/kernels/bigtable/bigtable_dataset_kernel.cc | 8 ++++++++ tensorflow_io/core/ops/bigtable_ops.cc | 9 +++++++++ 2 files changed, 17 insertions(+) diff --git a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc index 400683c96..f9b0ea157 100644 --- a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc +++ b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc @@ -88,6 +88,10 @@ class BigtableClientResource : public ResourceBase { return cbt::Table(data_client_, table_id); } + ~BigtableClientResource(){ + VLOG(1) << "BigtableClientResource dtor"; + } + string DebugString() const override { return "BigtableClientResource"; } private: @@ -108,6 +112,10 @@ class BigtableClientOp : public OpKernel { VLOG(1) << "BigtableClientOp ctor"; } + ~BigtableClientOp(){ + VLOG(1) << "BigtableClientOp dtor"; + } + void Compute(OpKernelContext* ctx) override TF_LOCKS_EXCLUDED(mu_) { VLOG(1) << "BigtableClientOp compute"; ResourceMgr* mgr = ctx->resource_manager(); diff --git a/tensorflow_io/core/ops/bigtable_ops.cc b/tensorflow_io/core/ops/bigtable_ops.cc index 4ebff7f3f..271f655a2 100644 --- a/tensorflow_io/core/ops/bigtable_ops.cc +++ b/tensorflow_io/core/ops/bigtable_ops.cc @@ -93,3 +93,12 @@ REGISTER_OP("BigtableRowSetIntersect") .Output("result_row_set: resource") .SetShapeFn(shape_inference::ScalarShape); + +REGISTER_OP("BigtableSampleRowKeys") + .Input("client: resource") + .Attr("table_id: string") + .Output("samples: string") + .SetShapeFn([](tensorflow::shape_inference::InferenceContext* c) { + c->set_output(0, c->Vector(c->UnknownDim())); + return tensorflow::Status::OK(); + }); From c36527fd9d5686dbd919356c3caa72939271e19d Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Thu, 28 Oct 2021 17:31:38 +0200 Subject: [PATCH 02/29] outline --- tensorflow_io/core/ops/bigtable_ops.cc | 10 ++++++++++ .../ops/bigtable/bigtable_dataset_ops.py | 20 ++++++++++++++++++- 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/tensorflow_io/core/ops/bigtable_ops.cc b/tensorflow_io/core/ops/bigtable_ops.cc index 271f655a2..a23a1ca41 100644 --- a/tensorflow_io/core/ops/bigtable_ops.cc +++ b/tensorflow_io/core/ops/bigtable_ops.cc @@ -102,3 +102,13 @@ REGISTER_OP("BigtableSampleRowKeys") c->set_output(0, c->Vector(c->UnknownDim())); return tensorflow::Status::OK(); }); + + +REGISTER_OP("BigtableSplitWork") + .Input("") + .Attr("table_id: string") + .Output("samples: string") + .SetShapeFn([](tensorflow::shape_inference::InferenceContext* c) { + c->set_output(0, c->Vector(c->UnknownDim())); + return tensorflow::Status::OK(); + }); \ No newline at end of file diff --git a/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py b/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py index 81b344e65..5f142c878 100644 --- a/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py +++ b/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py @@ -5,6 +5,9 @@ from tensorflow.python.framework import dtypes import tensorflow as tf +from tensorflow_io.python.ops.bigtable.bigtable_row_set import from_rows_or_ranges, RowSet +from tensorflow_io.python.ops.bigtable.bigtable_row_range import infinite + class BigtableClient: """BigtableClient is the entrypoint for interacting with Cloud Bigtable in TF. 
@@ -28,8 +31,23 @@ def __init__(self, client_resource, table_id: str): self._table_id = table_id self._client_resource = client_resource - def read_rows(self, columns: List[str]): + def read_rows(self, columns: List[str], row_set: RowSet): return _BigtableDataset(self._client_resource, self._table_id, columns) + + def parallel_read_rows(self, columns: List[str], num_parallel_calls=1, row_set: RowSet=from_rows_or_ranges(infinite())): + samples = core_ops.bigtable_sample_row_keys(self._client_resource, self._table_id) + shards = core_ops.bigtable_split_work(self._client_resource, samples, num_parallel_calls, row_set._impl) + + + + return shards.interleave( + map_func=self.read_rows, + cycle_length=num_parallel_calls, + block_length=1, + num_parallel_calls=num_parallel_calls, + deterministic=False, + ) + class _BigtableDataset(dataset_ops.DatasetSource): From ae7e73bfb11fa5c91360ce68cd057a5ea6a25871 Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Thu, 28 Oct 2021 19:24:49 +0200 Subject: [PATCH 03/29] simplified creating resource --- tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py b/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py index 5f142c878..6adf0a115 100644 --- a/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py +++ b/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py @@ -50,6 +50,7 @@ def parallel_read_rows(self, columns: List[str], num_parallel_calls=1, row_set: + class _BigtableDataset(dataset_ops.DatasetSource): """_BigtableDataset represents a dataset that retrieves keys and values.""" From b8fdd3b59ebcbd45d3960e35374c7230a13c6da9 Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Thu, 28 Oct 2021 20:23:51 +0200 Subject: [PATCH 04/29] parallel read not exactly working --- tensorflow_io/core/ops/bigtable_ops.cc | 9 --------- .../python/ops/bigtable/bigtable_dataset_ops.py | 10 ++++------ 2 files changed, 4 insertions(+), 15 deletions(-) diff --git a/tensorflow_io/core/ops/bigtable_ops.cc b/tensorflow_io/core/ops/bigtable_ops.cc index a23a1ca41..90df3b0c3 100644 --- a/tensorflow_io/core/ops/bigtable_ops.cc +++ b/tensorflow_io/core/ops/bigtable_ops.cc @@ -103,12 +103,3 @@ REGISTER_OP("BigtableSampleRowKeys") return tensorflow::Status::OK(); }); - -REGISTER_OP("BigtableSplitWork") - .Input("") - .Attr("table_id: string") - .Output("samples: string") - .SetShapeFn([](tensorflow::shape_inference::InferenceContext* c) { - c->set_output(0, c->Vector(c->UnknownDim())); - return tensorflow::Status::OK(); - }); \ No newline at end of file diff --git a/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py b/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py index 6adf0a115..72bf8f079 100644 --- a/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py +++ b/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py @@ -4,6 +4,7 @@ from tensorflow_io.python.ops import core_ops from tensorflow.python.framework import dtypes import tensorflow as tf +from tensorflow.python.data.ops import dataset_ops from tensorflow_io.python.ops.bigtable.bigtable_row_set import from_rows_or_ranges, RowSet from tensorflow_io.python.ops.bigtable.bigtable_row_range import infinite @@ -36,12 +37,9 @@ def read_rows(self, columns: List[str], row_set: RowSet): def parallel_read_rows(self, columns: List[str], num_parallel_calls=1, row_set: RowSet=from_rows_or_ranges(infinite())): samples = core_ops.bigtable_sample_row_keys(self._client_resource, self._table_id) 
- shards = core_ops.bigtable_split_work(self._client_resource, samples, num_parallel_calls, row_set._impl) - - - - return shards.interleave( - map_func=self.read_rows, + samples_ds = dataset_ops.Dataset.from_tensor_slices(samples) + return samples_ds.interleave( + map_func=lambda x: self.read_rows(columns, row_set), cycle_length=num_parallel_calls, block_length=1, num_parallel_calls=num_parallel_calls, From 93d9423747a33bab87848e2dffaba45f9ff9d5aa Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Fri, 29 Oct 2021 07:50:02 +0200 Subject: [PATCH 05/29] parallel not split work working --- tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc | 2 -- 1 file changed, 2 deletions(-) diff --git a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc index f9b0ea157..8f63f8b8c 100644 --- a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc +++ b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc @@ -281,7 +281,6 @@ class Dataset : public DatasetBase { std::string table_id, std::vector columns) : DatasetBase(DatasetContext(ctx)), client_resource_(*client_resource), - client_resource_unref_(client_resource), table_id_(table_id), columns_(columns) { dtypes_.push_back(DT_STRING); @@ -321,7 +320,6 @@ class Dataset : public DatasetBase { private: BigtableClientResource& client_resource_; - const core::ScopedUnref client_resource_unref_; const std::string table_id_; const std::vector columns_; DataTypeVector dtypes_; From 713716382ad85b034e108d3c00ad94f0f2809ab8 Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Fri, 29 Oct 2021 11:48:09 +0200 Subject: [PATCH 06/29] sampleRowSet --- tensorflow_io/core/ops/bigtable_ops.cc | 9 +++++++++ .../python/ops/bigtable/bigtable_dataset_ops.py | 8 ++++++-- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/tensorflow_io/core/ops/bigtable_ops.cc b/tensorflow_io/core/ops/bigtable_ops.cc index 90df3b0c3..47614e870 100644 --- a/tensorflow_io/core/ops/bigtable_ops.cc +++ b/tensorflow_io/core/ops/bigtable_ops.cc @@ -103,3 +103,12 @@ REGISTER_OP("BigtableSampleRowKeys") return tensorflow::Status::OK(); }); + +REGISTER_OP("BigtableSampleRowSets") + .Input("client: resource") + .Attr("table_id: string") + .Output("samples: string") + .SetShapeFn([](tensorflow::shape_inference::InferenceContext* c) { + c->set_output(0, c->MakeShape({})); + return tensorflow::Status::OK(); + }); diff --git a/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py b/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py index 72bf8f079..52f242e69 100644 --- a/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py +++ b/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py @@ -36,10 +36,14 @@ def read_rows(self, columns: List[str], row_set: RowSet): return _BigtableDataset(self._client_resource, self._table_id, columns) def parallel_read_rows(self, columns: List[str], num_parallel_calls=1, row_set: RowSet=from_rows_or_ranges(infinite())): - samples = core_ops.bigtable_sample_row_keys(self._client_resource, self._table_id) + samples = core_ops.bigtable_sample_row_sets(self._client_resource, self._table_id) samples_ds = dataset_ops.Dataset.from_tensor_slices(samples) + def map_func(sample): + print('sample', sample, 'shape', sample.shape) + return self.read_rows(columns, row_set) + return samples_ds.interleave( - map_func=lambda x: self.read_rows(columns, row_set), + map_func=map_func, cycle_length=num_parallel_calls, block_length=1, 
num_parallel_calls=num_parallel_calls, From bd87e5bfd255ec36a9af80a230a1e687b238094d Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Fri, 29 Oct 2021 12:52:53 +0200 Subject: [PATCH 07/29] parallel without row_set --- .../core/kernels/bigtable/bigtable_dataset_kernel.cc | 8 ++------ tensorflow_io/core/ops/bigtable_ops.cc | 2 ++ .../python/ops/bigtable/bigtable_dataset_ops.py | 12 +++++++----- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc index 8f63f8b8c..a82810396 100644 --- a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc +++ b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc @@ -88,9 +88,7 @@ class BigtableClientResource : public ResourceBase { return cbt::Table(data_client_, table_id); } - ~BigtableClientResource(){ - VLOG(1) << "BigtableClientResource dtor"; - } + ~BigtableClientResource() { VLOG(1) << "BigtableClientResource dtor"; } string DebugString() const override { return "BigtableClientResource"; } @@ -112,9 +110,7 @@ class BigtableClientOp : public OpKernel { VLOG(1) << "BigtableClientOp ctor"; } - ~BigtableClientOp(){ - VLOG(1) << "BigtableClientOp dtor"; - } + ~BigtableClientOp() { VLOG(1) << "BigtableClientOp dtor"; } void Compute(OpKernelContext* ctx) override TF_LOCKS_EXCLUDED(mu_) { VLOG(1) << "BigtableClientOp compute"; diff --git a/tensorflow_io/core/ops/bigtable_ops.cc b/tensorflow_io/core/ops/bigtable_ops.cc index 47614e870..fc5942163 100644 --- a/tensorflow_io/core/ops/bigtable_ops.cc +++ b/tensorflow_io/core/ops/bigtable_ops.cc @@ -106,7 +106,9 @@ REGISTER_OP("BigtableSampleRowKeys") REGISTER_OP("BigtableSampleRowSets") .Input("client: resource") + .Input("row_set: resource") .Attr("table_id: string") + .Attr("num_parallel_calls: int") .Output("samples: string") .SetShapeFn([](tensorflow::shape_inference::InferenceContext* c) { c->set_output(0, c->MakeShape({})); diff --git a/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py b/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py index 52f242e69..42f78f30c 100644 --- a/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py +++ b/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py @@ -6,8 +6,8 @@ import tensorflow as tf from tensorflow.python.data.ops import dataset_ops -from tensorflow_io.python.ops.bigtable.bigtable_row_set import from_rows_or_ranges, RowSet -from tensorflow_io.python.ops.bigtable.bigtable_row_range import infinite +from tensorflow_io.python.ops.bigtable.bigtable_row_set import from_rows_or_ranges, RowSet, intersect +from tensorflow_io.python.ops.bigtable.bigtable_row_range import infinite, right_open class BigtableClient: @@ -36,11 +36,13 @@ def read_rows(self, columns: List[str], row_set: RowSet): return _BigtableDataset(self._client_resource, self._table_id, columns) def parallel_read_rows(self, columns: List[str], num_parallel_calls=1, row_set: RowSet=from_rows_or_ranges(infinite())): - samples = core_ops.bigtable_sample_row_sets(self._client_resource, self._table_id) + samples = core_ops.bigtable_sample_row_sets(self._client_resource, row_set._impl, self._table_id, num_parallel_calls) samples_ds = dataset_ops.Dataset.from_tensor_slices(samples) def map_func(sample): - print('sample', sample, 'shape', sample.shape) - return self.read_rows(columns, row_set) + print("lalalal_mysample:", sample) + print("lslslsample:", dir(sample[0])) + rs = intersect(row_set, right_open(sample[0], sample[1])) + 
return self.read_rows(columns, rs) return samples_ds.interleave( map_func=map_func, From 24eb8c92fd8af41b022139c1b62929b8f536a774 Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Fri, 29 Oct 2021 13:22:10 +0200 Subject: [PATCH 08/29] rowset in parallel working --- tensorflow_io/core/ops/bigtable_ops.cc | 10 +++++++++- .../python/ops/bigtable/bigtable_dataset_ops.py | 6 ++---- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/tensorflow_io/core/ops/bigtable_ops.cc b/tensorflow_io/core/ops/bigtable_ops.cc index fc5942163..7f8b71e74 100644 --- a/tensorflow_io/core/ops/bigtable_ops.cc +++ b/tensorflow_io/core/ops/bigtable_ops.cc @@ -93,6 +93,14 @@ REGISTER_OP("BigtableRowSetIntersect") .Output("result_row_set: resource") .SetShapeFn(shape_inference::ScalarShape); +REGISTER_OP("BigtableRowsetIntersectTensor") + .Attr("container: string = ''") + .Attr("shared_name: string = ''") + .Input("row_set_resource: resource") + .Input("row_range_tensor: string") + .Output("row_set: resource") + .SetShapeFn(shape_inference::UnchangedShape); + REGISTER_OP("BigtableSampleRowKeys") .Input("client: resource") @@ -111,6 +119,6 @@ REGISTER_OP("BigtableSampleRowSets") .Attr("num_parallel_calls: int") .Output("samples: string") .SetShapeFn([](tensorflow::shape_inference::InferenceContext* c) { - c->set_output(0, c->MakeShape({})); + c->set_output(0, c->Vector(c->UnknownDim())); return tensorflow::Status::OK(); }); diff --git a/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py b/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py index 42f78f30c..1b280ce65 100644 --- a/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py +++ b/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py @@ -39,10 +39,8 @@ def parallel_read_rows(self, columns: List[str], num_parallel_calls=1, row_set: samples = core_ops.bigtable_sample_row_sets(self._client_resource, row_set._impl, self._table_id, num_parallel_calls) samples_ds = dataset_ops.Dataset.from_tensor_slices(samples) def map_func(sample): - print("lalalal_mysample:", sample) - print("lslslsample:", dir(sample[0])) - rs = intersect(row_set, right_open(sample[0], sample[1])) - return self.read_rows(columns, rs) + rs = RowSet(core_ops.bigtable_rowset_intersect_tensor(row_set._impl, sample)) + return self.read_rows(columns,rs) return samples_ds.interleave( map_func=map_func, From d673b991016fc0a779d71d6d485ea37400949184 Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Fri, 29 Oct 2021 16:33:06 +0200 Subject: [PATCH 09/29] row_set const ref working --- tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc index a82810396..e86a9a951 100644 --- a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc +++ b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc @@ -13,6 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. 
==============================================================================*/
 #include "absl/memory/memory.h"
+#include "google/cloud/bigtable/row_set.h"
 #include "google/cloud/bigtable/table.h"
 #include "google/cloud/bigtable/table_admin.h"
 #include "tensorflow/core/framework/common_shape_fns.h"
@@ -145,7 +146,7 @@ class Iterator : public DatasetIterator<Dataset> {
         columns_(ColumnsToFamiliesAndQualifiers(columns)),
         reader_(
             this->dataset()->client_resource().CreateTable(table_id).ReadRows(
-                cbt::RowRange::InfiniteRange(),
+                cbt::RowSet(cbt::RowRange::InfiniteRange()),
                 cbt::Filter::Chain(CreateColumnsFilter(columns_),
                                    cbt::Filter::Latest(1)))),
         it_(this->reader_.begin()),

From e8fedeae4eb726fdd2478dba9741427a9154b694 Mon Sep 17 00:00:00 2001
From: Kajetan Boroszko
Date: Tue, 2 Nov 2021 10:09:41 +0100
Subject: [PATCH 10/29] working parallel all

---
 .../bigtable/bigtable_dataset_kernel.cc       | 71 +++++++++++++++----
 .../memcached_file_block_cache.cc             |  2 +-
 tensorflow_io/core/ops/bigtable_ops.cc        |  1 +
 .../ops/bigtable/bigtable_dataset_ops.py      | 10 ++-
 4 files changed, 68 insertions(+), 16 deletions(-)

diff --git a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc
index e86a9a951..aa24951a6 100644
--- a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc
+++ b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc
@@ -82,11 +82,15 @@ class BigtableClientResource : public ResourceBase {
  public:
   explicit BigtableClientResource(const std::string& project_id,
                                   const std::string& instance_id)
-      : data_client_(CreateDataClient(project_id, instance_id)) {}
+      : data_client_(CreateDataClient(project_id, instance_id)) {
+    VLOG(1) << "BigtableClientResource ctor";
+  }
 
   cbt::Table CreateTable(const std::string& table_id) {
     VLOG(1) << "CreateTable";
-    return cbt::Table(data_client_, table_id);
+    cbt::Table table(data_client_, table_id);
+    VLOG(1) << "table created";
+    return table;
   }
 
   ~BigtableClientResource() { VLOG(1) << "BigtableClientResource dtor"; }
@@ -103,6 +107,35 @@ class BigtableClientResource : public ResourceBase {
   std::shared_ptr<cbt::DataClient> data_client_;
 };
 
+class BigtableRowsetResource : public ResourceBase {
+ public:
+  explicit BigtableRowsetResource(cbt::RowSet const& row_set) : row_set_(row_set){
+    VLOG(1) << "BigtableRowsetResource ctor";
+  }
+
+  ~BigtableRowsetResource() { VLOG(1) << "BigtableRowsetResource dtor"; }
+
+  std::string PrintRowSet() {
+    std::string res;
+    google::protobuf::TextFormat::PrintToString(row_set_.as_proto(), &res);
+    return res;
+  }
+
+  void AppendStr(std::string const& row_key) { row_set_.Append(row_key); }
+  void AppendRowRange(cbt::RowRange const& row_range) {
+    row_set_.Append(row_range);
+  }
+  cbt::RowSet Intersect(cbt::RowRange const& row_range) {
+    return row_set_.Intersect(row_range);
+  }
+
+  cbt::RowSet const& RowSet() { return row_set_; }
+
+  string DebugString() const override { return "BigtableRowsetResource"; }
+
+  cbt::RowSet row_set_;
+};
+
 class BigtableClientOp : public OpKernel {
  public:
   explicit BigtableClientOp(OpKernelConstruction* ctx) : OpKernel(ctx) {
@@ -144,13 +177,16 @@ class Iterator : public DatasetIterator<Dataset> {
                 const std::vector<std::string>& columns)
       : DatasetIterator<Dataset>(params),
         columns_(ColumnsToFamiliesAndQualifiers(columns)),
-        reader_(
-            this->dataset()->client_resource().CreateTable(table_id).ReadRows(
-                cbt::RowSet(cbt::RowRange::InfiniteRange()),
+        table_(this->dataset()->client_resource().CreateTable(table_id)),
+        reader_(this->table_.ReadRows(
this->dataset()->row_set_resource()->RowSet(), + // cbt::RowRange::InfiniteRange(), cbt::Filter::Chain(CreateColumnsFilter(columns_), cbt::Filter::Latest(1)))), it_(this->reader_.begin()), - column_to_idx_(CreateColumnToIdxMap(columns_)) {} + column_to_idx_(CreateColumnToIdxMap(columns_)) { + VLOG(1) << "DatasetIterator ctor"; + } Status GetNextInternal(IteratorContext* ctx, std::vector* out_tensors, bool* end_of_sequence) override { @@ -261,8 +297,8 @@ class Iterator : public DatasetIterator { } mutex mu_; - const std::shared_ptr data_client_; const std::vector> columns_; + cbt::Table table_; cbt::RowReader reader_ GUARDED_BY(mu_); cbt::v1::internal::RowReaderIterator it_ GUARDED_BY(mu_); // we're using a map with const refs to avoid copying strings when searching @@ -275,9 +311,11 @@ class Iterator : public DatasetIterator { class Dataset : public DatasetBase { public: Dataset(OpKernelContext* ctx, BigtableClientResource* client_resource, + BigtableRowsetResource* row_set_resource, std::string table_id, std::vector columns) : DatasetBase(DatasetContext(ctx)), client_resource_(*client_resource), + row_set_resource_(row_set_resource), table_id_(table_id), columns_(columns) { dtypes_.push_back(DT_STRING); @@ -287,10 +325,11 @@ class Dataset : public DatasetBase { std::unique_ptr MakeIteratorInternal( const std::string& prefix) const override { VLOG(1) << "MakeIteratorInternal. table=" << table_id_; - return absl::make_unique>( + Iterator * iter = new Iterator( typename DatasetIterator::Params{ this, strings::StrCat(prefix, "::BigtableDataset")}, table_id_, columns_); + return std::unique_ptr>(iter); } const DataTypeVector& output_dtypes() const override { return dtypes_; } @@ -304,6 +343,7 @@ class Dataset : public DatasetBase { } BigtableClientResource& client_resource() const { return client_resource_; } + BigtableRowsetResource* row_set_resource() const { return row_set_resource_; } protected: Status AsGraphDefInternal(SerializationContext* ctx, @@ -317,6 +357,7 @@ class Dataset : public DatasetBase { private: BigtableClientResource& client_resource_; + BigtableRowsetResource* row_set_resource_; const std::string table_id_; const std::vector columns_; DataTypeVector dtypes_; @@ -333,10 +374,16 @@ class BigtableDatasetOp : public DatasetOpKernel { void MakeDataset(OpKernelContext* ctx, DatasetBase** output) override { VLOG(1) << "Make Dataset"; BigtableClientResource* client_resource; - OP_REQUIRES_OK( - ctx, LookupResource(ctx, HandleFromInput(ctx, 0), &client_resource)); - core::ScopedUnref client_resource_unref_(client_resource); - *output = new Dataset(ctx, client_resource, table_id_, columns_); + OP_REQUIRES_OK(ctx, + GetResourceFromContext(ctx, "client", &client_resource)); + core::ScopedUnref unref_client(client_resource); + + BigtableRowsetResource* row_set_resource; + OP_REQUIRES_OK(ctx, + GetResourceFromContext(ctx, "row_set", &row_set_resource)); + core::ScopedUnref row_set_resource_unref_(row_set_resource); + + *output = new Dataset(ctx, client_resource,row_set_resource, table_id_, columns_); } private: diff --git a/tensorflow_io/core/kernels/gsmemcachedfs/memcached_file_block_cache.cc b/tensorflow_io/core/kernels/gsmemcachedfs/memcached_file_block_cache.cc index 8c2033cd9..db39b37d1 100644 --- a/tensorflow_io/core/kernels/gsmemcachedfs/memcached_file_block_cache.cc +++ b/tensorflow_io/core/kernels/gsmemcachedfs/memcached_file_block_cache.cc @@ -759,7 +759,7 @@ int64 MemcachedFileBlockCache::AddToCacheBuffer(const string& memc_key, cache_buffer_keys_.push_back(memc_key); auto 
page = absl::make_unique>(); page->assign(data->begin(), data->end()); - cache_buffer_map_.emplace(memc_key, page.release()); + cache_buffer_map_.emplace(memc_key, std::move(page)); } return cache_buffer_keys_.size(); } diff --git a/tensorflow_io/core/ops/bigtable_ops.cc b/tensorflow_io/core/ops/bigtable_ops.cc index 7f8b71e74..8412c99b3 100644 --- a/tensorflow_io/core/ops/bigtable_ops.cc +++ b/tensorflow_io/core/ops/bigtable_ops.cc @@ -27,6 +27,7 @@ REGISTER_OP("BigtableClient") REGISTER_OP("BigtableDataset") .Input("client: resource") + .Input("row_set: resource") .Attr("table_id: string") .Attr("columns: list(string) >= 1") .Output("handle: variant") diff --git a/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py b/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py index 1b280ce65..ecd7251a8 100644 --- a/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py +++ b/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py @@ -33,10 +33,14 @@ def __init__(self, client_resource, table_id: str): self._client_resource = client_resource def read_rows(self, columns: List[str], row_set: RowSet): - return _BigtableDataset(self._client_resource, self._table_id, columns) + return _BigtableDataset(self._client_resource, self._table_id, columns, row_set) def parallel_read_rows(self, columns: List[str], num_parallel_calls=1, row_set: RowSet=from_rows_or_ranges(infinite())): + print("asdadsasddsadsadsada") + print("getting samples") samples = core_ops.bigtable_sample_row_sets(self._client_resource, row_set._impl, self._table_id, num_parallel_calls) + print("$"*50) + print("got samples") samples_ds = dataset_ops.Dataset.from_tensor_slices(samples) def map_func(sample): rs = RowSet(core_ops.bigtable_rowset_intersect_tensor(row_set._impl, sample)) @@ -56,7 +60,7 @@ def map_func(sample): class _BigtableDataset(dataset_ops.DatasetSource): """_BigtableDataset represents a dataset that retrieves keys and values.""" - def __init__(self, client_resource, table_id: str, columns: List[str]): + def __init__(self, client_resource, table_id: str, columns: List[str], row_set: RowSet): self._table_id = table_id self._columns = columns self._element_spec = tf.TensorSpec( @@ -64,7 +68,7 @@ def __init__(self, client_resource, table_id: str, columns: List[str]): ) variant_tensor = core_ops.bigtable_dataset( - client_resource, table_id, columns + client_resource, row_set._impl, table_id, columns ) super().__init__(variant_tensor) From c4c1792cf2fd5441329bb59accc916b4c996d08b Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Tue, 2 Nov 2021 15:36:16 +0100 Subject: [PATCH 11/29] PR comments and linting --- tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py b/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py index ecd7251a8..78aeba710 100644 --- a/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py +++ b/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py @@ -6,8 +6,8 @@ import tensorflow as tf from tensorflow.python.data.ops import dataset_ops -from tensorflow_io.python.ops.bigtable.bigtable_row_set import from_rows_or_ranges, RowSet, intersect -from tensorflow_io.python.ops.bigtable.bigtable_row_range import infinite, right_open +from tensorflow_io.python.ops.bigtable.bigtable_row_set import from_rows_or_ranges, RowSet +from tensorflow_io.python.ops.bigtable.bigtable_row_range import infinite class BigtableClient: From 
45eedf7e70b0a16269329fd96fb0669f4b4e8d1c Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Wed, 3 Nov 2021 08:16:14 +0100 Subject: [PATCH 12/29] added more tests for parallel read --- .../test_bigtable/test_parallel_read_rows.py | 53 +++++++++++++++++++ tests/test_bigtable/test_row_set.py | 2 + 2 files changed, 55 insertions(+) create mode 100644 tests/test_bigtable/test_parallel_read_rows.py diff --git a/tests/test_bigtable/test_parallel_read_rows.py b/tests/test_bigtable/test_parallel_read_rows.py new file mode 100644 index 000000000..fe02e3a59 --- /dev/null +++ b/tests/test_bigtable/test_parallel_read_rows.py @@ -0,0 +1,53 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# disable module docstring for tests +# pylint: disable=C0114 +# disable class docstring for tests +# pylint: disable=C0115 + +import os +from .bigtable_emulator import BigtableEmulator +from tensorflow_io.python.ops.bigtable.bigtable_dataset_ops import BigtableClient +import tensorflow_io.python.ops.bigtable.bigtable_row_range as row_range +import tensorflow_io.python.ops.bigtable.bigtable_row_set as row_set +import tensorflow as tf +from tensorflow import test + + +class BigtableParallelReadTest(test.TestCase): + def setUp(self): + self.emulator = BigtableEmulator() + + def tearDown(self): + self.emulator.stop() + + def test_parallel_read(self): + os.environ["BIGTABLE_EMULATOR_HOST"] = self.emulator.get_addr() + self.emulator.create_table("fake_project", "fake_instance", "test-table", + ["fam1", "fam2"], splits=["row005", "row010", "row015"]) + + values = [[f"[{i,j}]" for j in range(2)] for i in range(20)] + + ten = tf.constant(values) + + client = BigtableClient("fake_project", "fake_instance") + table = client.get_table("test-table") + + self.emulator.write_tensor("fake_project", "fake_instance", "test-table", ten, + ["row" + str(i).rjust(3, "0") for i in range(20)], ["fam1:col1", "fam2:col2"]) + + for i, r in enumerate(table.parallel_read_rows(["fam1:col1", "fam2:col2"], row_set=row_set.from_rows_or_ranges(row_range.empty()))): + for j, c in enumerate(r): + self.assertEqual(values[i][j], c.numpy().decode()) diff --git a/tests/test_bigtable/test_row_set.py b/tests/test_bigtable/test_row_set.py index 385e835b8..cb250d89e 100644 --- a/tests/test_bigtable/test_row_set.py +++ b/tests/test_bigtable/test_row_set.py @@ -16,9 +16,11 @@ # pylint: disable=C0114 # disable class docstring for tests # pylint: disable=C0115 +from tensorflow_io.python.ops import core_ops import tensorflow_io.python.ops.bigtable.bigtable_row_range as row_range import tensorflow_io.python.ops.bigtable.bigtable_row_set as row_set from tensorflow import test +import tensorflow as tf class RowRangeTest(test.TestCase): From 9ba57730c0ddb3245dc5fca4a53b2847faf2d141 Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Wed, 3 Nov 2021 08:17:43 +0100 Subject: [PATCH 13/29] removed sample row_keys because it's unused --- .../bigtable/bigtable_dataset_kernel.cc | 180 +++++++++++++----- 
.../core/kernels/bigtable/bigtable_row_set.h | 4 + 2 files changed, 140 insertions(+), 44 deletions(-) diff --git a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc index aa24951a6..068c6fa0a 100644 --- a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc +++ b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc @@ -83,8 +83,8 @@ class BigtableClientResource : public ResourceBase { explicit BigtableClientResource(const std::string& project_id, const std::string& instance_id) : data_client_(CreateDataClient(project_id, instance_id)) { - VLOG(1) << "BigtableClientResource ctor"; - } + VLOG(1) << "BigtableClientResource ctor"; + } cbt::Table CreateTable(const std::string& table_id) { VLOG(1) << "CreateTable"; @@ -107,35 +107,6 @@ class BigtableClientResource : public ResourceBase { std::shared_ptr data_client_; }; -class BigtableRowsetResource : public ResourceBase { - public: - explicit BigtableRowsetResource(cbt::RowSet const& row_set) : row_set_(row_set){ - VLOG(1) << "BigtableRowsetResource ctor"; - } - - ~BigtableRowsetResource() { VLOG(1) << "BigtableRowsetResource dtor"; } - - std::string PrintRowSet() { - std::string res; - google::protobuf::TextFormat::PrintToString(row_set_.as_proto(), &res); - return res; - } - - void AppendStr(std::string const& row_key) { row_set_.Append(row_key); } - void AppendRowRange(cbt::RowRange const& row_range) { - row_set_.Append(row_range); - } - cbt::RowSet Intersect(cbt::RowRange const& row_range) { - return row_set_.Intersect(row_range); - } - - cbt::RowSet const& RowSet() { return row_set_; } - - string DebugString() const override { return "BigtableRowsetResource"; } - - cbt::RowSet row_set_; -}; - class BigtableClientOp : public OpKernel { public: explicit BigtableClientOp(OpKernelConstruction* ctx) : OpKernel(ctx) { @@ -179,14 +150,14 @@ class Iterator : public DatasetIterator { columns_(ColumnsToFamiliesAndQualifiers(columns)), table_(this->dataset()->client_resource().CreateTable(table_id)), reader_(this->table_.ReadRows( - this->dataset()->row_set_resource()->RowSet(), - // cbt::RowRange::InfiniteRange(), - cbt::Filter::Chain(CreateColumnsFilter(columns_), - cbt::Filter::Latest(1)))), + this->dataset()->row_set_resource()->row_set(), + // cbt::RowRange::InfiniteRange(), + cbt::Filter::Chain(CreateColumnsFilter(columns_), + cbt::Filter::Latest(1)))), it_(this->reader_.begin()), column_to_idx_(CreateColumnToIdxMap(columns_)) { - VLOG(1) << "DatasetIterator ctor"; - } + VLOG(1) << "DatasetIterator ctor"; + } Status GetNextInternal(IteratorContext* ctx, std::vector* out_tensors, bool* end_of_sequence) override { @@ -311,8 +282,8 @@ class Iterator : public DatasetIterator { class Dataset : public DatasetBase { public: Dataset(OpKernelContext* ctx, BigtableClientResource* client_resource, - BigtableRowsetResource* row_set_resource, - std::string table_id, std::vector columns) + io::BigtableRowSetResource* row_set_resource, std::string table_id, + std::vector columns) : DatasetBase(DatasetContext(ctx)), client_resource_(*client_resource), row_set_resource_(row_set_resource), @@ -325,7 +296,7 @@ class Dataset : public DatasetBase { std::unique_ptr MakeIteratorInternal( const std::string& prefix) const override { VLOG(1) << "MakeIteratorInternal. 
table=" << table_id_; - Iterator * iter = new Iterator( + Iterator* iter = new Iterator( typename DatasetIterator::Params{ this, strings::StrCat(prefix, "::BigtableDataset")}, table_id_, columns_); @@ -343,7 +314,7 @@ class Dataset : public DatasetBase { } BigtableClientResource& client_resource() const { return client_resource_; } - BigtableRowsetResource* row_set_resource() const { return row_set_resource_; } + io::BigtableRowSetResource* row_set_resource() const { return row_set_resource_; } protected: Status AsGraphDefInternal(SerializationContext* ctx, @@ -357,7 +328,7 @@ class Dataset : public DatasetBase { private: BigtableClientResource& client_resource_; - BigtableRowsetResource* row_set_resource_; + io::BigtableRowSetResource* row_set_resource_; const std::string table_id_; const std::vector columns_; DataTypeVector dtypes_; @@ -378,12 +349,13 @@ class BigtableDatasetOp : public DatasetOpKernel { GetResourceFromContext(ctx, "client", &client_resource)); core::ScopedUnref unref_client(client_resource); - BigtableRowsetResource* row_set_resource; + io::BigtableRowSetResource* row_set_resource; OP_REQUIRES_OK(ctx, GetResourceFromContext(ctx, "row_set", &row_set_resource)); core::ScopedUnref row_set_resource_unref_(row_set_resource); - *output = new Dataset(ctx, client_resource,row_set_resource, table_id_, columns_); + *output = new Dataset(ctx, client_resource, row_set_resource, table_id_, + columns_); } private: @@ -394,6 +366,126 @@ class BigtableDatasetOp : public DatasetOpKernel { REGISTER_KERNEL_BUILDER(Name("BigtableDataset").Device(DEVICE_CPU), BigtableDatasetOp); +// Return the index of the tablet that a worker should start with. Each worker +// start with their first tablet and finish on tablet before next worker's first +// tablet. Each worker should get num_tablets/num_workers rounded down, plus at +// most one. If we simply round up, then the last worker may be starved. +// Consider an example where there's 100 tablets and 11 workers. If we give +// round_up(100/11) to each one, then first 10 workers get 10 tablets each, and +// the last one gets nothing. +int GetWorkerStartIndex(size_t num_tablets, size_t num_workers, + size_t worker_id) { + // if there's more workers than tablets, workers get one tablet each or less. + if (num_tablets <= num_workers) return std::min(num_tablets, worker_id); + // tablets_per_worker: minimum tablets each worker should obtain. + size_t const tablets_per_worker = num_tablets / num_workers; + // surplus_tablets: excess that has to be evenly distributed among the workers + // so that no worker gets more than tablets_per_worker + 1. 
+ size_t const surplus_tablets = num_tablets % num_workers; + size_t const workers_before = worker_id; + return tablets_per_worker * workers_before + + std::min(surplus_tablets, workers_before); +} + +bool RowSetIntersectsRange(cbt::RowSet const& row_set, + std::string const& start_key, + std::string const& end_key) { + auto range = cbt::RowRange::Range(start_key, end_key); + return !row_set.Intersect(range).IsEmpty(); +} + +class BigtableSampleRowSetsOp : public OpKernel { + public: + explicit BigtableSampleRowSetsOp(OpKernelConstruction* ctx) : OpKernel(ctx) { + VLOG(1) << "BigtableSampleRowSetsOp ctor "; + OP_REQUIRES_OK(ctx, ctx->GetAttr("table_id", &table_id_)); + OP_REQUIRES_OK(ctx, + ctx->GetAttr("num_parallel_calls", &num_parallel_calls_)); + } + + void Compute(OpKernelContext* context) override { + mutex_lock l(mu_); + BigtableClientResource* client_resource; + OP_REQUIRES_OK(context, + GetResourceFromContext(context, "client", &client_resource)); + core::ScopedUnref unref_client(client_resource); + + io::BigtableRowSetResource* row_set_resource; + OP_REQUIRES_OK( + context, GetResourceFromContext(context, "row_set", &row_set_resource)); + core::ScopedUnref unref_row_set(row_set_resource); + VLOG(1) << "BigtableSampleRowSetsOp got resources "; + + auto table = client_resource->CreateTable(table_id_); + VLOG(1) << "created table"; + auto maybe_sample_row_keys = table.SampleRows(); + VLOG(1) << "sample rows"; + if (!maybe_sample_row_keys.ok()) + throw std::runtime_error(maybe_sample_row_keys.status().message()); + auto& sample_row_keys = maybe_sample_row_keys.value(); + VLOG(1) << "got row row_keys"; + + cbt::RowSet const& row_set = row_set_resource->row_set(); + VLOG(1) << "got row_set"; + + std::vector> tablets; + + std::string start_key; + for (auto& sample_row_key : sample_row_keys) { + auto& end_key = sample_row_key.row_key; + tablets.emplace_back(start_key, end_key); + start_key = std::move(end_key); + } + if (!start_key.empty()) { + tablets.emplace_back(start_key, ""); + } + tablets.erase(std::remove_if( + tablets.begin(), tablets.end(), + [&row_set](std::pair const& p) { + return !RowSetIntersectsRange(row_set, p.first, + p.second); + }), + tablets.end()); + + VLOG(1) << "got table of tablets of size:" << tablets.size(); + + long output_size = std::min((int)tablets.size(), num_parallel_calls_); + + VLOG(1) << ", allocating output {" << output_size << ",2}"; + + Tensor* output_tensor = NULL; + OP_REQUIRES_OK(context, context->allocate_output(0, {(long)output_size, 2}, + &output_tensor)); + auto output_v = output_tensor->tensor(); + + for (int i = 0; i < output_size; i++) { + size_t start_idx = GetWorkerStartIndex(tablets.size(), output_size, i); + size_t next_worker_start_idx = + GetWorkerStartIndex(tablets.size(), output_size, i + 1); + size_t end_idx = next_worker_start_idx - 1; + + VLOG(1) << "for i=" << i << " getting share " << start_idx << ":" + << end_idx; + + start_key = tablets.at(start_idx).first; + std::string end_key = tablets.at(end_idx).second; + + VLOG(1) << "for i=" << i << " got range [" << start_key << "," << end_key + << "]"; + output_v(i, 0) = start_key; + output_v(i, 1) = end_key; + } + } + + private: + mutable mutex mu_; + std::string table_id_; + int num_parallel_calls_; +}; + +REGISTER_KERNEL_BUILDER(Name("BigtableSampleRowSets").Device(DEVICE_CPU), + BigtableSampleRowSetsOp); + } // namespace } // namespace data } // namespace tensorflow diff --git a/tensorflow_io/core/kernels/bigtable/bigtable_row_set.h 
b/tensorflow_io/core/kernels/bigtable/bigtable_row_set.h index 4dc641593..95486bf79 100644 --- a/tensorflow_io/core/kernels/bigtable/bigtable_row_set.h +++ b/tensorflow_io/core/kernels/bigtable/bigtable_row_set.h @@ -50,6 +50,10 @@ class BigtableRowSetResource : public ResourceBase { return row_set_.Intersect(row_range); } + google::cloud::bigtable::RowSet const& row_set(){ + return row_set_; + } + string DebugString() const override { return "BigtableRowSetResource:{" + ToString() + "}"; } From d24fd1cd30f71d3271cbae84d03a9d2d8507462a Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Wed, 3 Nov 2021 08:27:58 +0100 Subject: [PATCH 14/29] removed obsolete code and comments --- tensorflow_io/core/ops/bigtable_ops.cc | 10 ---------- .../python/ops/bigtable/bigtable_dataset_ops.py | 4 ---- 2 files changed, 14 deletions(-) diff --git a/tensorflow_io/core/ops/bigtable_ops.cc b/tensorflow_io/core/ops/bigtable_ops.cc index 8412c99b3..45fddd0fe 100644 --- a/tensorflow_io/core/ops/bigtable_ops.cc +++ b/tensorflow_io/core/ops/bigtable_ops.cc @@ -103,16 +103,6 @@ REGISTER_OP("BigtableRowsetIntersectTensor") .SetShapeFn(shape_inference::UnchangedShape); -REGISTER_OP("BigtableSampleRowKeys") - .Input("client: resource") - .Attr("table_id: string") - .Output("samples: string") - .SetShapeFn([](tensorflow::shape_inference::InferenceContext* c) { - c->set_output(0, c->Vector(c->UnknownDim())); - return tensorflow::Status::OK(); - }); - - REGISTER_OP("BigtableSampleRowSets") .Input("client: resource") .Input("row_set: resource") diff --git a/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py b/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py index 78aeba710..e76a8cce9 100644 --- a/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py +++ b/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py @@ -36,11 +36,7 @@ def read_rows(self, columns: List[str], row_set: RowSet): return _BigtableDataset(self._client_resource, self._table_id, columns, row_set) def parallel_read_rows(self, columns: List[str], num_parallel_calls=1, row_set: RowSet=from_rows_or_ranges(infinite())): - print("asdadsasddsadsadsada") - print("getting samples") samples = core_ops.bigtable_sample_row_sets(self._client_resource, row_set._impl, self._table_id, num_parallel_calls) - print("$"*50) - print("got samples") samples_ds = dataset_ops.Dataset.from_tensor_slices(samples) def map_func(sample): rs = RowSet(core_ops.bigtable_rowset_intersect_tensor(row_set._impl, sample)) From 96981fbe34edf33306fa6d1b384f8b61184e5d2a Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Wed, 3 Nov 2021 08:57:43 +0100 Subject: [PATCH 15/29] code cleanup 1 remove obsolete comments RowsetIntersectRange and BigtableSampleRowSets tweaks --- .../bigtable/bigtable_dataset_kernel.cc | 38 +++++++++---------- 1 file changed, 17 insertions(+), 21 deletions(-) diff --git a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc index 068c6fa0a..ac51fb195 100644 --- a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc +++ b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc @@ -150,8 +150,12 @@ class Iterator : public DatasetIterator { columns_(ColumnsToFamiliesAndQualifiers(columns)), table_(this->dataset()->client_resource().CreateTable(table_id)), reader_(this->table_.ReadRows( +<<<<<<< HEAD this->dataset()->row_set_resource()->row_set(), // cbt::RowRange::InfiniteRange(), +======= + this->dataset()->row_set_resource().RowSet(), +>>>>>>> c81fd9a... 
code cleanup 1 cbt::Filter::Chain(CreateColumnsFilter(columns_), cbt::Filter::Latest(1)))), it_(this->reader_.begin()), @@ -286,7 +290,7 @@ class Dataset : public DatasetBase { std::vector columns) : DatasetBase(DatasetContext(ctx)), client_resource_(*client_resource), - row_set_resource_(row_set_resource), + row_set_resource_(*row_set_resource), table_id_(table_id), columns_(columns) { dtypes_.push_back(DT_STRING); @@ -314,7 +318,11 @@ class Dataset : public DatasetBase { } BigtableClientResource& client_resource() const { return client_resource_; } +<<<<<<< HEAD io::BigtableRowSetResource* row_set_resource() const { return row_set_resource_; } +======= + BigtableRowsetResource& row_set_resource() const { return row_set_resource_; } +>>>>>>> c81fd9a... code cleanup 1 protected: Status AsGraphDefInternal(SerializationContext* ctx, @@ -328,7 +336,11 @@ class Dataset : public DatasetBase { private: BigtableClientResource& client_resource_; +<<<<<<< HEAD io::BigtableRowSetResource* row_set_resource_; +======= + BigtableRowsetResource& row_set_resource_; +>>>>>>> c81fd9a... code cleanup 1 const std::string table_id_; const std::vector columns_; DataTypeVector dtypes_; @@ -414,19 +426,12 @@ class BigtableSampleRowSetsOp : public OpKernel { OP_REQUIRES_OK( context, GetResourceFromContext(context, "row_set", &row_set_resource)); core::ScopedUnref unref_row_set(row_set_resource); - VLOG(1) << "BigtableSampleRowSetsOp got resources "; auto table = client_resource->CreateTable(table_id_); - VLOG(1) << "created table"; auto maybe_sample_row_keys = table.SampleRows(); - VLOG(1) << "sample rows"; if (!maybe_sample_row_keys.ok()) throw std::runtime_error(maybe_sample_row_keys.status().message()); auto& sample_row_keys = maybe_sample_row_keys.value(); - VLOG(1) << "got row row_keys"; - - cbt::RowSet const& row_set = row_set_resource->row_set(); - VLOG(1) << "got row_set"; std::vector> tablets; @@ -441,37 +446,28 @@ class BigtableSampleRowSetsOp : public OpKernel { } tablets.erase(std::remove_if( tablets.begin(), tablets.end(), - [&row_set](std::pair const& p) { - return !RowSetIntersectsRange(row_set, p.first, + [row_set_resource](std::pair const& p) { + return !RowSetIntersectsRange(row_set_resource->row_set(), p.first, p.second); }), tablets.end()); VLOG(1) << "got table of tablets of size:" << tablets.size(); - long output_size = std::min((int)tablets.size(), num_parallel_calls_); - - VLOG(1) << ", allocating output {" << output_size << ",2}"; + size_t output_size = std::min(tablets.size(), (size_t) num_parallel_calls_); Tensor* output_tensor = NULL; OP_REQUIRES_OK(context, context->allocate_output(0, {(long)output_size, 2}, &output_tensor)); auto output_v = output_tensor->tensor(); - for (int i = 0; i < output_size; i++) { + for (size_t i = 0; i < output_size; i++) { size_t start_idx = GetWorkerStartIndex(tablets.size(), output_size, i); size_t next_worker_start_idx = GetWorkerStartIndex(tablets.size(), output_size, i + 1); size_t end_idx = next_worker_start_idx - 1; - - VLOG(1) << "for i=" << i << " getting share " << start_idx << ":" - << end_idx; - start_key = tablets.at(start_idx).first; std::string end_key = tablets.at(end_idx).second; - - VLOG(1) << "for i=" << i << " got range [" << start_key << "," << end_key - << "]"; output_v(i, 0) = start_key; output_v(i, 1) = end_key; } From c756183c68967b4a62191124f131edc3d08e1e0e Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Wed, 3 Nov 2021 11:39:48 +0100 Subject: [PATCH 16/29] more tests for parallel read --- 
.../bigtable/bigtable_dataset_kernel.cc | 40 +++++--------- .../test_bigtable/test_parallel_read_rows.py | 53 +++++++++++++++++++ 2 files changed, 67 insertions(+), 26 deletions(-) diff --git a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc index ac51fb195..67a0a9408 100644 --- a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc +++ b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc @@ -150,12 +150,8 @@ class Iterator : public DatasetIterator { columns_(ColumnsToFamiliesAndQualifiers(columns)), table_(this->dataset()->client_resource().CreateTable(table_id)), reader_(this->table_.ReadRows( -<<<<<<< HEAD this->dataset()->row_set_resource()->row_set(), // cbt::RowRange::InfiniteRange(), -======= - this->dataset()->row_set_resource().RowSet(), ->>>>>>> c81fd9a... code cleanup 1 cbt::Filter::Chain(CreateColumnsFilter(columns_), cbt::Filter::Latest(1)))), it_(this->reader_.begin()), @@ -300,11 +296,10 @@ class Dataset : public DatasetBase { std::unique_ptr MakeIteratorInternal( const std::string& prefix) const override { VLOG(1) << "MakeIteratorInternal. table=" << table_id_; - Iterator* iter = new Iterator( + return absl::make_unique>( typename DatasetIterator::Params{ this, strings::StrCat(prefix, "::BigtableDataset")}, table_id_, columns_); - return std::unique_ptr>(iter); } const DataTypeVector& output_dtypes() const override { return dtypes_; } @@ -318,11 +313,7 @@ class Dataset : public DatasetBase { } BigtableClientResource& client_resource() const { return client_resource_; } -<<<<<<< HEAD - io::BigtableRowSetResource* row_set_resource() const { return row_set_resource_; } -======= - BigtableRowsetResource& row_set_resource() const { return row_set_resource_; } ->>>>>>> c81fd9a... code cleanup 1 + io::BigtableRowsetResource& row_set_resource() const { return row_set_resource_; } protected: Status AsGraphDefInternal(SerializationContext* ctx, @@ -336,11 +327,7 @@ class Dataset : public DatasetBase { private: BigtableClientResource& client_resource_; -<<<<<<< HEAD io::BigtableRowSetResource* row_set_resource_; -======= - BigtableRowsetResource& row_set_resource_; ->>>>>>> c81fd9a... 
code cleanup 1
             cbt::Filter::Chain(CreateColumnsFilter(columns_),
                                cbt::Filter::Latest(1)))),
         it_(this->reader_.begin()),
@@ -300,11 +296,10 @@ class Dataset : public DatasetBase {
   std::unique_ptr<IteratorBase> MakeIteratorInternal(
       const std::string& prefix) const override {
     VLOG(1) << "MakeIteratorInternal. table=" << table_id_;
-    Iterator* iter = new Iterator(
+    return absl::make_unique<Iterator>(
         typename DatasetIterator<Dataset>::Params{
             this, strings::StrCat(prefix, "::BigtableDataset")},
         table_id_, columns_);
-    return std::unique_ptr<IteratorBase>(iter);
   }
 
   const DataTypeVector& output_dtypes() const override { return dtypes_; }
@@ -318,11 +313,7 @@ class Dataset : public DatasetBase {
   }
 
   BigtableClientResource& client_resource() const { return client_resource_; }
-<<<<<<< HEAD
-  io::BigtableRowSetResource* row_set_resource() const { return row_set_resource_; }
-=======
-  BigtableRowsetResource& row_set_resource() const { return row_set_resource_; }
->>>>>>> c81fd9a... code cleanup 1
+  io::BigtableRowSetResource* row_set_resource() const { return row_set_resource_; }
 
  protected:
   Status AsGraphDefInternal(SerializationContext* ctx,
@@ -336,11 +327,7 @@ class Dataset : public DatasetBase {
 
  private:
   BigtableClientResource& client_resource_;
-<<<<<<< HEAD
   io::BigtableRowSetResource* row_set_resource_;
-=======
-  BigtableRowsetResource& row_set_resource_;
->>>>>>> c81fd9a... code cleanup 1
   const std::string table_id_;
   const std::vector<std::string> columns_;
   DataTypeVector dtypes_;
@@ -444,17 +431,18 @@ class BigtableSampleRowSetsOp : public OpKernel {
     if (!start_key.empty()) {
       tablets.emplace_back(start_key, "");
     }
-    tablets.erase(std::remove_if(
-                      tablets.begin(), tablets.end(),
-                      [row_set_resource](std::pair<std::string, std::string> const& p) {
-                        return !RowSetIntersectsRange(row_set_resource->row_set(), p.first,
-                                                      p.second);
-                      }),
-                  tablets.end());
-
-    VLOG(1) << "got table of tablets of size:" << tablets.size();
-
-    size_t output_size = std::min(tablets.size(), (size_t) num_parallel_calls_);
+    tablets.erase(
+        std::remove_if(
+            tablets.begin(), tablets.end(),
+            [row_set_resource](std::pair<std::string, std::string> const& p) {
+              return !RowSetIntersectsRange(row_set_resource->row_set(), p.first,
+                                            p.second);
+            }),
+        tablets.end());
+
+    VLOG(1) << "got array of tablets of size:" << tablets.size();
+
+    size_t output_size = std::min(tablets.size(), (size_t)num_parallel_calls_);
 
     Tensor* output_tensor = NULL;
     OP_REQUIRES_OK(context, context->allocate_output(0, {(long)output_size, 2},
diff --git a/tests/test_bigtable/test_parallel_read_rows.py b/tests/test_bigtable/test_parallel_read_rows.py
index fe02e3a59..65e5b8d7f 100644
--- a/tests/test_bigtable/test_parallel_read_rows.py
+++ b/tests/test_bigtable/test_parallel_read_rows.py
@@ -19,6 +19,7 @@
 
 import os
 from .bigtable_emulator import BigtableEmulator
+from tensorflow_io.python.ops import core_ops
 from tensorflow_io.python.ops.bigtable.bigtable_dataset_ops import BigtableClient
 import tensorflow_io.python.ops.bigtable.bigtable_row_range as row_range
 import tensorflow_io.python.ops.bigtable.bigtable_row_set as row_set
@@ -51,3 +52,55 @@ def test_parallel_read(self):
         for i, r in enumerate(table.parallel_read_rows(["fam1:col1", "fam2:col2"], row_set=row_set.from_rows_or_ranges(row_range.empty()))):
             for j, c in enumerate(r):
                 self.assertEqual(values[i][j], c.numpy().decode())
+
+    def test_not_parallel_read(self):
+        os.environ["BIGTABLE_EMULATOR_HOST"] = self.emulator.get_addr()
+        self.emulator.create_table("fake_project", "fake_instance", "test-table",
+                                   ["fam1", "fam2"], splits=["row005", "row010", "row015"])
+
+        values = [[f"[{i,j}]" for j in range(2)] for i in range(20)]
+
+        ten = tf.constant(values)
+
+        client = BigtableClient("fake_project", "fake_instance")
+        table = client.get_table("test-table")
+
+        self.emulator.write_tensor("fake_project", "fake_instance", "test-table", ten,
+                                   ["row" + str(i).rjust(3, "0") for i in range(20)], ["fam1:col1", "fam2:col2"])
+
+        for i, r in enumerate(table.parallel_read_rows(["fam1:col1", "fam2:col2"], row_set=row_set.from_rows_or_ranges(row_range.empty()), num_parallel_calls=2)):
+            for j, c in enumerate(r):
+                self.assertEqual(values[i][j], c.numpy().decode())
+
+    def test_sample_row_sets(self):
+        os.environ["BIGTABLE_EMULATOR_HOST"] = self.emulator.get_addr()
+        self.emulator.create_table("fake_project", "fake_instance", "test-table",
+                                   ["fam1", "fam2"], splits=["row005", "row010", "row015", "row020", "row025", "row030"])
+
+        values = [[f"[{i,j}]" for j in range(2)] for i in range(40)]
+
+        ten = tf.constant(values)
+
+        client = BigtableClient("fake_project", "fake_instance")
+
+        self.emulator.write_tensor("fake_project", "fake_instance", "test-table", ten,
+                                   ["row" + str(i).rjust(3, "0") for i in range(40)], ["fam1:col1", "fam2:col2"])
+
+        rs = row_set.from_rows_or_ranges(row_range.infinite())
+
+        num_parallel_calls = 2
+        samples = [s for s in core_ops.bigtable_sample_row_sets(client._client_resource, rs._impl, "test-table", 
num_parallel_calls)] + self.assertEqual(len(samples), num_parallel_calls) + + num_parallel_calls = 6 + samples = [s for s in core_ops.bigtable_sample_row_sets(client._client_resource, rs._impl, "test-table", num_parallel_calls)] + + # The emulator may return different samples each time, so we can't + # expect an exact number, but it must be no more than num_parallel_calls + self.assertLessEqual(len(samples), num_parallel_calls) + + num_parallel_calls = 1 + samples = [s for s in core_ops.bigtable_sample_row_sets(client._client_resource, rs._impl, "test-table", num_parallel_calls)] + self.assertEqual(len(samples), num_parallel_calls) + self.assertEqual(samples[0].numpy()[0].decode(), "") + self.assertEqual(samples[0].numpy()[1].decode(), "") From ace753aeb3fd35643bc76a73fbd28a3e87f006f4 Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Fri, 5 Nov 2021 15:53:58 +0100 Subject: [PATCH 17/29] run linter on python files --- .../ops/bigtable/bigtable_dataset_ops.py | 32 +++++++++++++------ .../python/ops/bigtable/bigtable_row_range.py | 1 - .../python/ops/bigtable/bigtable_row_set.py | 1 - 3 files changed, 23 insertions(+), 11 deletions(-) diff --git a/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py b/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py index e76a8cce9..82f7d2648 100644 --- a/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py +++ b/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py @@ -6,7 +6,11 @@ import tensorflow as tf from tensorflow.python.data.ops import dataset_ops -from tensorflow_io.python.ops.bigtable.bigtable_row_set import from_rows_or_ranges, RowSet +from tensorflow_io.python.ops.bigtable.bigtable_row_set import ( + from_rows_or_ranges, + RowSet, + intersect, +) from tensorflow_io.python.ops.bigtable.bigtable_row_range import infinite @@ -34,13 +38,23 @@ def __init__(self, client_resource, table_id: str): def read_rows(self, columns: List[str], row_set: RowSet): return _BigtableDataset(self._client_resource, self._table_id, columns, row_set) - - def parallel_read_rows(self, columns: List[str], num_parallel_calls=1, row_set: RowSet=from_rows_or_ranges(infinite())): - samples = core_ops.bigtable_sample_row_sets(self._client_resource, row_set._impl, self._table_id, num_parallel_calls) + + def parallel_read_rows( + self, + columns: List[str], + num_parallel_calls=1, + row_set: RowSet = from_rows_or_ranges(infinite()), + ): + samples = core_ops.bigtable_sample_row_sets( + self._client_resource, row_set._impl, self._table_id, num_parallel_calls + ) samples_ds = dataset_ops.Dataset.from_tensor_slices(samples) + def map_func(sample): - rs = RowSet(core_ops.bigtable_rowset_intersect_tensor(row_set._impl, sample)) - return self.read_rows(columns,rs) + rs = RowSet( + core_ops.bigtable_rowset_intersect_tensor(row_set._impl, sample) + ) + return self.read_rows(columns, rs) return samples_ds.interleave( map_func=map_func, @@ -51,12 +65,12 @@ def map_func(sample): ) - - class _BigtableDataset(dataset_ops.DatasetSource): """_BigtableDataset represents a dataset that retrieves keys and values.""" - def __init__(self, client_resource, table_id: str, columns: List[str], row_set: RowSet): + def __init__( + self, client_resource, table_id: str, columns: List[str], row_set: RowSet + ): self._table_id = table_id self._columns = columns self._element_spec = tf.TensorSpec( diff --git a/tensorflow_io/python/ops/bigtable/bigtable_row_range.py b/tensorflow_io/python/ops/bigtable/bigtable_row_range.py index 23192bd73..f01ec939a 100644 --- 
a/tensorflow_io/python/ops/bigtable/bigtable_row_range.py +++ b/tensorflow_io/python/ops/bigtable/bigtable_row_range.py @@ -15,7 +15,6 @@ """Module implementing basic functions for obtaining BigTable RowRanges""" from tensorflow_io.python.ops import core_ops -import tensorflow class RowRange: diff --git a/tensorflow_io/python/ops/bigtable/bigtable_row_set.py b/tensorflow_io/python/ops/bigtable/bigtable_row_set.py index 92f1ba9a4..7f728f63b 100644 --- a/tensorflow_io/python/ops/bigtable/bigtable_row_set.py +++ b/tensorflow_io/python/ops/bigtable/bigtable_row_set.py @@ -14,7 +14,6 @@ """Module implementing basic functions for obtaining BigTable RowSets""" -from tensorflow.python.framework import dtypes from tensorflow_io.python.ops import core_ops from . import bigtable_row_range from typing import Union From 360852e9e5a8830f90189d6aa53f421f924c40b6 Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Fri, 5 Nov 2021 16:02:15 +0100 Subject: [PATCH 18/29] linter on tests --- .../test_bigtable/test_parallel_read_rows.py | 99 +++++++++++++++---- tests/test_bigtable/test_read_rows.py | 37 ++++++- tests/test_bigtable/test_row_set.py | 18 ++-- 3 files changed, 126 insertions(+), 28 deletions(-) diff --git a/tests/test_bigtable/test_parallel_read_rows.py b/tests/test_bigtable/test_parallel_read_rows.py index 65e5b8d7f..06390abd9 100644 --- a/tests/test_bigtable/test_parallel_read_rows.py +++ b/tests/test_bigtable/test_parallel_read_rows.py @@ -36,8 +36,13 @@ def tearDown(self): def test_parallel_read(self): os.environ["BIGTABLE_EMULATOR_HOST"] = self.emulator.get_addr() - self.emulator.create_table("fake_project", "fake_instance", "test-table", - ["fam1", "fam2"], splits=["row005", "row010", "row015"]) + self.emulator.create_table( + "fake_project", + "fake_instance", + "test-table", + ["fam1", "fam2"], + splits=["row005", "row010", "row015"], + ) values = [[f"[{i,j}]" for j in range(2)] for i in range(20)] @@ -46,17 +51,33 @@ def test_parallel_read(self): client = BigtableClient("fake_project", "fake_instance") table = client.get_table("test-table") - self.emulator.write_tensor("fake_project", "fake_instance", "test-table", ten, - ["row" + str(i).rjust(3, "0") for i in range(20)], ["fam1:col1", "fam2:col2"]) - - for i, r in enumerate(table.parallel_read_rows(["fam1:col1", "fam2:col2"], row_set=row_set.from_rows_or_ranges(row_range.empty()))): + self.emulator.write_tensor( + "fake_project", + "fake_instance", + "test-table", + ten, + ["row" + str(i).rjust(3, "0") for i in range(20)], + ["fam1:col1", "fam2:col2"], + ) + + for i, r in enumerate( + table.parallel_read_rows( + ["fam1:col1", "fam2:col2"], + row_set=row_set.from_rows_or_ranges(row_range.empty()), + ) + ): for j, c in enumerate(r): self.assertEqual(values[i][j], c.numpy().decode()) def test_not_parallel_read(self): os.environ["BIGTABLE_EMULATOR_HOST"] = self.emulator.get_addr() - self.emulator.create_table("fake_project", "fake_instance", "test-table", - ["fam1", "fam2"], splits=["row005", "row010", "row015"]) + self.emulator.create_table( + "fake_project", + "fake_instance", + "test-table", + ["fam1", "fam2"], + splits=["row005", "row010", "row015"], + ) values = [[f"[{i,j}]" for j in range(2)] for i in range(20)] @@ -65,17 +86,34 @@ def test_not_parallel_read(self): client = BigtableClient("fake_project", "fake_instance") table = client.get_table("test-table") - self.emulator.write_tensor("fake_project", "fake_instance", "test-table", ten, - ["row" + str(i).rjust(3, "0") for i in range(20)], ["fam1:col1", "fam2:col2"]) - - for i, r in 
enumerate(table.parallel_read_rows(["fam1:col1", "fam2:col2"], row_set=row_set.from_rows_or_ranges(row_range.empty()), num_parallel_calls=2)): + self.emulator.write_tensor( + "fake_project", + "fake_instance", + "test-table", + ten, + ["row" + str(i).rjust(3, "0") for i in range(20)], + ["fam1:col1", "fam2:col2"], + ) + + for i, r in enumerate( + table.parallel_read_rows( + ["fam1:col1", "fam2:col2"], + row_set=row_set.from_rows_or_ranges(row_range.empty()), + num_parallel_calls=2, + ) + ): for j, c in enumerate(r): self.assertEqual(values[i][j], c.numpy().decode()) def test_sample_row_sets(self): os.environ["BIGTABLE_EMULATOR_HOST"] = self.emulator.get_addr() - self.emulator.create_table("fake_project", "fake_instance", "test-table", - ["fam1", "fam2"], splits=["row005", "row010", "row015", "row020", "row025", "row030"]) + self.emulator.create_table( + "fake_project", + "fake_instance", + "test-table", + ["fam1", "fam2"], + splits=["row005", "row010", "row015", "row020", "row025", "row030"], + ) values = [[f"[{i,j}]" for j in range(2)] for i in range(40)] @@ -83,24 +121,45 @@ def test_sample_row_sets(self): client = BigtableClient("fake_project", "fake_instance") - self.emulator.write_tensor("fake_project", "fake_instance", "test-table", ten, - ["row" + str(i).rjust(3, "0") for i in range(40)], ["fam1:col1", "fam2:col2"]) + self.emulator.write_tensor( + "fake_project", + "fake_instance", + "test-table", + ten, + ["row" + str(i).rjust(3, "0") for i in range(40)], + ["fam1:col1", "fam2:col2"], + ) rs = row_set.from_rows_or_ranges(row_range.infinite()) num_parallel_calls = 2 - samples = [s for s in core_ops.bigtable_sample_row_sets(client._client_resource, rs._impl, "test-table", num_parallel_calls)] + samples = [ + s + for s in core_ops.bigtable_sample_row_sets( + client._client_resource, rs._impl, "test-table", num_parallel_calls + ) + ] self.assertEqual(len(samples), num_parallel_calls) num_parallel_calls = 6 - samples = [s for s in core_ops.bigtable_sample_row_sets(client._client_resource, rs._impl, "test-table", num_parallel_calls)] - + samples = [ + s + for s in core_ops.bigtable_sample_row_sets( + client._client_resource, rs._impl, "test-table", num_parallel_calls + ) + ] + # The emulator may return different samples each time, so we can't # expect an exact number, but it must be no more than num_parallel_calls self.assertLessEqual(len(samples), num_parallel_calls) num_parallel_calls = 1 - samples = [s for s in core_ops.bigtable_sample_row_sets(client._client_resource, rs._impl, "test-table", num_parallel_calls)] + samples = [ + s + for s in core_ops.bigtable_sample_row_sets( + client._client_resource, rs._impl, "test-table", num_parallel_calls + ) + ] self.assertEqual(len(samples), num_parallel_calls) self.assertEqual(samples[0].numpy()[0].decode(), "") self.assertEqual(samples[0].numpy()[1].decode(), "") diff --git a/tests/test_bigtable/test_read_rows.py b/tests/test_bigtable/test_read_rows.py index 05c8a3319..30fb64cbe 100644 --- a/tests/test_bigtable/test_read_rows.py +++ b/tests/test_bigtable/test_read_rows.py @@ -55,7 +55,40 @@ def test_read(self): ["fam1:col1", "fam2:col2"], ) - for i, r in enumerate(table.read_rows(["fam1:col1", "fam2:col2"])): + for i, r in enumerate( + table.read_rows( + ["fam1:col1", "fam2:col2"], + row_set=row_set.from_rows_or_ranges(row_range.empty()), + ) + ): for j, c in enumerate(r): - print("ij", i, j, c) self.assertEqual(values[i][j], c.numpy().decode()) + + def test_read_row_set(self): + os.environ["BIGTABLE_EMULATOR_HOST"] = 
self.emulator.get_addr() + self.emulator.create_table( + "fake_project", "fake_instance", "test-table", ["fam1", "fam2"] + ) + + values = [[f"[{i,j}]" for j in range(2)] for i in range(20)] + + ten = tf.constant(values) + + client = BigtableClient("fake_project", "fake_instance") + table = client.get_table("test-table") + + self.emulator.write_tensor( + "fake_project", + "fake_instance", + "test-table", + ten, + ["row" + str(i).rjust(3, "0") for i in range(20)], + ["fam1:col1", "fam2:col2"], + ) + + row_s = row_set.from_rows_or_ranges(row_range.closed_range("row000", "row009")) + + read_rows = [ + r for r in table.read_rows(["fam1:col1", "fam2:col2"], row_set=row_s) + ] + self.assertEqual(len(read_rows), 10) diff --git a/tests/test_bigtable/test_row_set.py b/tests/test_bigtable/test_row_set.py index cb250d89e..a0e2df242 100644 --- a/tests/test_bigtable/test_row_set.py +++ b/tests/test_bigtable/test_row_set.py @@ -106,14 +106,20 @@ def test_from_rows_or_ranges(self): self.assertEqual(expected, repr(r_set)) def test_intersect(self): - r_set = row_set.from_rows_or_ranges( - row_range.open_range("row1", "row5") - ) + r_set = row_set.from_rows_or_ranges(row_range.open_range("row1", "row5")) r_set = row_set.intersect(r_set, row_range.closed_range("row3", "row7")) expected = ( - "row_ranges {\n" - + ' start_key_closed: "row3"\n' - + " end_key_open: " + "row_ranges {\n" + ' start_key_closed: "row3"\n' + " end_key_open: " '"row5"\n' + "}\n" ) self.assertEqual(expected, repr(r_set)) + + def test_intersect_tensor(self): + r_set = row_set.from_rows_or_ranges(row_range.open_range("row1", "row5")) + tensor = tf.constant(["row2", "row3"]) + r_range = row_range.right_open("row2", "row3") + regular_intersect = row_set.intersect(r_set, r_range) + tensor_intersect = row_set.RowSet( + core_ops.bigtable_rowset_intersect_tensor(r_set._impl, tensor) + ) + self.assertEqual(repr(regular_intersect), repr(tensor_intersect)) From 5b8110ad69f016ca3db9e945c8ac0e6d6952e54c Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Fri, 5 Nov 2021 20:18:43 +0100 Subject: [PATCH 19/29] after rebase --- .../bigtable/bigtable_dataset_kernel.cc | 6 +-- .../core/kernels/bigtable/bigtable_row_set.cc | 51 +++++++++++++++++++ tensorflow_io/core/ops/bigtable_ops.cc | 6 +-- .../ops/bigtable/bigtable_dataset_ops.py | 2 +- 4 files changed, 58 insertions(+), 7 deletions(-) diff --git a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc index 67a0a9408..38564bdc3 100644 --- a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc +++ b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc @@ -150,7 +150,7 @@ class Iterator : public DatasetIterator { columns_(ColumnsToFamiliesAndQualifiers(columns)), table_(this->dataset()->client_resource().CreateTable(table_id)), reader_(this->table_.ReadRows( - this->dataset()->row_set_resource()->row_set(), + this->dataset()->row_set_resource().row_set(), // cbt::RowRange::InfiniteRange(), cbt::Filter::Chain(CreateColumnsFilter(columns_), cbt::Filter::Latest(1)))), @@ -313,7 +313,7 @@ class Dataset : public DatasetBase { } BigtableClientResource& client_resource() const { return client_resource_; } - io::BigtableRowsetResource& row_set_resource() const { return row_set_resource_; } + io::BigtableRowSetResource& row_set_resource() const { return row_set_resource_; } protected: Status AsGraphDefInternal(SerializationContext* ctx, @@ -327,7 +327,7 @@ class Dataset : public DatasetBase { private: 
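// NOTE: the members below alias resources owned by the ResourceMgr, so the
// dataset is only safe to use while those resource containers stay alive.
// The "pr comments" patch later in this series reduces the aliasing by
// copying the RowSet into the dataset and reading the table through the
// client's shared data client; a sketch, assuming the elided template
// argument is cbt::DataClient:
//
//   std::shared_ptr<cbt::DataClient> const& data_client_;
//   const cbt::RowSet row_set_;  // owned copy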
BigtableClientResource& client_resource_; - io::BigtableRowSetResource* row_set_resource_; + io::BigtableRowSetResource& row_set_resource_; const std::string table_id_; const std::vector columns_; DataTypeVector dtypes_; diff --git a/tensorflow_io/core/kernels/bigtable/bigtable_row_set.cc b/tensorflow_io/core/kernels/bigtable/bigtable_row_set.cc index 1841d01ed..99d2e3f7f 100644 --- a/tensorflow_io/core/kernels/bigtable/bigtable_row_set.cc +++ b/tensorflow_io/core/kernels/bigtable/bigtable_row_set.cc @@ -151,5 +151,56 @@ class BigtableRowSetIntersectOp : public OpKernel { REGISTER_KERNEL_BUILDER(Name("BigtableRowSetIntersect").Device(DEVICE_CPU), BigtableRowSetIntersectOp); + +class BigtableRowSetIntersectTensorOp : public OpKernel { + public: + explicit BigtableRowSetIntersectTensorOp(OpKernelConstruction* context) + : OpKernel(context) {} + + void Compute(OpKernelContext* context) override TF_LOCKS_EXCLUDED(mu_) { + mutex_lock l(mu_); + ResourceMgr* mgr = context->resource_manager(); + OP_REQUIRES_OK(context, cinfo_.Init(mgr, def())); + + BigtableRowSetResource* row_set_resource; + OP_REQUIRES_OK(context, GetResourceFromContext(context, "row_set", + &row_set_resource)); + core::ScopedUnref row_set_resource_unref(row_set_resource); + + + const Tensor* row_keys_tensor; + OP_REQUIRES_OK(context, context->input("row_range_tensor", &row_keys_tensor)); + auto row_keys = row_keys_tensor->tensor(); + + VLOG(1) << "RowsetIntersectTensor intersecting: [" << row_keys(0) << "," << row_keys(1) << ")"; + + BigtableRowSetResource* result_resource; + OP_REQUIRES_OK( + context, + mgr->LookupOrCreate( + cinfo_.container(), cinfo_.name(), &result_resource, + [this, row_set_resource, &row_keys]( + BigtableRowSetResource** ret) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) { + *ret = new BigtableRowSetResource( + row_set_resource->Intersect( + cbt::RowRange::RightOpen(row_keys(0), row_keys(1)) + )); + return Status::OK(); + })); + + OP_REQUIRES_OK(context, MakeResourceHandleToOutput( + context, 0, cinfo_.container(), cinfo_.name(), + TypeIndex::Make())); + } + + protected: + // Variables accessible from subclasses. 
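// (mu_ and cinfo_ back the mgr->LookupOrCreate call in Compute above:
// re-running the kernel under the same shared_name hands back the row set
// resource created on the first run instead of failing with an
// "already exists" error)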
+ mutex mu_; + ContainerInfo cinfo_ TF_GUARDED_BY(mu_); +}; + +REGISTER_KERNEL_BUILDER(Name("BigtableRowSetIntersectTensor").Device(DEVICE_CPU), + BigtableRowSetIntersectTensorOp); + } // namespace io } // namespace tensorflow diff --git a/tensorflow_io/core/ops/bigtable_ops.cc b/tensorflow_io/core/ops/bigtable_ops.cc index 45fddd0fe..9b6092992 100644 --- a/tensorflow_io/core/ops/bigtable_ops.cc +++ b/tensorflow_io/core/ops/bigtable_ops.cc @@ -94,12 +94,12 @@ REGISTER_OP("BigtableRowSetIntersect") .Output("result_row_set: resource") .SetShapeFn(shape_inference::ScalarShape); -REGISTER_OP("BigtableRowsetIntersectTensor") +REGISTER_OP("BigtableRowSetIntersectTensor") .Attr("container: string = ''") .Attr("shared_name: string = ''") - .Input("row_set_resource: resource") + .Input("row_set: resource") .Input("row_range_tensor: string") - .Output("row_set: resource") + .Output("result_row_set: resource") .SetShapeFn(shape_inference::UnchangedShape); diff --git a/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py b/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py index 82f7d2648..8b60de977 100644 --- a/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py +++ b/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py @@ -52,7 +52,7 @@ def parallel_read_rows( def map_func(sample): rs = RowSet( - core_ops.bigtable_rowset_intersect_tensor(row_set._impl, sample) + core_ops.bigtable_row_set_intersect_tensor(row_set._impl, sample) ) return self.read_rows(columns, rs) From 8975bc94fc742e4e07db1a3def8cada08d770779 Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Fri, 5 Nov 2021 20:19:40 +0100 Subject: [PATCH 20/29] add tests --- tests/test_bigtable/test_read_rows.py | 2 ++ tests/test_bigtable/test_row_set.py | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/test_bigtable/test_read_rows.py b/tests/test_bigtable/test_read_rows.py index 30fb64cbe..690dcf764 100644 --- a/tests/test_bigtable/test_read_rows.py +++ b/tests/test_bigtable/test_read_rows.py @@ -22,6 +22,8 @@ from tensorflow_io.python.ops.bigtable.bigtable_dataset_ops import ( BigtableClient, ) +import tensorflow_io.python.ops.bigtable.bigtable_row_range as row_range +import tensorflow_io.python.ops.bigtable.bigtable_row_set as row_set import tensorflow as tf from tensorflow import test diff --git a/tests/test_bigtable/test_row_set.py b/tests/test_bigtable/test_row_set.py index a0e2df242..fdaccd41d 100644 --- a/tests/test_bigtable/test_row_set.py +++ b/tests/test_bigtable/test_row_set.py @@ -120,6 +120,6 @@ def test_intersect_tensor(self): r_range = row_range.right_open("row2", "row3") regular_intersect = row_set.intersect(r_set, r_range) tensor_intersect = row_set.RowSet( - core_ops.bigtable_rowset_intersect_tensor(r_set._impl, tensor) + core_ops.bigtable_row_set_intersect_tensor(r_set._impl, tensor) ) self.assertEqual(repr(regular_intersect), repr(tensor_intersect)) From 963d3b35e24138fb0719811e097c02cff5c69887 Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Fri, 5 Nov 2021 20:26:17 +0100 Subject: [PATCH 21/29] linting --- .../bigtable/bigtable_dataset_kernel.cc | 17 +++--- .../kernels/bigtable/bigtable_row_range.cc | 3 +- .../core/kernels/bigtable/bigtable_row_set.cc | 59 +++++++++---------- .../core/kernels/bigtable/bigtable_row_set.h | 5 +- .../ops/bigtable/bigtable_dataset_ops.py | 19 ++++-- .../test_bigtable/test_parallel_read_rows.py | 19 ++++-- tests/test_bigtable/test_read_rows.py | 7 ++- tests/test_bigtable/test_row_set.py | 12 +++- 8 files changed, 84 insertions(+), 57 
deletions(-) diff --git a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc index 38564bdc3..dd6966ec0 100644 --- a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc +++ b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc @@ -30,7 +30,8 @@ namespace tensorflow { namespace data { namespace { -tensorflow::error::Code GoogleCloudErrorCodeToTfErrorCode(::google::cloud::StatusCode code) { +tensorflow::error::Code GoogleCloudErrorCodeToTfErrorCode( + ::google::cloud::StatusCode code) { switch (code) { case ::google::cloud::StatusCode::kOk: return ::tensorflow::error::OK; @@ -73,9 +74,9 @@ Status GoogleCloudStatusToTfStatus(const ::google::cloud::Status& status) { if (status.ok()) { return Status::OK(); } - return Status(GoogleCloudErrorCodeToTfErrorCode(status.code()), - strings::StrCat("Error reading from Cloud Bigtable: ", - status.message())); + return Status( + GoogleCloudErrorCodeToTfErrorCode(status.code()), + strings::StrCat("Error reading from Cloud Bigtable: ", status.message())); } class BigtableClientResource : public ResourceBase { @@ -313,7 +314,9 @@ class Dataset : public DatasetBase { } BigtableClientResource& client_resource() const { return client_resource_; } - io::BigtableRowSetResource& row_set_resource() const { return row_set_resource_; } + io::BigtableRowSetResource& row_set_resource() const { + return row_set_resource_; + } protected: Status AsGraphDefInternal(SerializationContext* ctx, @@ -435,8 +438,8 @@ class BigtableSampleRowSetsOp : public OpKernel { std::remove_if( tablets.begin(), tablets.end(), [row_set_resource](std::pair const& p) { - return !RowSetIntersectsRange(row_set_resource->row_set(), p.first, - p.second); + return !RowSetIntersectsRange(row_set_resource->row_set(), + p.first, p.second); }), tablets.end()); diff --git a/tensorflow_io/core/kernels/bigtable/bigtable_row_range.cc b/tensorflow_io/core/kernels/bigtable/bigtable_row_range.cc index 49278219a..7e33154a1 100644 --- a/tensorflow_io/core/kernels/bigtable/bigtable_row_range.cc +++ b/tensorflow_io/core/kernels/bigtable/bigtable_row_range.cc @@ -132,8 +132,7 @@ class BigtablePrefixRowRangeOp } private: - StatusOr CreateResource() - override { + StatusOr CreateResource() override { return new BigtableRowRangeResource(cbt::RowRange::Prefix(prefix_)); } diff --git a/tensorflow_io/core/kernels/bigtable/bigtable_row_set.cc b/tensorflow_io/core/kernels/bigtable/bigtable_row_set.cc index 99d2e3f7f..1b7f81f6e 100644 --- a/tensorflow_io/core/kernels/bigtable/bigtable_row_set.cc +++ b/tensorflow_io/core/kernels/bigtable/bigtable_row_set.cc @@ -28,8 +28,7 @@ class BigtableEmptyRowSetOp } private: - StatusOr CreateResource() - override { + StatusOr CreateResource() override { return new BigtableRowSetResource(cbt::RowSet()); } }; @@ -90,14 +89,13 @@ class BigtableRowSetAppendRowRangeOp : public OpKernel { void Compute(OpKernelContext* context) override { mutex_lock lock(mu_); BigtableRowSetResource* row_set_resource; - OP_REQUIRES_OK(context, GetResourceFromContext(context, "row_set", - &row_set_resource)); + OP_REQUIRES_OK( + context, GetResourceFromContext(context, "row_set", &row_set_resource)); core::ScopedUnref row_set_resource_unref(row_set_resource); BigtableRowRangeResource* row_range_resource; - OP_REQUIRES_OK(context, - GetResourceFromContext(context, "row_range", - &row_range_resource)); + OP_REQUIRES_OK(context, GetResourceFromContext(context, "row_range", + &row_range_resource)); core::ScopedUnref 
row_range_resource_unref(row_range_resource); row_set_resource->AppendRowRange(row_range_resource->row_range()); @@ -122,21 +120,21 @@ class BigtableRowSetIntersectOp : public OpKernel { OP_REQUIRES_OK(context, cinfo_.Init(mgr, def())); BigtableRowSetResource* row_set_resource; - OP_REQUIRES_OK(context, GetResourceFromContext(context, "row_set", - &row_set_resource)); + OP_REQUIRES_OK( + context, GetResourceFromContext(context, "row_set", &row_set_resource)); core::ScopedUnref row_set_resource_unref(row_set_resource); BigtableRowRangeResource* row_range_resource; - OP_REQUIRES_OK(context, - GetResourceFromContext(context, "row_range", - &row_range_resource)); + OP_REQUIRES_OK(context, GetResourceFromContext(context, "row_range", + &row_range_resource)); core::ScopedUnref row_range_resource_unref(row_range_resource); BigtableRowSetResource* result_resource = new BigtableRowSetResource( - row_set_resource->Intersect(row_range_resource->row_range())); + row_set_resource->Intersect(row_range_resource->row_range())); - OP_REQUIRES_OK(context, mgr->Create( - cinfo_.container(), cinfo_.name(), result_resource)); + OP_REQUIRES_OK(context, + mgr->Create( + cinfo_.container(), cinfo_.name(), result_resource)); OP_REQUIRES_OK(context, MakeResourceHandleToOutput( context, 0, cinfo_.container(), cinfo_.name(), @@ -151,7 +149,6 @@ class BigtableRowSetIntersectOp : public OpKernel { REGISTER_KERNEL_BUILDER(Name("BigtableRowSetIntersect").Device(DEVICE_CPU), BigtableRowSetIntersectOp); - class BigtableRowSetIntersectTensorOp : public OpKernel { public: explicit BigtableRowSetIntersectTensorOp(OpKernelConstruction* context) @@ -163,30 +160,29 @@ class BigtableRowSetIntersectTensorOp : public OpKernel { OP_REQUIRES_OK(context, cinfo_.Init(mgr, def())); BigtableRowSetResource* row_set_resource; - OP_REQUIRES_OK(context, GetResourceFromContext(context, "row_set", - &row_set_resource)); + OP_REQUIRES_OK( + context, GetResourceFromContext(context, "row_set", &row_set_resource)); core::ScopedUnref row_set_resource_unref(row_set_resource); - const Tensor* row_keys_tensor; - OP_REQUIRES_OK(context, context->input("row_range_tensor", &row_keys_tensor)); + OP_REQUIRES_OK(context, + context->input("row_range_tensor", &row_keys_tensor)); auto row_keys = row_keys_tensor->tensor(); - VLOG(1) << "RowsetIntersectTensor intersecting: [" << row_keys(0) << "," << row_keys(1) << ")"; + VLOG(1) << "RowsetIntersectTensor intersecting: [" << row_keys(0) << "," + << row_keys(1) << ")"; BigtableRowSetResource* result_resource; OP_REQUIRES_OK( context, mgr->LookupOrCreate( cinfo_.container(), cinfo_.name(), &result_resource, - [this, row_set_resource, &row_keys]( - BigtableRowSetResource** ret) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) { - *ret = new BigtableRowSetResource( - row_set_resource->Intersect( - cbt::RowRange::RightOpen(row_keys(0), row_keys(1)) - )); - return Status::OK(); - })); + [this, row_set_resource, &row_keys](BigtableRowSetResource** ret) + TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) { + *ret = new BigtableRowSetResource(row_set_resource->Intersect( + cbt::RowRange::RightOpen(row_keys(0), row_keys(1)))); + return Status::OK(); + })); OP_REQUIRES_OK(context, MakeResourceHandleToOutput( context, 0, cinfo_.container(), cinfo_.name(), @@ -199,8 +195,9 @@ class BigtableRowSetIntersectTensorOp : public OpKernel { ContainerInfo cinfo_ TF_GUARDED_BY(mu_); }; -REGISTER_KERNEL_BUILDER(Name("BigtableRowSetIntersectTensor").Device(DEVICE_CPU), - BigtableRowSetIntersectTensorOp); +REGISTER_KERNEL_BUILDER( + 
Name("BigtableRowSetIntersectTensor").Device(DEVICE_CPU), + BigtableRowSetIntersectTensorOp); } // namespace io } // namespace tensorflow diff --git a/tensorflow_io/core/kernels/bigtable/bigtable_row_set.h b/tensorflow_io/core/kernels/bigtable/bigtable_row_set.h index 95486bf79..14fc49d71 100644 --- a/tensorflow_io/core/kernels/bigtable/bigtable_row_set.h +++ b/tensorflow_io/core/kernels/bigtable/bigtable_row_set.h @@ -16,7 +16,6 @@ limitations under the License. #ifndef BIGTABLE_ROW_SET_H #define BIGTABLE_ROW_SET_H - #include "google/cloud/bigtable/table.h" #include "tensorflow/core/framework/op_kernel.h" #include "tensorflow/core/framework/resource_mgr.h" @@ -50,9 +49,7 @@ class BigtableRowSetResource : public ResourceBase { return row_set_.Intersect(row_range); } - google::cloud::bigtable::RowSet const& row_set(){ - return row_set_; - } + google::cloud::bigtable::RowSet const& row_set() { return row_set_; } string DebugString() const override { return "BigtableRowSetResource:{" + ToString() + "}"; diff --git a/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py b/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py index 8b60de977..e44d79cae 100644 --- a/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py +++ b/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py @@ -37,7 +37,9 @@ def __init__(self, client_resource, table_id: str): self._client_resource = client_resource def read_rows(self, columns: List[str], row_set: RowSet): - return _BigtableDataset(self._client_resource, self._table_id, columns, row_set) + return _BigtableDataset( + self._client_resource, self._table_id, columns, row_set + ) def parallel_read_rows( self, @@ -46,13 +48,18 @@ def parallel_read_rows( row_set: RowSet = from_rows_or_ranges(infinite()), ): samples = core_ops.bigtable_sample_row_sets( - self._client_resource, row_set._impl, self._table_id, num_parallel_calls + self._client_resource, + row_set._impl, + self._table_id, + num_parallel_calls, ) samples_ds = dataset_ops.Dataset.from_tensor_slices(samples) def map_func(sample): rs = RowSet( - core_ops.bigtable_row_set_intersect_tensor(row_set._impl, sample) + core_ops.bigtable_row_set_intersect_tensor( + row_set._impl, sample + ) ) return self.read_rows(columns, rs) @@ -69,7 +76,11 @@ class _BigtableDataset(dataset_ops.DatasetSource): """_BigtableDataset represents a dataset that retrieves keys and values.""" def __init__( - self, client_resource, table_id: str, columns: List[str], row_set: RowSet + self, + client_resource, + table_id: str, + columns: List[str], + row_set: RowSet, ): self._table_id = table_id self._columns = columns diff --git a/tests/test_bigtable/test_parallel_read_rows.py b/tests/test_bigtable/test_parallel_read_rows.py index 06390abd9..e0c1f333a 100644 --- a/tests/test_bigtable/test_parallel_read_rows.py +++ b/tests/test_bigtable/test_parallel_read_rows.py @@ -20,7 +20,9 @@ import os from .bigtable_emulator import BigtableEmulator from tensorflow_io.python.ops import core_ops -from tensorflow_io.python.ops.bigtable.bigtable_dataset_ops import BigtableClient +from tensorflow_io.python.ops.bigtable.bigtable_dataset_ops import ( + BigtableClient, +) import tensorflow_io.python.ops.bigtable.bigtable_row_range as row_range import tensorflow_io.python.ops.bigtable.bigtable_row_set as row_set import tensorflow as tf @@ -136,7 +138,10 @@ def test_sample_row_sets(self): samples = [ s for s in core_ops.bigtable_sample_row_sets( - client._client_resource, rs._impl, "test-table", num_parallel_calls + client._client_resource, + 
rs._impl, + "test-table", + num_parallel_calls, ) ] self.assertEqual(len(samples), num_parallel_calls) @@ -145,7 +150,10 @@ def test_sample_row_sets(self): samples = [ s for s in core_ops.bigtable_sample_row_sets( - client._client_resource, rs._impl, "test-table", num_parallel_calls + client._client_resource, + rs._impl, + "test-table", + num_parallel_calls, ) ] @@ -157,7 +165,10 @@ def test_sample_row_sets(self): samples = [ s for s in core_ops.bigtable_sample_row_sets( - client._client_resource, rs._impl, "test-table", num_parallel_calls + client._client_resource, + rs._impl, + "test-table", + num_parallel_calls, ) ] self.assertEqual(len(samples), num_parallel_calls) diff --git a/tests/test_bigtable/test_read_rows.py b/tests/test_bigtable/test_read_rows.py index 690dcf764..9fcac1e24 100644 --- a/tests/test_bigtable/test_read_rows.py +++ b/tests/test_bigtable/test_read_rows.py @@ -88,9 +88,12 @@ def test_read_row_set(self): ["fam1:col1", "fam2:col2"], ) - row_s = row_set.from_rows_or_ranges(row_range.closed_range("row000", "row009")) + row_s = row_set.from_rows_or_ranges( + row_range.closed_range("row000", "row009") + ) read_rows = [ - r for r in table.read_rows(["fam1:col1", "fam2:col2"], row_set=row_s) + r + for r in table.read_rows(["fam1:col1", "fam2:col2"], row_set=row_s) ] self.assertEqual(len(read_rows), 10) diff --git a/tests/test_bigtable/test_row_set.py b/tests/test_bigtable/test_row_set.py index fdaccd41d..327db3e04 100644 --- a/tests/test_bigtable/test_row_set.py +++ b/tests/test_bigtable/test_row_set.py @@ -106,16 +106,22 @@ def test_from_rows_or_ranges(self): self.assertEqual(expected, repr(r_set)) def test_intersect(self): - r_set = row_set.from_rows_or_ranges(row_range.open_range("row1", "row5")) + r_set = row_set.from_rows_or_ranges( + row_range.open_range("row1", "row5") + ) r_set = row_set.intersect(r_set, row_range.closed_range("row3", "row7")) expected = ( - "row_ranges {\n" + ' start_key_closed: "row3"\n' + " end_key_open: " + "row_ranges {\n" + + ' start_key_closed: "row3"\n' + + " end_key_open: " '"row5"\n' + "}\n" ) self.assertEqual(expected, repr(r_set)) def test_intersect_tensor(self): - r_set = row_set.from_rows_or_ranges(row_range.open_range("row1", "row5")) + r_set = row_set.from_rows_or_ranges( + row_range.open_range("row1", "row5") + ) tensor = tf.constant(["row2", "row3"]) r_range = row_range.right_open("row2", "row3") regular_intersect = row_set.intersect(r_set, r_range) From 71ca23271fa0dfde9c07523a29a8ac83e78346ef Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Tue, 9 Nov 2021 20:09:52 +0100 Subject: [PATCH 22/29] samples working but ugly --- .../bigtable/bigtable_dataset_kernel.cc | 26 ++++++++++++++++--- tensorflow_io/core/ops/bigtable_ops.cc | 13 +++------- .../ops/bigtable/bigtable_dataset_ops.py | 17 ++++++------ .../test_bigtable/test_parallel_read_rows.py | 2 -- tests/test_bigtable/test_row_set.py | 12 --------- 5 files changed, 34 insertions(+), 36 deletions(-) diff --git a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc index dd6966ec0..43be30f65 100644 --- a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc +++ b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc @@ -407,6 +407,11 @@ class BigtableSampleRowSetsOp : public OpKernel { void Compute(OpKernelContext* context) override { mutex_lock l(mu_); + + ResourceMgr* mgr = context->resource_manager(); + ContainerInfo cinfo; + OP_REQUIRES_OK(context, cinfo.Init(mgr, def())); + 
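// Later in this Compute, each work chunk is registered with the ResourceMgr
// under a distinct name and handed back to Python as a DT_RESOURCE handle.
// A sketch of the pattern used below, assuming the elided template
// arguments name io::BigtableRowSetResource:
//
//   mgr->Create<io::BigtableRowSetResource>(cinfo.container(), name, chunk);
//   output_v(i, 0) = MakeResourceHandle(
//       cinfo.container(), name, *context->device(),
//       TypeIndex::Make<io::BigtableRowSetResource>());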
BigtableClientResource* client_resource; OP_REQUIRES_OK(context, GetResourceFromContext(context, "client", &client_resource)); @@ -448,9 +453,9 @@ class BigtableSampleRowSetsOp : public OpKernel { size_t output_size = std::min(tablets.size(), (size_t)num_parallel_calls_); Tensor* output_tensor = NULL; - OP_REQUIRES_OK(context, context->allocate_output(0, {(long)output_size, 2}, + OP_REQUIRES_OK(context, context->allocate_output(0, {(long)output_size,1}, &output_tensor)); - auto output_v = output_tensor->tensor(); + auto output_v = output_tensor->tensor(); for (size_t i = 0; i < output_size; i++) { size_t start_idx = GetWorkerStartIndex(tablets.size(), output_size, i); @@ -459,8 +464,21 @@ class BigtableSampleRowSetsOp : public OpKernel { size_t end_idx = next_worker_start_idx - 1; start_key = tablets.at(start_idx).first; std::string end_key = tablets.at(end_idx).second; - output_v(i, 0) = start_key; - output_v(i, 1) = end_key; + io::BigtableRowSetResource* row_range = + new io::BigtableRowSetResource(row_set_resource->Intersect( + cbt::RowRange::RightOpen(start_key, end_key))); + + std::string container_name = cinfo.name() + std::to_string(i); + + VLOG(1) << "creating resource:" << cinfo.container() << ":" + << container_name; + + OP_REQUIRES_OK(context, + mgr->Create( + cinfo.container(), container_name, row_range)); + output_v(i,0) = MakeResourceHandle( + cinfo.container(), container_name, *context->device(), + TypeIndex::Make()); } } diff --git a/tensorflow_io/core/ops/bigtable_ops.cc b/tensorflow_io/core/ops/bigtable_ops.cc index 9b6092992..17e24e32e 100644 --- a/tensorflow_io/core/ops/bigtable_ops.cc +++ b/tensorflow_io/core/ops/bigtable_ops.cc @@ -94,21 +94,16 @@ REGISTER_OP("BigtableRowSetIntersect") .Output("result_row_set: resource") .SetShapeFn(shape_inference::ScalarShape); -REGISTER_OP("BigtableRowSetIntersectTensor") - .Attr("container: string = ''") - .Attr("shared_name: string = ''") - .Input("row_set: resource") - .Input("row_range_tensor: string") - .Output("result_row_set: resource") - .SetShapeFn(shape_inference::UnchangedShape); - REGISTER_OP("BigtableSampleRowSets") + .Attr("container: string = ''") + .Attr("shared_name: string = ''") .Input("client: resource") .Input("row_set: resource") .Attr("table_id: string") .Attr("num_parallel_calls: int") - .Output("samples: string") + .Output("samples: resource") + .SetIsStateful() .SetShapeFn([](tensorflow::shape_inference::InferenceContext* c) { c->set_output(0, c->Vector(c->UnknownDim())); return tensorflow::Status::OK(); diff --git a/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py b/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py index e44d79cae..18a04c088 100644 --- a/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py +++ b/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py @@ -53,17 +53,16 @@ def parallel_read_rows( self._table_id, num_parallel_calls, ) - samples_ds = dataset_ops.Dataset.from_tensor_slices(samples) + # samples_ds = dataset_ops.Dataset.from_tensor_slices(samples) - def map_func(sample): - rs = RowSet( - core_ops.bigtable_row_set_intersect_tensor( - row_set._impl, sample - ) - ) - return self.read_rows(columns, rs) + # for sam in samples_ds: + # print("#"*80) + # print(sam) + + def map_func(idx): + return self.read_rows(columns, RowSet(samples[idx])) - return samples_ds.interleave( + return tf.data.Dataset.range(samples.shape[0]).interleave( map_func=map_func, cycle_length=num_parallel_calls, block_length=1, diff --git a/tests/test_bigtable/test_parallel_read_rows.py 
b/tests/test_bigtable/test_parallel_read_rows.py index e0c1f333a..b284f0c12 100644 --- a/tests/test_bigtable/test_parallel_read_rows.py +++ b/tests/test_bigtable/test_parallel_read_rows.py @@ -172,5 +172,3 @@ def test_sample_row_sets(self): ) ] self.assertEqual(len(samples), num_parallel_calls) - self.assertEqual(samples[0].numpy()[0].decode(), "") - self.assertEqual(samples[0].numpy()[1].decode(), "") diff --git a/tests/test_bigtable/test_row_set.py b/tests/test_bigtable/test_row_set.py index 327db3e04..cb250d89e 100644 --- a/tests/test_bigtable/test_row_set.py +++ b/tests/test_bigtable/test_row_set.py @@ -117,15 +117,3 @@ def test_intersect(self): '"row5"\n' + "}\n" ) self.assertEqual(expected, repr(r_set)) - - def test_intersect_tensor(self): - r_set = row_set.from_rows_or_ranges( - row_range.open_range("row1", "row5") - ) - tensor = tf.constant(["row2", "row3"]) - r_range = row_range.right_open("row2", "row3") - regular_intersect = row_set.intersect(r_set, r_range) - tensor_intersect = row_set.RowSet( - core_ops.bigtable_row_set_intersect_tensor(r_set._impl, tensor) - ) - self.assertEqual(repr(regular_intersect), repr(tensor_intersect)) From 67095206db97d8adc6438345f4f6f04948e4a571 Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Wed, 10 Nov 2021 12:23:16 +0100 Subject: [PATCH 23/29] removed accidental change --- .../core/kernels/gsmemcachedfs/memcached_file_block_cache.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tensorflow_io/core/kernels/gsmemcachedfs/memcached_file_block_cache.cc b/tensorflow_io/core/kernels/gsmemcachedfs/memcached_file_block_cache.cc index db39b37d1..8c2033cd9 100644 --- a/tensorflow_io/core/kernels/gsmemcachedfs/memcached_file_block_cache.cc +++ b/tensorflow_io/core/kernels/gsmemcachedfs/memcached_file_block_cache.cc @@ -759,7 +759,7 @@ int64 MemcachedFileBlockCache::AddToCacheBuffer(const string& memc_key, cache_buffer_keys_.push_back(memc_key); auto page = absl::make_unique>(); page->assign(data->begin(), data->end()); - cache_buffer_map_.emplace(memc_key, std::move(page)); + cache_buffer_map_.emplace(memc_key, page.release()); } return cache_buffer_keys_.size(); } From 0ca85bfb29937629c103c6d212370d585c5e5654 Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Wed, 10 Nov 2021 12:46:13 +0100 Subject: [PATCH 24/29] run linter and fixed naming --- tensorflow_io/core/BUILD | 4 +-- .../bigtable/bigtable_dataset_kernel.cc | 14 ++++---- tensorflow_io/core/ops/bigtable_ops.cc | 2 -- .../ops/bigtable/bigtable_dataset_ops.py | 32 ++++++------------- .../python/ops/bigtable/bigtable_row_set.py | 8 ++--- tests/test_bigtable/bigtable_emulator.py | 8 ++--- .../test_bigtable/test_parallel_read_rows.py | 19 +++-------- tests/test_bigtable/test_read_rows.py | 11 ++----- tests/test_bigtable/test_row_set.py | 8 ++--- 9 files changed, 31 insertions(+), 75 deletions(-) diff --git a/tensorflow_io/core/BUILD b/tensorflow_io/core/BUILD index 17213605f..8979b00b4 100644 --- a/tensorflow_io/core/BUILD +++ b/tensorflow_io/core/BUILD @@ -180,10 +180,10 @@ cc_library( srcs = [ "kernels/bigtable/bigtable_dataset_kernel.cc", "kernels/bigtable/bigtable_resource_kernel.h", - "kernels/bigtable/bigtable_row_range.h", - "kernels/bigtable/bigtable_row_set.h", "kernels/bigtable/bigtable_row_range.cc", + "kernels/bigtable/bigtable_row_range.h", "kernels/bigtable/bigtable_row_set.cc", + "kernels/bigtable/bigtable_row_set.h", "ops/bigtable_ops.cc", ], copts = tf_io_copts(), diff --git a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc
b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc index 43be30f65..3c33226cd 100644 --- a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc +++ b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc @@ -453,9 +453,9 @@ class BigtableSampleRowSetsOp : public OpKernel { size_t output_size = std::min(tablets.size(), (size_t)num_parallel_calls_); Tensor* output_tensor = NULL; - OP_REQUIRES_OK(context, context->allocate_output(0, {(long)output_size,1}, + OP_REQUIRES_OK(context, context->allocate_output(0, {(long)output_size}, &output_tensor)); - auto output_v = output_tensor->tensor(); + auto output_v = output_tensor->tensor(); for (size_t i = 0; i < output_size; i++) { size_t start_idx = GetWorkerStartIndex(tablets.size(), output_size, i); @@ -464,7 +464,7 @@ class BigtableSampleRowSetsOp : public OpKernel { size_t end_idx = next_worker_start_idx - 1; start_key = tablets.at(start_idx).first; std::string end_key = tablets.at(end_idx).second; - io::BigtableRowSetResource* row_range = + io::BigtableRowSetResource* work_chunk_row_set = new io::BigtableRowSetResource(row_set_resource->Intersect( cbt::RowRange::RightOpen(start_key, end_key))); @@ -473,10 +473,10 @@ class BigtableSampleRowSetsOp : public OpKernel { VLOG(1) << "creating resource:" << cinfo.container() << ":" << container_name; - OP_REQUIRES_OK(context, - mgr->Create( - cinfo.container(), container_name, row_range)); - output_v(i,0) = MakeResourceHandle( + OP_REQUIRES_OK( + context, mgr->Create( + cinfo.container(), container_name, work_chunk_row_set)); + output_v(i) = MakeResourceHandle( cinfo.container(), container_name, *context->device(), TypeIndex::Make()); } diff --git a/tensorflow_io/core/ops/bigtable_ops.cc b/tensorflow_io/core/ops/bigtable_ops.cc index 17e24e32e..b97c867fc 100644 --- a/tensorflow_io/core/ops/bigtable_ops.cc +++ b/tensorflow_io/core/ops/bigtable_ops.cc @@ -34,7 +34,6 @@ REGISTER_OP("BigtableDataset") .SetIsStateful() .SetShapeFn(shape_inference::ScalarShape); - REGISTER_OP("BigtableEmptyRowSet") .Attr("container: string = ''") .Attr("shared_name: string = ''") @@ -94,7 +93,6 @@ REGISTER_OP("BigtableRowSetIntersect") .Output("result_row_set: resource") .SetShapeFn(shape_inference::ScalarShape); - REGISTER_OP("BigtableSampleRowSets") .Attr("container: string = ''") .Attr("shared_name: string = ''") diff --git a/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py b/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py index 18a04c088..1384f46c6 100644 --- a/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py +++ b/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py @@ -23,9 +23,7 @@ class BigtableClient: def __init__(self, project_id: str, instance_id: str): """Creates a BigtableClient to start Bigtable read sessions.""" - self._client_resource = core_ops.bigtable_client( - project_id, instance_id - ) + self._client_resource = core_ops.bigtable_client(project_id, instance_id) def get_table(self, table_id): return BigtableTable(self._client_resource, table_id) @@ -37,9 +35,7 @@ def __init__(self, client_resource, table_id: str): self._client_resource = client_resource def read_rows(self, columns: List[str], row_set: RowSet): - return _BigtableDataset( - self._client_resource, self._table_id, columns, row_set - ) + return _BigtableDataset(self._client_resource, self._table_id, columns, row_set) def parallel_read_rows( self, @@ -48,20 +44,16 @@ def parallel_read_rows( row_set: RowSet = from_rows_or_ranges(infinite()), ): samples = 
core_ops.bigtable_sample_row_sets( - self._client_resource, - row_set._impl, - self._table_id, - num_parallel_calls, + self._client_resource, row_set._impl, self._table_id, num_parallel_calls, ) - # samples_ds = dataset_ops.Dataset.from_tensor_slices(samples) - # for sam in samples_ds: - # print("#"*80) - # print(sam) def map_func(idx): return self.read_rows(columns, RowSet(samples[idx])) + # We interleave a dataset of sample indexes instead of a dataset of + # samples, because Dataset.from_tensor_slices attempts to copy the + # resource tensors using DeepCopy from tensor_util.cc, which is not + # possible for tensors of type DT_RESOURCE. return tf.data.Dataset.range(samples.shape[0]).interleave( map_func=map_func, cycle_length=num_parallel_calls, block_length=1, diff --git a/tensorflow_io/python/ops/bigtable/bigtable_row_set.py b/tensorflow_io/python/ops/bigtable/bigtable_row_set.py index 7f728f63b..acded8f7f 100644 --- a/tensorflow_io/python/ops/bigtable/bigtable_row_set.py +++ b/tensorflow_io/python/ops/bigtable/bigtable_row_set.py @@ -30,9 +30,7 @@ def append(self, row_or_range): if isinstance(row_or_range, str): core_ops.bigtable_row_set_append_row(self._impl, row_or_range) else: - core_ops.bigtable_row_set_append_row_range( - self._impl, row_or_range._impl - ) + core_ops.bigtable_row_set_append_row_range(self._impl, row_or_range._impl) def empty(): @@ -71,6 +69,4 @@ def intersect(row_set: RowSet, row_range: bigtable_row_range.RowRange): Returns: RowSet: an intersection of the given row set and row range.
""" - return RowSet( - core_ops.bigtable_row_set_intersect(row_set._impl, row_range._impl) - ) + return RowSet(core_ops.bigtable_row_set_intersect(row_set._impl, row_range._impl)) diff --git a/tests/test_bigtable/bigtable_emulator.py b/tests/test_bigtable/bigtable_emulator.py index 7258a7ac6..d42144fa3 100644 --- a/tests/test_bigtable/bigtable_emulator.py +++ b/tests/test_bigtable/bigtable_emulator.py @@ -67,9 +67,7 @@ def _get_cbt_emulator_path(): def _get_cbt_cli_path(): - return _get_cbt_binary_path( - CBT_CLI_PATH_ENV_VAR, CBT_CLI_SEARCH_PATHS, "cbt cli" - ) + return _get_cbt_binary_path(CBT_CLI_PATH_ENV_VAR, CBT_CLI_SEARCH_PATHS, "cbt cli") def _extract_emulator_addr_from_output(emulator_output): @@ -82,9 +80,7 @@ def _extract_emulator_addr_from_output(emulator_output): for word in words: if re.fullmatch("[a-z.0-9]+:[0-9]+", word): return word - raise RuntimeError( - f"Failed to find CBT emulator in the line {line}" - ) + raise RuntimeError(f"Failed to find CBT emulator in the line {line}") class BigtableEmulator: diff --git a/tests/test_bigtable/test_parallel_read_rows.py b/tests/test_bigtable/test_parallel_read_rows.py index b284f0c12..7d014af4f 100644 --- a/tests/test_bigtable/test_parallel_read_rows.py +++ b/tests/test_bigtable/test_parallel_read_rows.py @@ -20,9 +20,7 @@ import os from .bigtable_emulator import BigtableEmulator from tensorflow_io.python.ops import core_ops -from tensorflow_io.python.ops.bigtable.bigtable_dataset_ops import ( - BigtableClient, -) +from tensorflow_io.python.ops.bigtable.bigtable_dataset_ops import BigtableClient import tensorflow_io.python.ops.bigtable.bigtable_row_range as row_range import tensorflow_io.python.ops.bigtable.bigtable_row_set as row_set import tensorflow as tf @@ -138,10 +136,7 @@ def test_sample_row_sets(self): samples = [ s for s in core_ops.bigtable_sample_row_sets( - client._client_resource, - rs._impl, - "test-table", - num_parallel_calls, + client._client_resource, rs._impl, "test-table", num_parallel_calls, ) ] self.assertEqual(len(samples), num_parallel_calls) @@ -150,10 +145,7 @@ def test_sample_row_sets(self): samples = [ s for s in core_ops.bigtable_sample_row_sets( - client._client_resource, - rs._impl, - "test-table", - num_parallel_calls, + client._client_resource, rs._impl, "test-table", num_parallel_calls, ) ] @@ -165,10 +157,7 @@ def test_sample_row_sets(self): samples = [ s for s in core_ops.bigtable_sample_row_sets( - client._client_resource, - rs._impl, - "test-table", - num_parallel_calls, + client._client_resource, rs._impl, "test-table", num_parallel_calls, ) ] self.assertEqual(len(samples), num_parallel_calls) diff --git a/tests/test_bigtable/test_read_rows.py b/tests/test_bigtable/test_read_rows.py index 9fcac1e24..823908713 100644 --- a/tests/test_bigtable/test_read_rows.py +++ b/tests/test_bigtable/test_read_rows.py @@ -19,9 +19,7 @@ import os from .bigtable_emulator import BigtableEmulator -from tensorflow_io.python.ops.bigtable.bigtable_dataset_ops import ( - BigtableClient, -) +from tensorflow_io.python.ops.bigtable.bigtable_dataset_ops import BigtableClient import tensorflow_io.python.ops.bigtable.bigtable_row_range as row_range import tensorflow_io.python.ops.bigtable.bigtable_row_set as row_set import tensorflow as tf @@ -88,12 +86,9 @@ def test_read_row_set(self): ["fam1:col1", "fam2:col2"], ) - row_s = row_set.from_rows_or_ranges( - row_range.closed_range("row000", "row009") - ) + row_s = row_set.from_rows_or_ranges(row_range.closed_range("row000", "row009")) read_rows = [ - r - for r in 
table.read_rows(["fam1:col1", "fam2:col2"], row_set=row_s) + r for r in table.read_rows(["fam1:col1", "fam2:col2"], row_set=row_s) ] self.assertEqual(len(read_rows), 10) diff --git a/tests/test_bigtable/test_row_set.py b/tests/test_bigtable/test_row_set.py index cb250d89e..949f53d31 100644 --- a/tests/test_bigtable/test_row_set.py +++ b/tests/test_bigtable/test_row_set.py @@ -106,14 +106,10 @@ def test_from_rows_or_ranges(self): self.assertEqual(expected, repr(r_set)) def test_intersect(self): - r_set = row_set.from_rows_or_ranges( - row_range.open_range("row1", "row5") - ) + r_set = row_set.from_rows_or_ranges(row_range.open_range("row1", "row5")) r_set = row_set.intersect(r_set, row_range.closed_range("row3", "row7")) expected = ( - "row_ranges {\n" - + ' start_key_closed: "row3"\n' - + " end_key_open: " + "row_ranges {\n" + ' start_key_closed: "row3"\n' + " end_key_open: " '"row5"\n' + "}\n" ) self.assertEqual(expected, repr(r_set)) From 127c3552a7c87e18862933f8d1b429bdc4ffe91b Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Wed, 10 Nov 2021 12:55:29 +0100 Subject: [PATCH 25/29] fix naming --- .../bigtable/bigtable_dataset_kernel.cc | 11 ++-- .../core/kernels/bigtable/bigtable_row_set.cc | 50 ------------------- tensorflow_io/core/ops/bigtable_ops.cc | 2 +- .../ops/bigtable/bigtable_dataset_ops.py | 2 +- 4 files changed, 7 insertions(+), 58 deletions(-) diff --git a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc index 3c33226cd..b847c2850 100644 --- a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc +++ b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc @@ -152,7 +152,6 @@ class Iterator : public DatasetIterator { table_(this->dataset()->client_resource().CreateTable(table_id)), reader_(this->table_.ReadRows( this->dataset()->row_set_resource().row_set(), - // cbt::RowRange::InfiniteRange(), cbt::Filter::Chain(CreateColumnsFilter(columns_), cbt::Filter::Latest(1)))), it_(this->reader_.begin()), @@ -396,10 +395,10 @@ bool RowSetIntersectsRange(cbt::RowSet const& row_set, return !row_set.Intersect(range).IsEmpty(); } -class BigtableSampleRowSetsOp : public OpKernel { +class BigtableSplitRowSetEvenlyOp : public OpKernel { public: - explicit BigtableSampleRowSetsOp(OpKernelConstruction* ctx) : OpKernel(ctx) { - VLOG(1) << "BigtableSampleRowSetsOp ctor "; + explicit BigtableSplitRowSetEvenlyOp(OpKernelConstruction* ctx) : OpKernel(ctx) { + VLOG(1) << "BigtableSplitRowSetEvenlyOp ctor "; OP_REQUIRES_OK(ctx, ctx->GetAttr("table_id", &table_id_)); OP_REQUIRES_OK(ctx, ctx->GetAttr("num_parallel_calls", &num_parallel_calls_)); @@ -488,8 +487,8 @@ class BigtableSampleRowSetsOp : public OpKernel { int num_parallel_calls_; }; -REGISTER_KERNEL_BUILDER(Name("BigtableSampleRowSets").Device(DEVICE_CPU), - BigtableSampleRowSetsOp); +REGISTER_KERNEL_BUILDER(Name("BigtableSplitRowSetEvenly").Device(DEVICE_CPU), + BigtableSplitRowSetEvenlyOp); } // namespace } // namespace data diff --git a/tensorflow_io/core/kernels/bigtable/bigtable_row_set.cc b/tensorflow_io/core/kernels/bigtable/bigtable_row_set.cc index 1b7f81f6e..fb8297a3b 100644 --- a/tensorflow_io/core/kernels/bigtable/bigtable_row_set.cc +++ b/tensorflow_io/core/kernels/bigtable/bigtable_row_set.cc @@ -149,55 +149,5 @@ class BigtableRowSetIntersectOp : public OpKernel { REGISTER_KERNEL_BUILDER(Name("BigtableRowSetIntersect").Device(DEVICE_CPU), BigtableRowSetIntersectOp); -class BigtableRowSetIntersectTensorOp : public OpKernel { - 
public: - explicit BigtableRowSetIntersectTensorOp(OpKernelConstruction* context) - : OpKernel(context) {} - - void Compute(OpKernelContext* context) override TF_LOCKS_EXCLUDED(mu_) { - mutex_lock l(mu_); - ResourceMgr* mgr = context->resource_manager(); - OP_REQUIRES_OK(context, cinfo_.Init(mgr, def())); - - BigtableRowSetResource* row_set_resource; - OP_REQUIRES_OK( - context, GetResourceFromContext(context, "row_set", &row_set_resource)); - core::ScopedUnref row_set_resource_unref(row_set_resource); - - const Tensor* row_keys_tensor; - OP_REQUIRES_OK(context, - context->input("row_range_tensor", &row_keys_tensor)); - auto row_keys = row_keys_tensor->tensor(); - - VLOG(1) << "RowsetIntersectTensor intersecting: [" << row_keys(0) << "," - << row_keys(1) << ")"; - - BigtableRowSetResource* result_resource; - OP_REQUIRES_OK( - context, - mgr->LookupOrCreate( - cinfo_.container(), cinfo_.name(), &result_resource, - [this, row_set_resource, &row_keys](BigtableRowSetResource** ret) - TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) { - *ret = new BigtableRowSetResource(row_set_resource->Intersect( - cbt::RowRange::RightOpen(row_keys(0), row_keys(1)))); - return Status::OK(); - })); - - OP_REQUIRES_OK(context, MakeResourceHandleToOutput( - context, 0, cinfo_.container(), cinfo_.name(), - TypeIndex::Make())); - } - - protected: - // Variables accessible from subclasses. - mutex mu_; - ContainerInfo cinfo_ TF_GUARDED_BY(mu_); -}; - -REGISTER_KERNEL_BUILDER( - Name("BigtableRowSetIntersectTensor").Device(DEVICE_CPU), - BigtableRowSetIntersectTensorOp); - } // namespace io } // namespace tensorflow diff --git a/tensorflow_io/core/ops/bigtable_ops.cc b/tensorflow_io/core/ops/bigtable_ops.cc index b97c867fc..d36f508a6 100644 --- a/tensorflow_io/core/ops/bigtable_ops.cc +++ b/tensorflow_io/core/ops/bigtable_ops.cc @@ -93,7 +93,7 @@ REGISTER_OP("BigtableRowSetIntersect") .Output("result_row_set: resource") .SetShapeFn(shape_inference::ScalarShape); -REGISTER_OP("BigtableSampleRowSets") +REGISTER_OP("BigtableSplitRowSetEvenly") .Attr("container: string = ''") .Attr("shared_name: string = ''") .Input("client: resource") diff --git a/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py b/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py index 1384f46c6..fef979c85 100644 --- a/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py +++ b/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py @@ -43,7 +43,7 @@ def parallel_read_rows( num_parallel_calls=1, row_set: RowSet = from_rows_or_ranges(infinite()), ): - samples = core_ops.bigtable_sample_row_sets( + samples = core_ops.bigtable_split_row_set_evenly( self._client_resource, row_set._impl, self._table_id, num_parallel_calls, ) From c5a8fae921571b904bdda526a8732a0fe408bed4 Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Wed, 10 Nov 2021 14:44:11 +0100 Subject: [PATCH 26/29] handled empty row_set --- .../bigtable/bigtable_dataset_kernel.cc | 17 ++++++-- .../ops/bigtable/bigtable_dataset_ops.py | 4 +- .../test_bigtable/test_parallel_read_rows.py | 41 ++++++++++++++----- 3 files changed, 46 insertions(+), 16 deletions(-) diff --git a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc index b847c2850..63f3392f4 100644 --- a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc +++ b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc @@ -397,7 +397,8 @@ bool RowSetIntersectsRange(cbt::RowSet const& row_set, class BigtableSplitRowSetEvenlyOp : public 
OpKernel { public: - explicit BigtableSplitRowSetEvenlyOp(OpKernelConstruction* ctx) : OpKernel(ctx) { + explicit BigtableSplitRowSetEvenlyOp(OpKernelConstruction* ctx) + : OpKernel(ctx) { VLOG(1) << "BigtableSplitRowSetEvenlyOp ctor "; OP_REQUIRES_OK(ctx, ctx->GetAttr("table_id", &table_id_)); OP_REQUIRES_OK(ctx, @@ -421,10 +422,18 @@ class BigtableSplitRowSetEvenlyOp : public OpKernel { context, GetResourceFromContext(context, "row_set", &row_set_resource)); core::ScopedUnref unref_row_set(row_set_resource); + VLOG(1) << "BigtableSplitRowSetEvenlyOp got RowSet: " + << row_set_resource->ToString(); + if (row_set_resource->row_set().IsEmpty()) { + OP_REQUIRES_OK(context, + errors::FailedPrecondition("row_set cannot be empty!")); + } + auto table = client_resource->CreateTable(table_id_); auto maybe_sample_row_keys = table.SampleRows(); - if (!maybe_sample_row_keys.ok()) - throw std::runtime_error(maybe_sample_row_keys.status().message()); + OP_REQUIRES_OK(context, + GoogleCloudStatusToTfStatus(maybe_sample_row_keys.status())); + auto& sample_row_keys = maybe_sample_row_keys.value(); std::vector> tablets; @@ -435,7 +444,7 @@ class BigtableSplitRowSetEvenlyOp : public OpKernel { tablets.emplace_back(start_key, end_key); start_key = std::move(end_key); } - if (!start_key.empty()) { + if (!start_key.empty() || tablets.size() == 0) { tablets.emplace_back(start_key, ""); } tablets.erase( diff --git a/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py b/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py index fef979c85..ca047a4d8 100644 --- a/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py +++ b/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py @@ -40,9 +40,9 @@ def read_rows(self, columns: List[str], row_set: RowSet): def parallel_read_rows( self, columns: List[str], - num_parallel_calls=1, + num_parallel_calls=tf.data.AUTOTUNE, row_set: RowSet = from_rows_or_ranges(infinite()), ): samples = core_ops.bigtable_split_row_set_evenly( self._client_resource, row_set._impl, self._table_id, num_parallel_calls, ) diff --git a/tests/test_bigtable/test_parallel_read_rows.py b/tests/test_bigtable/test_parallel_read_rows.py index 7d014af4f..a7d5bf9b0 100644 --- a/tests/test_bigtable/test_parallel_read_rows.py +++ b/tests/test_bigtable/test_parallel_read_rows.py @@ -18,6 +18,7 @@ # pylint: disable=C0115 import os +from re import escape from .bigtable_emulator import BigtableEmulator from tensorflow_io.python.ops import core_ops from tensorflow_io.python.ops.bigtable.bigtable_dataset_ops import BigtableClient import tensorflow_io.python.ops.bigtable.bigtable_row_range as row_range import tensorflow_io.python.ops.bigtable.bigtable_row_set as row_set import tensorflow as tf @@ -45,6 +46,7 @@ def test_parallel_read(self): ) values = [[f"[{i,j}]" for j in range(2)] for i in range(20)] + flat_values = [value for row in values for value in row] ten = tf.constant(values) @@ -60,14 +62,12 @@ def test_parallel_read(self): ["fam1:col1", "fam2:col2"], ) - for i, r in enumerate( - table.parallel_read_rows( - ["fam1:col1", "fam2:col2"], - row_set=row_set.from_rows_or_ranges(row_range.empty()), - ) + for r in table.parallel_read_rows( + ["fam1:col1", "fam2:col2"], + row_set=row_set.from_rows_or_ranges(row_range.infinite()), ): - for j, c in enumerate(r): - self.assertEqual(values[i][j], c.numpy().decode()) + for c in r: + self.assertTrue(c.numpy().decode() in flat_values) def test_not_parallel_read(self): os.environ["BIGTABLE_EMULATOR_HOST"] = self.emulator.get_addr() @@ -105,7 +105,7 @@ def test_not_parallel_read(self): for j, c in enumerate(r): self.assertEqual(values[i][j],
c.numpy().decode()) - def test_sample_row_sets(self): + def test_split_row_set(self): os.environ["BIGTABLE_EMULATOR_HOST"] = self.emulator.get_addr() self.emulator.create_table( "fake_project", @@ -135,7 +135,7 @@ def test_sample_row_sets(self): num_parallel_calls = 2 samples = [ s - for s in core_ops.bigtable_sample_row_sets( + for s in core_ops.bigtable_split_row_set_evenly( client._client_resource, rs._impl, "test-table", num_parallel_calls, ) ] @@ -144,7 +144,7 @@ def test_sample_row_sets(self): num_parallel_calls = 6 samples = [ s - for s in core_ops.bigtable_sample_row_sets( + for s in core_ops.bigtable_split_row_set_evenly( client._client_resource, rs._impl, "test-table", num_parallel_calls, ) ] @@ -156,8 +156,27 @@ def test_sample_row_sets(self): num_parallel_calls = 1 samples = [ s - for s in core_ops.bigtable_sample_row_sets( + for s in core_ops.bigtable_split_row_set_evenly( client._client_resource, rs._impl, "test-table", num_parallel_calls, ) ] self.assertEqual(len(samples), num_parallel_calls) + + def test_split_empty(self): + os.environ["BIGTABLE_EMULATOR_HOST"] = self.emulator.get_addr() + self.emulator.create_table( + "fake_project", "fake_instance", "test-table", ["fam1", "fam2"], + ) + + client = BigtableClient("fake_project", "fake_instance") + + rs = row_set.from_rows_or_ranges(row_range.empty()) + + num_parallel_calls = 2 + + self.assertRaises( + tf.errors.FailedPreconditionError, + lambda: core_ops.bigtable_split_row_set_evenly( + client._client_resource, rs._impl, "test-table", num_parallel_calls, + ), + ) From 628a07f7881630e6e4e2f4a19ba5e515258fe1a6 Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Fri, 19 Nov 2021 14:40:06 +0100 Subject: [PATCH 27/29] pr comments --- .../bigtable/bigtable_dataset_kernel.cc | 63 ++++++++++--------- tensorflow_io/core/ops/bigtable_ops.cc | 2 +- .../test_bigtable/test_parallel_read_rows.py | 45 ++++++++----- 3 files changed, 64 insertions(+), 46 deletions(-) diff --git a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc index 63f3392f4..b7397e7e0 100644 --- a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc +++ b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc @@ -87,11 +87,8 @@ class BigtableClientResource : public ResourceBase { VLOG(1) << "BigtableClientResource ctor"; } - cbt::Table CreateTable(const std::string& table_id) { - VLOG(1) << "CreateTable"; - cbt::Table table(data_client_, table_id); - VLOG(1) << "table crated"; - return table; + const std::shared_ptr& data_client() const { + return data_client_; } ~BigtableClientResource() { VLOG(1) << "BigtableClientResource dtor"; } @@ -145,13 +142,11 @@ template class Iterator : public DatasetIterator { public: explicit Iterator(const typename DatasetIterator::Params& params, - const std::string& table_id, const std::vector& columns) : DatasetIterator(params), columns_(ColumnsToFamiliesAndQualifiers(columns)), - table_(this->dataset()->client_resource().CreateTable(table_id)), - reader_(this->table_.ReadRows( - this->dataset()->row_set_resource().row_set(), + reader_(this->dataset()->CreateTable().ReadRows( + this->dataset()->row_set(), cbt::Filter::Chain(CreateColumnsFilter(columns_), cbt::Filter::Latest(1)))), it_(this->reader_.begin()), @@ -269,7 +264,7 @@ class Iterator : public DatasetIterator { mutex mu_; const std::vector> columns_; - cbt::Table table_; + // cbt::Table table_; cbt::RowReader reader_ GUARDED_BY(mu_); cbt::v1::internal::RowReaderIterator it_ 
GUARDED_BY(mu_); // we're using a map with const refs to avoid copying strings when searching @@ -281,12 +276,13 @@ class Iterator : public DatasetIterator { class Dataset : public DatasetBase { public: - Dataset(OpKernelContext* ctx, BigtableClientResource* client_resource, - io::BigtableRowSetResource* row_set_resource, std::string table_id, + Dataset(OpKernelContext* ctx, + const std::shared_ptr& data_client, + cbt::RowSet row_set, std::string table_id, std::vector columns) : DatasetBase(DatasetContext(ctx)), - client_resource_(*client_resource), - row_set_resource_(*row_set_resource), + data_client_(data_client), + row_set_(std::move(row_set)), table_id_(table_id), columns_(columns) { dtypes_.push_back(DT_STRING); @@ -299,7 +295,7 @@ class Dataset : public DatasetBase { return absl::make_unique>( typename DatasetIterator::Params{ this, strings::StrCat(prefix, "::BigtableDataset")}, - table_id_, columns_); + columns_); } const DataTypeVector& output_dtypes() const override { return dtypes_; } @@ -312,9 +308,16 @@ class Dataset : public DatasetBase { return "BigtableDatasetOp::Dataset"; } - BigtableClientResource& client_resource() const { return client_resource_; } - io::BigtableRowSetResource& row_set_resource() const { - return row_set_resource_; + const std::shared_ptr& data_client() const { + return data_client_; + } + const cbt::RowSet& row_set() const { return row_set_; } + + cbt::Table CreateTable() const { + VLOG(1) << "CreateTable"; + cbt::Table table(data_client_, table_id_); + VLOG(1) << "table crated"; + return table; } protected: @@ -328,8 +331,8 @@ class Dataset : public DatasetBase { Status CheckExternalState() const override { return Status::OK(); } private: - BigtableClientResource& client_resource_; - io::BigtableRowSetResource& row_set_resource_; + std::shared_ptr const& data_client_; + const cbt::RowSet row_set_; const std::string table_id_; const std::vector columns_; DataTypeVector dtypes_; @@ -355,8 +358,8 @@ class BigtableDatasetOp : public DatasetOpKernel { GetResourceFromContext(ctx, "row_set", &row_set_resource)); core::ScopedUnref row_set_resource_unref_(row_set_resource); - *output = new Dataset(ctx, client_resource, row_set_resource, table_id_, - columns_); + *output = new Dataset(ctx, client_resource->data_client(), + row_set_resource->row_set(), table_id_, columns_); } private: @@ -401,8 +404,7 @@ class BigtableSplitRowSetEvenlyOp : public OpKernel { : OpKernel(ctx) { VLOG(1) << "BigtableSplitRowSetEvenlyOp ctor "; OP_REQUIRES_OK(ctx, ctx->GetAttr("table_id", &table_id_)); - OP_REQUIRES_OK(ctx, - ctx->GetAttr("num_parallel_calls", &num_parallel_calls_)); + OP_REQUIRES_OK(ctx, ctx->GetAttr("num_splits", &num_splits_)); } void Compute(OpKernelContext* context) override { @@ -429,7 +431,7 @@ class BigtableSplitRowSetEvenlyOp : public OpKernel { errors::FailedPrecondition("row_set cannot be empty!")); } - auto table = client_resource->CreateTable(table_id_); + auto table = cbt::Table(client_resource->data_client(), table_id_); auto maybe_sample_row_keys = table.SampleRows(); OP_REQUIRES_OK(context, GoogleCloudStatusToTfStatus(maybe_sample_row_keys.status())); @@ -458,11 +460,12 @@ class BigtableSplitRowSetEvenlyOp : public OpKernel { VLOG(1) << "got array of tablets of size:" << tablets.size(); - size_t output_size = std::min(tablets.size(), (size_t)num_parallel_calls_); + size_t output_size = std::min(tablets.size(), num_splits_); Tensor* output_tensor = NULL; - OP_REQUIRES_OK(context, context->allocate_output(0, {(long)output_size}, - &output_tensor)); + 
OP_REQUIRES_OK(context, + context->allocate_output(0, {static_cast(output_size)}, + &output_tensor)); auto output_v = output_tensor->tensor(); for (size_t i = 0; i < output_size; i++) { @@ -492,8 +495,8 @@ class BigtableSplitRowSetEvenlyOp : public OpKernel { private: mutable mutex mu_; - std::string table_id_; - int num_parallel_calls_; + std::string table_id_ GUARDED_BY(mu_); + int num_splits_ GUARDED_BY(mu_); }; REGISTER_KERNEL_BUILDER(Name("BigtableSplitRowSetEvenly").Device(DEVICE_CPU), diff --git a/tensorflow_io/core/ops/bigtable_ops.cc b/tensorflow_io/core/ops/bigtable_ops.cc index d36f508a6..73492e8e6 100644 --- a/tensorflow_io/core/ops/bigtable_ops.cc +++ b/tensorflow_io/core/ops/bigtable_ops.cc @@ -99,7 +99,7 @@ REGISTER_OP("BigtableSplitRowSetEvenly") .Input("client: resource") .Input("row_set: resource") .Attr("table_id: string") - .Attr("num_parallel_calls: int") + .Attr("num_splits: int") .Output("samples: resource") .SetIsStateful() .SetShapeFn([](tensorflow::shape_inference::InferenceContext* c) { diff --git a/tests/test_bigtable/test_parallel_read_rows.py b/tests/test_bigtable/test_parallel_read_rows.py index a7d5bf9b0..664e4a73c 100644 --- a/tests/test_bigtable/test_parallel_read_rows.py +++ b/tests/test_bigtable/test_parallel_read_rows.py @@ -21,7 +21,9 @@ from re import escape from .bigtable_emulator import BigtableEmulator from tensorflow_io.python.ops import core_ops -from tensorflow_io.python.ops.bigtable.bigtable_dataset_ops import BigtableClient +from tensorflow_io.python.ops.bigtable.bigtable_dataset_ops import ( + BigtableClient, +) import tensorflow_io.python.ops.bigtable.bigtable_row_range as row_range import tensorflow_io.python.ops.bigtable.bigtable_row_set as row_set import tensorflow as tf @@ -95,15 +97,13 @@ def test_not_parallel_read(self): ["fam1:col1", "fam2:col2"], ) - for i, r in enumerate( - table.parallel_read_rows( - ["fam1:col1", "fam2:col2"], - row_set=row_set.from_rows_or_ranges(row_range.empty()), - num_parallel_calls=2, - ) - ): - for j, c in enumerate(r): - self.assertEqual(values[i][j], c.numpy().decode()) + dataset = table.parallel_read_rows( + ["fam1:col1", "fam2:col2"], + row_set=row_set.from_rows_or_ranges(row_range.infinite()), + num_parallel_calls=2, + ) + results = [[v.numpy().decode() for v in row] for row in dataset] + self.assertEqual(repr(sorted(values)), repr(sorted(results))) def test_split_row_set(self): os.environ["BIGTABLE_EMULATOR_HOST"] = self.emulator.get_addr() @@ -136,7 +136,10 @@ def test_split_row_set(self): samples = [ s for s in core_ops.bigtable_split_row_set_evenly( - client._client_resource, rs._impl, "test-table", num_parallel_calls, + client._client_resource, + rs._impl, + "test-table", + num_parallel_calls, ) ] self.assertEqual(len(samples), num_parallel_calls) @@ -145,7 +148,10 @@ def test_split_row_set(self): samples = [ s for s in core_ops.bigtable_split_row_set_evenly( - client._client_resource, rs._impl, "test-table", num_parallel_calls, + client._client_resource, + rs._impl, + "test-table", + num_parallel_calls, ) ] @@ -157,7 +163,10 @@ def test_split_row_set(self): samples = [ s for s in core_ops.bigtable_split_row_set_evenly( - client._client_resource, rs._impl, "test-table", num_parallel_calls, + client._client_resource, + rs._impl, + "test-table", + num_parallel_calls, ) ] self.assertEqual(len(samples), num_parallel_calls) @@ -165,7 +174,10 @@ def test_split_row_set(self): def test_split_empty(self): os.environ["BIGTABLE_EMULATOR_HOST"] = self.emulator.get_addr() self.emulator.create_table( - 
"fake_project", "fake_instance", "test-table", ["fam1", "fam2"], + "fake_project", + "fake_instance", + "test-table", + ["fam1", "fam2"], ) client = BigtableClient("fake_project", "fake_instance") @@ -177,6 +189,9 @@ def test_split_empty(self): self.assertRaises( tf.errors.FailedPreconditionError, lambda: core_ops.bigtable_split_row_set_evenly( - client._client_resource, rs._impl, "test-table", num_parallel_calls, + client._client_resource, + rs._impl, + "test-table", + num_parallel_calls, ), ) From e9bf2efaf0e075a4ca5e01d307993880bd276df6 Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Fri, 19 Nov 2021 14:41:53 +0100 Subject: [PATCH 28/29] linter --- .../test_bigtable/test_parallel_read_rows.py | 29 ++++--------------- 1 file changed, 6 insertions(+), 23 deletions(-) diff --git a/tests/test_bigtable/test_parallel_read_rows.py b/tests/test_bigtable/test_parallel_read_rows.py index 664e4a73c..b20b1ef56 100644 --- a/tests/test_bigtable/test_parallel_read_rows.py +++ b/tests/test_bigtable/test_parallel_read_rows.py @@ -21,9 +21,7 @@ from re import escape from .bigtable_emulator import BigtableEmulator from tensorflow_io.python.ops import core_ops -from tensorflow_io.python.ops.bigtable.bigtable_dataset_ops import ( - BigtableClient, -) +from tensorflow_io.python.ops.bigtable.bigtable_dataset_ops import BigtableClient import tensorflow_io.python.ops.bigtable.bigtable_row_range as row_range import tensorflow_io.python.ops.bigtable.bigtable_row_set as row_set import tensorflow as tf @@ -136,10 +134,7 @@ def test_split_row_set(self): samples = [ s for s in core_ops.bigtable_split_row_set_evenly( - client._client_resource, - rs._impl, - "test-table", - num_parallel_calls, + client._client_resource, rs._impl, "test-table", num_parallel_calls, ) ] self.assertEqual(len(samples), num_parallel_calls) @@ -148,10 +143,7 @@ def test_split_row_set(self): samples = [ s for s in core_ops.bigtable_split_row_set_evenly( - client._client_resource, - rs._impl, - "test-table", - num_parallel_calls, + client._client_resource, rs._impl, "test-table", num_parallel_calls, ) ] @@ -163,10 +155,7 @@ def test_split_row_set(self): samples = [ s for s in core_ops.bigtable_split_row_set_evenly( - client._client_resource, - rs._impl, - "test-table", - num_parallel_calls, + client._client_resource, rs._impl, "test-table", num_parallel_calls, ) ] self.assertEqual(len(samples), num_parallel_calls) @@ -174,10 +163,7 @@ def test_split_row_set(self): def test_split_empty(self): os.environ["BIGTABLE_EMULATOR_HOST"] = self.emulator.get_addr() self.emulator.create_table( - "fake_project", - "fake_instance", - "test-table", - ["fam1", "fam2"], + "fake_project", "fake_instance", "test-table", ["fam1", "fam2"], ) client = BigtableClient("fake_project", "fake_instance") @@ -189,9 +175,6 @@ def test_split_empty(self): self.assertRaises( tf.errors.FailedPreconditionError, lambda: core_ops.bigtable_split_row_set_evenly( - client._client_resource, - rs._impl, - "test-table", - num_parallel_calls, + client._client_resource, rs._impl, "test-table", num_parallel_calls, ), ) From 759afcb9d4930224121d3f6ccbb5cbec2c4b01cc Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Mon, 22 Nov 2021 12:55:09 +0100 Subject: [PATCH 29/29] removed missed comment --- tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc index b7397e7e0..e77bd39fd 100644 --- 
a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc +++ b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc @@ -264,7 +264,6 @@ class Iterator : public DatasetIterator { mutex mu_; const std::vector> columns_; - // cbt::Table table_; cbt::RowReader reader_ GUARDED_BY(mu_); cbt::v1::internal::RowReaderIterator it_ GUARDED_BY(mu_); // we're using a map with const refs to avoid copying strings when searching
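
Taken together, these patches leave BigtableSplitRowSetEvenly with a simple
contract: sample the table's tablet boundaries with SampleRows, pair
consecutive sample keys into [start, end) ranges, drop ranges that do not
intersect the requested row set, and return at most num_splits row-set
resources. The pairing step is easy to get wrong at the edges, so here is a
rough Python rendering of that loop -- a standalone sketch for illustration,
not code from the library (the helper name tablets_from_samples is made up):

    def tablets_from_samples(sample_row_keys):
        """Pair consecutive sampled row keys into [start, end) tablet ranges.

        Mirrors the loop in BigtableSplitRowSetEvenlyOp: each sampled key
        closes one tablet and opens the next, a trailing (last_key, "") range
        covers the rest of the table, and an empty sample list yields the
        single infinite range ("", "") -- the case fixed by the
        "tablets.size() == 0" check in patch 26.
        """
        tablets = []
        start_key = ""
        for end_key in sample_row_keys:
            tablets.append((start_key, end_key))
            start_key = end_key
        if start_key != "" or not tablets:
            tablets.append((start_key, ""))
        return tablets

    assert tablets_from_samples([]) == [("", "")]
    assert tablets_from_samples(["k1", "k2"]) == [
        ("", "k1"),
        ("k1", "k2"),
        ("k2", ""),
    ]

The kernel then erases pairs that fail RowSetIntersectsRange before
allocating min(tablets.size(), num_splits) output resources, which is
presumably why an empty row set has to be rejected up front: it would
otherwise leave nothing to split.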
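
For reference, end-to-end usage of the API as it stands after patch 29 -- a
sketch only, assuming a running Bigtable emulator with the same fake project,
instance, table, and column families as the tests above, and assuming the
client exposes a get_table(table_id) helper returning the table wrapper
(that helper is not shown in these patches):

    import os
    import tensorflow as tf
    import tensorflow_io.python.ops.bigtable.bigtable_row_range as row_range
    import tensorflow_io.python.ops.bigtable.bigtable_row_set as row_set
    from tensorflow_io.python.ops.bigtable.bigtable_dataset_ops import BigtableClient

    # Assumed setup: an emulator already listening at this address, with
    # "test-table" created under column families "fam1" and "fam2".
    os.environ["BIGTABLE_EMULATOR_HOST"] = "localhost:8086"

    client = BigtableClient("fake_project", "fake_instance")
    table = client.get_table("test-table")  # assumed helper, mirroring the tests

    # The row set is split into tablet-aligned shards by
    # bigtable_split_row_set_evenly; each shard is read by its own read_rows
    # dataset and the shards are interleaved non-deterministically.
    dataset = table.parallel_read_rows(
        ["fam1:col1", "fam2:col2"],
        num_parallel_calls=2,
        row_set=row_set.from_rows_or_ranges(row_range.infinite()),
    )

    for row in dataset:  # one tensor of cell values per row
        print([cell.numpy().decode() for cell in row])

Because the interleave is non-deterministic, row order is not guaranteed,
which is exactly why the patched tests compare sorted results rather than
indexing rows positionally.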