From ebbb3057e66149f8e5d04cc03a36e60933aff7ab Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Fri, 5 Nov 2021 16:25:19 +0100 Subject: [PATCH 1/9] version filters --- .../bigtable/bigtable_version_filters.cc | 97 +++++++++++++++++++ .../bigtable/bigtable_version_filters.h | 63 ++++++++++++ tensorflow_io/core/ops/bigtable_ops.cc | 14 +++ .../ops/bigtable/bigtable_version_filters.py | 80 +++++++++++++++ 4 files changed, 254 insertions(+) create mode 100644 tensorflow_io/core/kernels/bigtable/bigtable_version_filters.cc create mode 100644 tensorflow_io/core/kernels/bigtable/bigtable_version_filters.h create mode 100644 tensorflow_io/python/ops/bigtable/bigtable_version_filters.py diff --git a/tensorflow_io/core/kernels/bigtable/bigtable_version_filters.cc b/tensorflow_io/core/kernels/bigtable/bigtable_version_filters.cc new file mode 100644 index 000000000..63923e6b3 --- /dev/null +++ b/tensorflow_io/core/kernels/bigtable/bigtable_version_filters.cc @@ -0,0 +1,97 @@ +/* Copyright 2021 The TensorFlow Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+==============================================================================*/ +#include "tensorflow_io/core/kernels/bigtable/bigtable_version_filters.h" + +namespace cbt = ::google::cloud::bigtable; + +namespace tensorflow { +namespace io { + +class BigtableLatestFilterOp + : public OpKernelCreatingResource { + public: + explicit BigtableLatestFilterOp(OpKernelConstruction* ctx) + : OpKernelCreatingResource(ctx) { + VLOG(1) << "BigtableLatestFilterOp ctor "; + } + + private: + Status CreateResource(BigtableFilterResource** resource) + TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) override { + *resource = new BigtableFilterResource(cbt::Filter::Latest(1)); + return Status::OK(); + } + + private: + mutable mutex mu_; +}; + +REGISTER_KERNEL_BUILDER(Name("BigtableLatestFilter").Device(DEVICE_CPU), + BigtableLatestFilterOp); + +class BigtableTimestampRangeFilterOp + : public OpKernelCreatingResource { + public: + explicit BigtableTimestampRangeFilterOp(OpKernelConstruction* ctx) + : OpKernelCreatingResource(ctx) { + VLOG(1) << "BigtableTimestampRangeFilterOp ctor "; + OP_REQUIRES_OK(ctx, ctx->GetAttr("start", &start_)); + OP_REQUIRES_OK(ctx, ctx->GetAttr("end", &end_)); + } + + private: + Status CreateResource(BigtableFilterResource** resource) + TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) override { + *resource = new BigtableFilterResource(cbt::Filter::TimestampRangeMicros(start_, end_)); + return Status::OK(); + } + + private: + mutable mutex mu_; + int64_t start_; + int64_t end_; +}; + +REGISTER_KERNEL_BUILDER(Name("BigtableTimestampRangeFilter").Device(DEVICE_CPU), + BigtableTimestampRangeFilterOp); + +class BigtablePrintFilterOp : public OpKernel { + public: + explicit BigtablePrintFilterOp(OpKernelConstruction* context) + : OpKernel(context) {} + + void Compute(OpKernelContext* context) override { + BigtableFilterResource* resource; + OP_REQUIRES_OK(context, + GetResourceFromContext(context, "filter", &resource)); + core::ScopedUnref unref(resource); + + // Create an output tensor + 
Tensor* output_tensor = NULL; + OP_REQUIRES_OK(context, context->allocate_output(0, {1}, &output_tensor)); + auto output_v = output_tensor->tensor(); + + output_v(0) = resource->ToString(); + } + + private: + mutable mutex mu_; +}; + +REGISTER_KERNEL_BUILDER(Name("BigtablePrintFilter").Device(DEVICE_CPU), + BigtablePrintFilterOp); + +} // namespace io +} // namespace tensorflow diff --git a/tensorflow_io/core/kernels/bigtable/bigtable_version_filters.h b/tensorflow_io/core/kernels/bigtable/bigtable_version_filters.h new file mode 100644 index 000000000..235a1b21c --- /dev/null +++ b/tensorflow_io/core/kernels/bigtable/bigtable_version_filters.h @@ -0,0 +1,63 @@ +/* Copyright 2021 The TensorFlow Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+==============================================================================*/ + +#ifndef BIGTABLE_VERSION_FILTERS_H +#define BIGTABLE_VERSION_FILTERS_H + +#include "absl/memory/memory.h" +#include "google/cloud/bigtable/table.h" +#include "google/cloud/bigtable/table_admin.h" +#include "tensorflow/core/framework/common_shape_fns.h" +#include "tensorflow/core/framework/dataset.h" +#include "tensorflow/core/framework/op.h" +#include "tensorflow/core/framework/op_kernel.h" +#include "tensorflow/core/framework/resource_mgr.h" +#include "tensorflow/core/framework/resource_op_kernel.h" +#include "tensorflow_io/core/kernels/bigtable/bigtable_resource_kernel.h" + + +namespace tensorflow { +namespace io { + +class BigtableFilterResource : public ResourceBase { + public: + explicit BigtableFilterResource(google::cloud::bigtable::Filter filter) + : filter_(std::move(filter)) { + VLOG(1) << "BigtableFilterResource ctor"; + } + + ~BigtableFilterResource() { VLOG(1) << "BigtableFilterResource dtor"; } + + std::string ToString() const { + std::string res; + google::protobuf::TextFormat::PrintToString(filter_.as_proto(), &res); + return res; + } + + google::cloud::bigtable::Filter& filter() { return filter_; } + + string DebugString() const override { + return "BigtableFilterResource:{" + ToString() + "}"; + } + + private: + google::cloud::bigtable::Filter filter_; +}; + + +} // namespace io +} // namespace tensorflow + +#endif /* BIGTABLE_VERSION_FILTERS_H */ diff --git a/tensorflow_io/core/ops/bigtable_ops.cc b/tensorflow_io/core/ops/bigtable_ops.cc index 73492e8e6..f58a586bd 100644 --- a/tensorflow_io/core/ops/bigtable_ops.cc +++ b/tensorflow_io/core/ops/bigtable_ops.cc @@ -106,3 +106,17 @@ REGISTER_OP("BigtableSplitRowSetEvenly") c->set_output(0, c->Vector(c->UnknownDim())); return tensorflow::Status::OK(); }); + +REGISTER_OP("BigtableTimestampRangeFilter") + .Attr("container: string = ''") + .Attr("shared_name: string = ''") + .Attr("start: int") + .Attr("start: int") + 
.Output("filter: resource") + .SetIsStateful() + .SetShapeFn(shape_inference::ScalarShape); + +REGISTER_OP("BigtablePrintFilter") + .Input("filter: resource") + .Output("output: string") + .SetShapeFn(shape_inference::ScalarShape); diff --git a/tensorflow_io/python/ops/bigtable/bigtable_version_filters.py b/tensorflow_io/python/ops/bigtable/bigtable_version_filters.py new file mode 100644 index 000000000..c209886c6 --- /dev/null +++ b/tensorflow_io/python/ops/bigtable/bigtable_version_filters.py @@ -0,0 +1,80 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Module implementing basic functions for obtaining BigTable Filters +for version filtering. +""" +from typing import Union +from datetime import datetime +from tensorflow_io.python.ops import core_ops + + +class BigtableFilter: + def __init__(self, impl): + self._impl = impl + + def __repr__(self) -> str: + return core_ops.bigtable_print_filter(self._impl).numpy()[0].decode() + + +def latest(): + """Create a filter passing only the latest version of + column's value for each row. + Returns: + pbt_C.Filter: Filter passing only most recent version of a value. + """ + return core_ops.bigtable_latest_filter() + + +def timestamp_range( + start: Union[int, float, datetime], end: Union[int, float, datetime] +): + """Create a filter passing all values whose timestamp is + from the specified range, inclusive at the start and exclusive at the end. 
+ Args: + start: The start of the timestamp range (inclusive). This can be either a python + datetime or a number (int or float) representing seconds since epoch. + end: The end of the timestamp range (exclusive). Same as start, this can be a + datetime or a number of seconds since epoch. + Returns: + pbt_C.Filter: Filter passing only values' versions from the specified range. + """ + if isinstance(start, datetime): + start_timestamp = int(start.timestamp() * 1e6) + else: + start_timestamp = int(start * 1e6) + + if isinstance(end, datetime): + end_timestamp = int(end.timestamp() * 1e6) + else: + end_timestamp = int(end * 1e6) + + return core_ops.bigtable_timestamp_range_filter(start_timestamp, end_timestamp) + + +def timestamp_range_micros( + start_timestamp: Union[int, float], end_timestamp: Union[int, float] +): + """Create a filter passing all values whose timestamp is + from the specified range, inclusive at the start and exclusive at the end. + Args: + start_timestamp: The start of the timestamp range (inclusive). It is a number ( + int or float) representing number of microseconds since epoch. + end_timestamp: The end of the timestamp range (exclusive). + Returns: + pbt_C.Filter: Filter passing only values' versions from the specified range. 
+ """ + return core_ops.bigtable_timestamp_range_filter( + int(start_timestamp), int(end_timestamp) + ) From dd7fb509e2669b7404610d597e5211e09a6238c7 Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Mon, 8 Nov 2021 12:07:00 +0100 Subject: [PATCH 2/9] add filters to python api --- tensorflow_io/core/BUILD | 2 ++ .../bigtable/bigtable_dataset_kernel.cc | 26 ++++++++++++++++++- .../bigtable/bigtable_version_filters.cc | 20 ++++++-------- tensorflow_io/core/ops/bigtable_ops.cc | 13 +++++++++- .../ops/bigtable/bigtable_dataset_ops.py | 12 +++++---- .../ops/bigtable/bigtable_version_filters.py | 12 ++++++--- 6 files changed, 62 insertions(+), 23 deletions(-) diff --git a/tensorflow_io/core/BUILD b/tensorflow_io/core/BUILD index 8979b00b4..faa6935f6 100644 --- a/tensorflow_io/core/BUILD +++ b/tensorflow_io/core/BUILD @@ -184,6 +184,8 @@ cc_library( "kernels/bigtable/bigtable_row_range.h", "kernels/bigtable/bigtable_row_set.cc", "kernels/bigtable/bigtable_row_set.h", + "kernels/bigtable/bigtable_version_filters.cc", + "kernels/bigtable/bigtable_version_filters.h", "ops/bigtable_ops.cc", ], copts = tf_io_copts(), diff --git a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc index e77bd39fd..3e0108e41 100644 --- a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc +++ b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc @@ -23,6 +23,7 @@ limitations under the License. 
#include "tensorflow/core/framework/resource_mgr.h" #include "tensorflow/core/framework/resource_op_kernel.h" #include "tensorflow_io/core/kernels/bigtable/bigtable_row_set.h" +#include "tensorflow_io/core/kernels/bigtable/bigtable_version_filters.h" namespace cbt = ::google::cloud::bigtable; @@ -145,10 +146,19 @@ class Iterator : public DatasetIterator { const std::vector& columns) : DatasetIterator(params), columns_(ColumnsToFamiliesAndQualifiers(columns)), +<<<<<<< HEAD reader_(this->dataset()->CreateTable().ReadRows( this->dataset()->row_set(), cbt::Filter::Chain(CreateColumnsFilter(columns_), cbt::Filter::Latest(1)))), +======= + reader_( + this->dataset()->client_resource().CreateTable(table_id).ReadRows( + cbt::RowRange::InfiniteRange(), + cbt::Filter::Chain(CreateColumnsFilter(columns_), + this->dataset()->filter_resource().filter(), + cbt::Filter::Latest(1)))), +>>>>>>> 4489e24... add filters to python api it_(this->reader_.begin()), column_to_idx_(CreateColumnToIdxMap(columns_)) { VLOG(1) << "DatasetIterator ctor"; @@ -278,10 +288,13 @@ class Dataset : public DatasetBase { Dataset(OpKernelContext* ctx, const std::shared_ptr& data_client, cbt::RowSet row_set, std::string table_id, + io::BigtableFilterResource* filter_resource, std::vector columns) : DatasetBase(DatasetContext(ctx)), data_client_(data_client), row_set_(std::move(row_set)), + filter_resource_(*filter_resource), + filter_resource_unref_(filter_resource), table_id_(table_id), columns_(columns) { dtypes_.push_back(DT_STRING); @@ -319,6 +332,10 @@ class Dataset : public DatasetBase { return table; } + io::BigtableFilterResource& filter_resource() const { + return filter_resource_; + } + protected: Status AsGraphDefInternal(SerializationContext* ctx, DatasetGraphDefBuilder* b, @@ -332,6 +349,8 @@ class Dataset : public DatasetBase { private: std::shared_ptr const& data_client_; const cbt::RowSet row_set_; + io::BigtableFilterResource& filter_resource_; + const core::ScopedUnref 
filter_resource_unref_; const std::string table_id_; const std::vector columns_; DataTypeVector dtypes_; @@ -356,9 +375,14 @@ class BigtableDatasetOp : public DatasetOpKernel { OP_REQUIRES_OK(ctx, GetResourceFromContext(ctx, "row_set", &row_set_resource)); core::ScopedUnref row_set_resource_unref_(row_set_resource); + + io::BigtableFilterResource* filter_resource; + OP_REQUIRES_OK(ctx, + GetResourceFromContext(ctx, "filter", &filter_resource)); + core::ScopedUnref filter_resource_unref_(filter_resource); *output = new Dataset(ctx, client_resource->data_client(), - row_set_resource->row_set(), table_id_, columns_); + row_set_resource->row_set(), table_id_, filter_resource, columns_); } private: diff --git a/tensorflow_io/core/kernels/bigtable/bigtable_version_filters.cc b/tensorflow_io/core/kernels/bigtable/bigtable_version_filters.cc index 63923e6b3..416015e3f 100644 --- a/tensorflow_io/core/kernels/bigtable/bigtable_version_filters.cc +++ b/tensorflow_io/core/kernels/bigtable/bigtable_version_filters.cc @@ -20,18 +20,16 @@ namespace tensorflow { namespace io { class BigtableLatestFilterOp - : public OpKernelCreatingResource { + : public AbstractBigtableResourceOp { public: explicit BigtableLatestFilterOp(OpKernelConstruction* ctx) - : OpKernelCreatingResource(ctx) { + : AbstractBigtableResourceOp(ctx) { VLOG(1) << "BigtableLatestFilterOp ctor "; } private: - Status CreateResource(BigtableFilterResource** resource) - TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) override { - *resource = new BigtableFilterResource(cbt::Filter::Latest(1)); - return Status::OK(); + StatusOr CreateResource() override { + return new BigtableFilterResource(cbt::Filter::Latest(1)); } private: @@ -42,20 +40,18 @@ REGISTER_KERNEL_BUILDER(Name("BigtableLatestFilter").Device(DEVICE_CPU), BigtableLatestFilterOp); class BigtableTimestampRangeFilterOp - : public OpKernelCreatingResource { + : public AbstractBigtableResourceOp { public: explicit BigtableTimestampRangeFilterOp(OpKernelConstruction* ctx) - : 
OpKernelCreatingResource(ctx) { + : AbstractBigtableResourceOp(ctx) { VLOG(1) << "BigtableTimestampRangeFilterOp ctor "; OP_REQUIRES_OK(ctx, ctx->GetAttr("start", &start_)); OP_REQUIRES_OK(ctx, ctx->GetAttr("end", &end_)); } private: - Status CreateResource(BigtableFilterResource** resource) - TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) override { - *resource = new BigtableFilterResource(cbt::Filter::TimestampRangeMicros(start_, end_)); - return Status::OK(); + StatusOr CreateResource() override { + return new BigtableFilterResource(cbt::Filter::TimestampRangeMicros(start_, end_)); } private: diff --git a/tensorflow_io/core/ops/bigtable_ops.cc b/tensorflow_io/core/ops/bigtable_ops.cc index f58a586bd..f9602df84 100644 --- a/tensorflow_io/core/ops/bigtable_ops.cc +++ b/tensorflow_io/core/ops/bigtable_ops.cc @@ -27,7 +27,11 @@ REGISTER_OP("BigtableClient") REGISTER_OP("BigtableDataset") .Input("client: resource") +<<<<<<< HEAD .Input("row_set: resource") +======= + .Input("filter: resource") +>>>>>>> 4489e24... 
add filters to python api .Attr("table_id: string") .Attr("columns: list(string) >= 1") .Output("handle: variant") @@ -107,11 +111,18 @@ REGISTER_OP("BigtableSplitRowSetEvenly") return tensorflow::Status::OK(); }); +REGISTER_OP("BigtableLatestFilter") + .Attr("container: string = ''") + .Attr("shared_name: string = ''") + .Output("filter: resource") + .SetIsStateful() + .SetShapeFn(shape_inference::ScalarShape); + REGISTER_OP("BigtableTimestampRangeFilter") .Attr("container: string = ''") .Attr("shared_name: string = ''") .Attr("start: int") - .Attr("start: int") + .Attr("end: int") .Output("filter: resource") .SetIsStateful() .SetShapeFn(shape_inference::ScalarShape); diff --git a/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py b/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py index ca047a4d8..c7b8d12c6 100644 --- a/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py +++ b/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py @@ -2,6 +2,7 @@ from tensorflow.python.data.ops import dataset_ops from tensorflow.python.framework import tensor_spec from tensorflow_io.python.ops import core_ops +import tensorflow_io.python.ops.bigtable.bigtable_version_filters as filters from tensorflow.python.framework import dtypes import tensorflow as tf from tensorflow.python.data.ops import dataset_ops @@ -34,14 +35,15 @@ def __init__(self, client_resource, table_id: str): self._table_id = table_id self._client_resource = client_resource - def read_rows(self, columns: List[str], row_set: RowSet): - return _BigtableDataset(self._client_resource, self._table_id, columns, row_set) + def read_rows(self, columns: List[str], row_set: RowSet, filter: filters.BigtableFilter = filters.latest()): + return _BigtableDataset(self._client_resource, self._table_id, columns, row_set, filter) def parallel_read_rows( self, columns: List[str], num_parallel_calls=tf.data.AUTOTUNE, row_set: RowSet = from_rows_or_ranges(infinite()), + filter: filters.BigtableFilter = 
filters.latest() ): print("calling parallel read_rows with row_set:", row_set) @@ -50,7 +52,7 @@ def parallel_read_rows( ) def map_func(idx): - return self.read_rows(columns, RowSet(samples[idx])) + return self.read_rows(columns, RowSet(samples[idx]), filter) # We interleave a dataset of sample's indexes instead of a dataset of # samples, because Dataset.from_tensor_slices attempts to copy the @@ -69,14 +71,14 @@ class _BigtableDataset(dataset_ops.DatasetSource): """_BigtableDataset represents a dataset that retrieves keys and values.""" def __init__( - self, client_resource, table_id: str, columns: List[str], row_set: RowSet, + self, client_resource, table_id: str, columns: List[str], row_set: RowSet, filter ): self._table_id = table_id self._columns = columns self._element_spec = tf.TensorSpec(shape=[len(columns)], dtype=dtypes.string) variant_tensor = core_ops.bigtable_dataset( - client_resource, row_set._impl, table_id, columns + client_resource, row_set._impl, filter._impl, table_id, columns ) super().__init__(variant_tensor) diff --git a/tensorflow_io/python/ops/bigtable/bigtable_version_filters.py b/tensorflow_io/python/ops/bigtable/bigtable_version_filters.py index c209886c6..186c94af4 100644 --- a/tensorflow_io/python/ops/bigtable/bigtable_version_filters.py +++ b/tensorflow_io/python/ops/bigtable/bigtable_version_filters.py @@ -34,7 +34,7 @@ def latest(): Returns: pbt_C.Filter: Filter passing only most recent version of a value. 
""" - return core_ops.bigtable_latest_filter() + return BigtableFilter(core_ops.bigtable_latest_filter()) def timestamp_range( @@ -60,7 +60,9 @@ def timestamp_range( else: end_timestamp = int(end * 1e6) - return core_ops.bigtable_timestamp_range_filter(start_timestamp, end_timestamp) + return BigtableFilter( + core_ops.bigtable_timestamp_range_filter(start_timestamp, end_timestamp) + ) def timestamp_range_micros( @@ -75,6 +77,8 @@ def timestamp_range_micros( Returns: pbt_C.Filter: Filter passing only values' versions from the specified range. """ - return core_ops.bigtable_timestamp_range_filter( - int(start_timestamp), int(end_timestamp) + return BigtableFilter( + core_ops.bigtable_timestamp_range_filter( + int(start_timestamp), int(end_timestamp) + ) ) From 7dff227e89ebef23018f0d797dbfc8304f1ec013 Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Mon, 22 Nov 2021 13:11:59 +0100 Subject: [PATCH 3/9] keep filter not resource, pr comments --- .../kernels/bigtable/bigtable_dataset_kernel.cc | 17 ++++++----------- .../bigtable/bigtable_version_filters.cc | 17 +++++------------ .../kernels/bigtable/bigtable_version_filters.h | 4 ++-- tensorflow_io/core/ops/bigtable_ops.cc | 3 --- 4 files changed, 13 insertions(+), 28 deletions(-) diff --git a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc index 3e0108e41..9ad398875 100644 --- a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc +++ b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc @@ -156,7 +156,7 @@ class Iterator : public DatasetIterator { this->dataset()->client_resource().CreateTable(table_id).ReadRows( cbt::RowRange::InfiniteRange(), cbt::Filter::Chain(CreateColumnsFilter(columns_), - this->dataset()->filter_resource().filter(), + this->dataset()->filter(), cbt::Filter::Latest(1)))), >>>>>>> 4489e24... 
add filters to python api it_(this->reader_.begin()), @@ -287,14 +287,12 @@ class Dataset : public DatasetBase { public: Dataset(OpKernelContext* ctx, const std::shared_ptr& data_client, - cbt::RowSet row_set, std::string table_id, - io::BigtableFilterResource* filter_resource, + cbt::RowSet row_set, cbt::Filter filter, std::string table_id, std::vector columns) : DatasetBase(DatasetContext(ctx)), data_client_(data_client), row_set_(std::move(row_set)), - filter_resource_(*filter_resource), - filter_resource_unref_(filter_resource), + filter_(std::move(filter)), table_id_(table_id), columns_(columns) { dtypes_.push_back(DT_STRING); @@ -332,9 +330,7 @@ class Dataset : public DatasetBase { return table; } - io::BigtableFilterResource& filter_resource() const { - return filter_resource_; - } + const cbt::Filter& filter() const { return filter_; } protected: Status AsGraphDefInternal(SerializationContext* ctx, @@ -349,8 +345,7 @@ class Dataset : public DatasetBase { private: std::shared_ptr const& data_client_; const cbt::RowSet row_set_; - io::BigtableFilterResource& filter_resource_; - const core::ScopedUnref filter_resource_unref_; + cbt::Filter filter_; const std::string table_id_; const std::vector columns_; DataTypeVector dtypes_; @@ -382,7 +377,7 @@ class BigtableDatasetOp : public DatasetOpKernel { core::ScopedUnref filter_resource_unref_(filter_resource); *output = new Dataset(ctx, client_resource->data_client(), - row_set_resource->row_set(), table_id_, filter_resource, columns_); + row_set_resource->row_set(), filter_resource->filter(), table_id_, columns_); } private: diff --git a/tensorflow_io/core/kernels/bigtable/bigtable_version_filters.cc b/tensorflow_io/core/kernels/bigtable/bigtable_version_filters.cc index 416015e3f..d4793b6c8 100644 --- a/tensorflow_io/core/kernels/bigtable/bigtable_version_filters.cc +++ b/tensorflow_io/core/kernels/bigtable/bigtable_version_filters.cc @@ -31,9 +31,6 @@ class BigtableLatestFilterOp StatusOr CreateResource() 
override { return new BigtableFilterResource(cbt::Filter::Latest(1)); } - - private: - mutable mutex mu_; }; REGISTER_KERNEL_BUILDER(Name("BigtableLatestFilter").Device(DEVICE_CPU), @@ -45,19 +42,18 @@ class BigtableTimestampRangeFilterOp explicit BigtableTimestampRangeFilterOp(OpKernelConstruction* ctx) : AbstractBigtableResourceOp(ctx) { VLOG(1) << "BigtableTimestampRangeFilterOp ctor "; - OP_REQUIRES_OK(ctx, ctx->GetAttr("start", &start_)); - OP_REQUIRES_OK(ctx, ctx->GetAttr("end", &end_)); + OP_REQUIRES_OK(ctx, ctx->GetAttr("start", &start_ts_us_)); + OP_REQUIRES_OK(ctx, ctx->GetAttr("end", &end_ts_us_)); } private: StatusOr CreateResource() override { - return new BigtableFilterResource(cbt::Filter::TimestampRangeMicros(start_, end_)); + return new BigtableFilterResource(cbt::Filter::TimestampRangeMicros(start_ts_us_, end_ts_us_)); } private: - mutable mutex mu_; - int64_t start_; - int64_t end_; + int64_t start_ts_us_; + int64_t end_ts_us_; }; REGISTER_KERNEL_BUILDER(Name("BigtableTimestampRangeFilter").Device(DEVICE_CPU), @@ -81,9 +77,6 @@ class BigtablePrintFilterOp : public OpKernel { output_v(0) = resource->ToString(); } - - private: - mutable mutex mu_; }; REGISTER_KERNEL_BUILDER(Name("BigtablePrintFilter").Device(DEVICE_CPU), diff --git a/tensorflow_io/core/kernels/bigtable/bigtable_version_filters.h b/tensorflow_io/core/kernels/bigtable/bigtable_version_filters.h index 235a1b21c..750168de3 100644 --- a/tensorflow_io/core/kernels/bigtable/bigtable_version_filters.h +++ b/tensorflow_io/core/kernels/bigtable/bigtable_version_filters.h @@ -46,14 +46,14 @@ class BigtableFilterResource : public ResourceBase { return res; } - google::cloud::bigtable::Filter& filter() { return filter_; } + const google::cloud::bigtable::Filter& filter() const { return filter_; } string DebugString() const override { return "BigtableFilterResource:{" + ToString() + "}"; } private: - google::cloud::bigtable::Filter filter_; + const google::cloud::bigtable::Filter filter_; }; 
diff --git a/tensorflow_io/core/ops/bigtable_ops.cc b/tensorflow_io/core/ops/bigtable_ops.cc index f9602df84..1440b8807 100644 --- a/tensorflow_io/core/ops/bigtable_ops.cc +++ b/tensorflow_io/core/ops/bigtable_ops.cc @@ -27,11 +27,8 @@ REGISTER_OP("BigtableClient") REGISTER_OP("BigtableDataset") .Input("client: resource") -<<<<<<< HEAD .Input("row_set: resource") -======= .Input("filter: resource") ->>>>>>> 4489e24... add filters to python api .Attr("table_id: string") .Attr("columns: list(string) >= 1") .Output("handle: variant") From 473a9f6a12a00da0ac5e70ff3ac6187f07bda234 Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Mon, 22 Nov 2021 13:14:24 +0100 Subject: [PATCH 4/9] fixed one more name in BigtableTimestampRangeFilterOp --- .../core/kernels/bigtable/bigtable_version_filters.cc | 4 ++-- tensorflow_io/core/ops/bigtable_ops.cc | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/tensorflow_io/core/kernels/bigtable/bigtable_version_filters.cc b/tensorflow_io/core/kernels/bigtable/bigtable_version_filters.cc index d4793b6c8..fde9425d0 100644 --- a/tensorflow_io/core/kernels/bigtable/bigtable_version_filters.cc +++ b/tensorflow_io/core/kernels/bigtable/bigtable_version_filters.cc @@ -42,8 +42,8 @@ class BigtableTimestampRangeFilterOp explicit BigtableTimestampRangeFilterOp(OpKernelConstruction* ctx) : AbstractBigtableResourceOp(ctx) { VLOG(1) << "BigtableTimestampRangeFilterOp ctor "; - OP_REQUIRES_OK(ctx, ctx->GetAttr("start", &start_ts_us_)); - OP_REQUIRES_OK(ctx, ctx->GetAttr("end", &end_ts_us_)); + OP_REQUIRES_OK(ctx, ctx->GetAttr("start_ts_us", &start_ts_us_)); + OP_REQUIRES_OK(ctx, ctx->GetAttr("end_ts_us", &end_ts_us_)); } private: diff --git a/tensorflow_io/core/ops/bigtable_ops.cc b/tensorflow_io/core/ops/bigtable_ops.cc index 1440b8807..35dcb7ad2 100644 --- a/tensorflow_io/core/ops/bigtable_ops.cc +++ b/tensorflow_io/core/ops/bigtable_ops.cc @@ -118,8 +118,8 @@ REGISTER_OP("BigtableLatestFilter") 
REGISTER_OP("BigtableTimestampRangeFilter") .Attr("container: string = ''") .Attr("shared_name: string = ''") - .Attr("start: int") - .Attr("end: int") + .Attr("start_ts_us: int") + .Attr("end_ts_us: int") .Output("filter: resource") .SetIsStateful() .SetShapeFn(shape_inference::ScalarShape); From 20551bbf69b88ba08897906f34d20d1869d99a89 Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Mon, 22 Nov 2021 13:17:01 +0100 Subject: [PATCH 5/9] linter --- tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py b/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py index c7b8d12c6..15cab5dad 100644 --- a/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py +++ b/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py @@ -75,6 +75,7 @@ def __init__( ): self._table_id = table_id self._columns = columns + self._filter = filter self._element_spec = tf.TensorSpec(shape=[len(columns)], dtype=dtypes.string) variant_tensor = core_ops.bigtable_dataset( From 54b5b1985312b2c68a93164859321c7cde2275b1 Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Mon, 22 Nov 2021 15:12:45 +0100 Subject: [PATCH 6/9] missed one merge conflict --- .../core/kernels/bigtable/bigtable_dataset_kernel.cc | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc index 9ad398875..fa641ea6c 100644 --- a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc +++ b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc @@ -146,19 +146,12 @@ class Iterator : public DatasetIterator { const std::vector& columns) : DatasetIterator(params), columns_(ColumnsToFamiliesAndQualifiers(columns)), -<<<<<<< HEAD reader_(this->dataset()->CreateTable().ReadRows( this->dataset()->row_set(), - cbt::Filter::Chain(CreateColumnsFilter(columns_), 
- cbt::Filter::Latest(1)))), -======= - reader_( - this->dataset()->client_resource().CreateTable(table_id).ReadRows( - cbt::RowRange::InfiniteRange(), + cbt::Filter::Chain(CreateColumnsFilter(columns_), this->dataset()->filter(), cbt::Filter::Latest(1)))), ->>>>>>> 4489e24... add filters to python api it_(this->reader_.begin()), column_to_idx_(CreateColumnToIdxMap(columns_)) { VLOG(1) << "DatasetIterator ctor"; From 6f633e53fd98c59095c2edcd3f5eedfaf9ce0de1 Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Mon, 22 Nov 2021 15:27:40 +0100 Subject: [PATCH 7/9] linting --- .../ops/bigtable/bigtable_dataset_ops.py | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py b/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py index 15cab5dad..e4af448f8 100644 --- a/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py +++ b/tensorflow_io/python/ops/bigtable/bigtable_dataset_ops.py @@ -35,15 +35,22 @@ def __init__(self, client_resource, table_id: str): self._table_id = table_id self._client_resource = client_resource - def read_rows(self, columns: List[str], row_set: RowSet, filter: filters.BigtableFilter = filters.latest()): - return _BigtableDataset(self._client_resource, self._table_id, columns, row_set, filter) + def read_rows( + self, + columns: List[str], + row_set: RowSet, + filter: filters.BigtableFilter = filters.latest(), + ): + return _BigtableDataset( + self._client_resource, self._table_id, columns, row_set, filter + ) def parallel_read_rows( self, columns: List[str], num_parallel_calls=tf.data.AUTOTUNE, row_set: RowSet = from_rows_or_ranges(infinite()), - filter: filters.BigtableFilter = filters.latest() + filter: filters.BigtableFilter = filters.latest(), ): print("calling parallel read_rows with row_set:", row_set) @@ -71,7 +78,12 @@ class _BigtableDataset(dataset_ops.DatasetSource): """_BigtableDataset represents a dataset that retrieves keys and 
values.""" def __init__( - self, client_resource, table_id: str, columns: List[str], row_set: RowSet, filter + self, + client_resource, + table_id: str, + columns: List[str], + row_set: RowSet, + filter, ): self._table_id = table_id self._columns = columns From 63a294a411a98f47c21d527e135efc593a863fa8 Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Mon, 22 Nov 2021 15:28:04 +0100 Subject: [PATCH 8/9] lint2 --- .../kernels/bigtable/bigtable_dataset_kernel.cc | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc index fa641ea6c..1ffa91178 100644 --- a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc +++ b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc @@ -148,10 +148,10 @@ class Iterator : public DatasetIterator { columns_(ColumnsToFamiliesAndQualifiers(columns)), reader_(this->dataset()->CreateTable().ReadRows( this->dataset()->row_set(), - - cbt::Filter::Chain(CreateColumnsFilter(columns_), - this->dataset()->filter(), - cbt::Filter::Latest(1)))), + + cbt::Filter::Chain(CreateColumnsFilter(columns_), + this->dataset()->filter(), + cbt::Filter::Latest(1)))), it_(this->reader_.begin()), column_to_idx_(CreateColumnToIdxMap(columns_)) { VLOG(1) << "DatasetIterator ctor"; @@ -363,14 +363,15 @@ class BigtableDatasetOp : public DatasetOpKernel { OP_REQUIRES_OK(ctx, GetResourceFromContext(ctx, "row_set", &row_set_resource)); core::ScopedUnref row_set_resource_unref_(row_set_resource); - + io::BigtableFilterResource* filter_resource; OP_REQUIRES_OK(ctx, GetResourceFromContext(ctx, "filter", &filter_resource)); core::ScopedUnref filter_resource_unref_(filter_resource); *output = new Dataset(ctx, client_resource->data_client(), - row_set_resource->row_set(), filter_resource->filter(), table_id_, columns_); + row_set_resource->row_set(), + filter_resource->filter(), table_id_, columns_); } 
private: From e093e3f745e64eb0537121a55473e4d5c33abd06 Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Mon, 22 Nov 2021 15:39:34 +0100 Subject: [PATCH 9/9] removed obsolete line --- tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc index 1ffa91178..4293bf96e 100644 --- a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc +++ b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc @@ -148,7 +148,6 @@ class Iterator : public DatasetIterator { columns_(ColumnsToFamiliesAndQualifiers(columns)), reader_(this->dataset()->CreateTable().ReadRows( this->dataset()->row_set(), - cbt::Filter::Chain(CreateColumnsFilter(columns_), this->dataset()->filter(), cbt::Filter::Latest(1)))),