Enrichment Transform with BigTable handler #30001

Merged
merged 45 commits on Jan 19, 2024
Commits
0ad521c
enrichment v1
riteshghorse Dec 15, 2023
e83bad7
add documentation
riteshghorse Dec 15, 2023
5162960
add doc comment
riteshghorse Dec 15, 2023
a45392d
rerun
riteshghorse Dec 18, 2023
9fdbeb3
update docs, lint
riteshghorse Dec 18, 2023
c541148
update docs, lint
riteshghorse Dec 18, 2023
5c9be0e
add generic type
riteshghorse Dec 18, 2023
9df679c
add generic type
riteshghorse Dec 18, 2023
883ff0d
adjust doc path
riteshghorse Dec 18, 2023
818bb8a
create test row
riteshghorse Dec 18, 2023
e1feeb8
use request type
riteshghorse Dec 18, 2023
40275e9
use request type
riteshghorse Dec 18, 2023
be67a88
change module name
riteshghorse Dec 20, 2023
27ed250
more tests
riteshghorse Jan 2, 2024
4af90f5
remove non-functional params
riteshghorse Jan 3, 2024
041fcd0
lint, doc
riteshghorse Jan 3, 2024
91f58b5
change types for general use
riteshghorse Jan 4, 2024
9fd6813
callable type
riteshghorse Jan 4, 2024
036eceb
dict type
riteshghorse Jan 4, 2024
021f9c4
update signatures
riteshghorse Jan 9, 2024
062b9ef
fix unit test
riteshghorse Jan 9, 2024
b11d3ea
bigtable with column family, ids, rrio-throttler
riteshghorse Jan 11, 2024
46e3a6d
update tests for row filter
riteshghorse Jan 11, 2024
433d5fa
convert handler types from dict to Row
riteshghorse Jan 11, 2024
d18d583
update tests for bigtable
riteshghorse Jan 12, 2024
c5e792c
ran pydocs
riteshghorse Jan 12, 2024
9285a5b
ran pydocs
riteshghorse Jan 12, 2024
641bdf7
mark postcommit
riteshghorse Jan 12, 2024
c36a21e
remove _test file, fix import
riteshghorse Jan 12, 2024
3989c16
enable postcommit
riteshghorse Jan 12, 2024
7102acd
add more tests
riteshghorse Jan 12, 2024
87e32bb
skip tests when dependencies are not installed
riteshghorse Jan 16, 2024
57efa52
add deleted imports from last commit
riteshghorse Jan 16, 2024
282c608
add skip test condition
riteshghorse Jan 16, 2024
deecdbc
fix import order, add TooManyRequests to try-catch
riteshghorse Jan 16, 2024
253633e
make throttler, repeater non-optional
riteshghorse Jan 16, 2024
932fae3
add exception level and tests
riteshghorse Jan 17, 2024
cf88d6f
correct pydoc statement
riteshghorse Jan 17, 2024
5b702d2
add throttle tests
riteshghorse Jan 17, 2024
18d9539
add bigtable improvements
riteshghorse Jan 18, 2024
6e251ce
default app_profile_id
riteshghorse Jan 18, 2024
20b8ba6
add documentation, ignore None assignment
riteshghorse Jan 18, 2024
27974b8
add to changes.md
riteshghorse Jan 18, 2024
7c9a03c
change test structure that throws exception, skip http test for now
riteshghorse Jan 18, 2024
5a626e3
drop postcommit trigger file
riteshghorse Jan 18, 2024
8 changes: 4 additions & 4 deletions sdks/python/apache_beam/io/requestresponse.py
@@ -278,14 +278,14 @@ def __init__(
timeout (float): timeout value in seconds to wait for response from API.
should_backoff (~apache_beam.io.requestresponse.ShouldBackOff):
(Optional) provides methods for backoff.
- repeater (~apache_beam.io.requestresponse.Repeater): (Optional)
-   provides methods to repeat requests to API.
+ repeater (~apache_beam.io.requestresponse.Repeater): provides methods to
+   repeat requests to API.
cache_reader (~apache_beam.io.requestresponse.CacheReader): (Optional)
provides methods to read external cache.
cache_writer (~apache_beam.io.requestresponse.CacheWriter): (Optional)
provides methods to write to external cache.
throttler (~apache_beam.io.requestresponse.PreCallThrottler):
-   (Optional) provides methods to pre-throttle a request.
+   provides methods to pre-throttle a request.
"""
self._caller = caller
self._timeout = timeout
@@ -387,7 +387,7 @@ def process(self, request: RequestT, *args, **kwargs):
_LOGGER.info(
"Delaying request for %d seconds" % self._throttler.delay_secs)
time.sleep(self._throttler.delay_secs)
- self._metrics_collector.throttled_secs.inc(5)
+ self._metrics_collector.throttled_secs.inc(self._throttler.delay_secs)
is_throttled_request = True

if is_throttled_request:
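The metrics fix above makes the throttled-seconds counter record the throttler's actual `delay_secs` instead of a hardcoded 5. A minimal sketch of how a pipeline wires a repeater and throttler into `RequestResponseIO`; `EchoCaller` is hypothetical, while the constructors and keyword arguments mirror the ones exercised in this PR's tests:

```python
import apache_beam as beam
from apache_beam.io.requestresponse import Caller
from apache_beam.io.requestresponse import DefaultThrottler
from apache_beam.io.requestresponse import ExponentialBackOffRepeater
from apache_beam.io.requestresponse import RequestResponseIO


class EchoCaller(Caller):
  """Hypothetical caller standing in for a real external service."""
  def __call__(self, request, *args, **kwargs):
    # A real caller would issue the RPC/HTTP request here.
    return request


with beam.Pipeline() as pipeline:
  _ = (
      pipeline
      | beam.Create(['request-1', 'request-2'])
      | RequestResponseIO(
          caller=EchoCaller(),
          timeout=30,
          # Retry transient failures with exponential backoff.
          repeater=ExponentialBackOffRepeater(),
          # Pre-throttle when recent request volume looks overloaded;
          # each delay now adds delay_secs to cumulativeThrottlingSeconds.
          throttler=DefaultThrottler(
              window_ms=10000, bucket_ms=5000, overload_ratio=2)))
```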
25 changes: 24 additions & 1 deletion sdks/python/apache_beam/io/requestresponse_test.py
@@ -23,7 +23,7 @@
# pylint: disable=ungrouped-imports
try:
from google.api_core.exceptions import TooManyRequests
- from apache_beam.io.requestresponse import Caller
+ from apache_beam.io.requestresponse import Caller, DefaultThrottler
from apache_beam.io.requestresponse import RequestResponseIO
from apache_beam.io.requestresponse import UserCodeExecutionException
from apache_beam.io.requestresponse import UserCodeTimeoutException
@@ -128,6 +128,29 @@ def test_caller_no_retry_strategy(self):
| RequestResponseIO(caller=caller, repeater=None))
self.assertRegex(cm.exception.message, 'retries = 0')

def test_default_throttler(self):
caller = CallerWithTimeout()
throttler = DefaultThrottler(
window_ms=10000, bucket_ms=5000, overload_ratio=1)
# manually override the number of received requests for testing.
throttler.throttler._all_requests.add(time.time() * 1000, 100)
test_pipeline = TestPipeline()
_ = (
test_pipeline
| beam.Create(['sample_request'])
| RequestResponseIO(caller=caller, throttler=throttler))
result = test_pipeline.run()
result.wait_until_finish()
metrics = result.metrics().query(
beam.metrics.MetricsFilter().with_name('throttled_requests'))
self.assertEqual(metrics['counters'][0].committed, 1)
metrics = result.metrics().query(
beam.metrics.MetricsFilter().with_name('cumulativeThrottlingSeconds'))
self.assertGreater(metrics['counters'][0].committed, 0)
metrics = result.metrics().query(
beam.metrics.MetricsFilter().with_name('responses'))
self.assertEqual(metrics['counters'][0].committed, 1)


if __name__ == '__main__':
unittest.main()
20 changes: 18 additions & 2 deletions sdks/python/apache_beam/transforms/enrichment.py
@@ -24,6 +24,10 @@
import apache_beam as beam
from apache_beam.io.requestresponse import DEFAULT_TIMEOUT_SECS
from apache_beam.io.requestresponse import Caller
from apache_beam.io.requestresponse import DefaultThrottler
from apache_beam.io.requestresponse import ExponentialBackOffRepeater
from apache_beam.io.requestresponse import PreCallThrottler
from apache_beam.io.requestresponse import Repeater
from apache_beam.io.requestresponse import RequestResponseIO

__all__ = [
@@ -96,20 +100,32 @@ class Enrichment(beam.PTransform[beam.PCollection[InputT],
join_fn: A lambda function to join original element with lookup metadata.
Defaults to `CROSS_JOIN`.
timeout: (Optional) timeout for source requests. Defaults to 30 seconds.
repeater (~apache_beam.io.requestresponse.Repeater): provides methods to
repeat requests to API.
throttler (~apache_beam.io.requestresponse.PreCallThrottler):
provides methods to pre-throttle a request.
"""
def __init__(
self,
source_handler: EnrichmentSourceHandler,
join_fn: JoinFn = cross_join,
- timeout: Optional[float] = DEFAULT_TIMEOUT_SECS):
+ timeout: Optional[float] = DEFAULT_TIMEOUT_SECS,
+ repeater: Repeater = ExponentialBackOffRepeater(),
+ throttler: PreCallThrottler = DefaultThrottler(),
+ ):
self._source_handler = source_handler
self._join_fn = join_fn
self._timeout = timeout
self._repeater = repeater
self._throttler = throttler

def expand(self,
input_row: beam.PCollection[InputT]) -> beam.PCollection[OutputT]:
fetched_data = input_row | RequestResponseIO(
- caller=self._source_handler, timeout=self._timeout)
+ caller=self._source_handler,
+ timeout=self._timeout,
+ repeater=self._repeater,
+ throttler=self._throttler)

# EnrichmentSourceHandler returns a tuple of (request,response).
return fetched_data | beam.Map(
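With these defaults, every `Enrichment` transform now retries with `ExponentialBackOffRepeater` and pre-throttles with `DefaultThrottler` unless the caller overrides them. A sketch of overriding both; `StaticLookupHandler` is hypothetical, and only the `Enrichment` keyword arguments come from this diff:

```python
import apache_beam as beam
from apache_beam.io.requestresponse import DefaultThrottler
from apache_beam.io.requestresponse import ExponentialBackOffRepeater
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment import EnrichmentSourceHandler


class StaticLookupHandler(EnrichmentSourceHandler[beam.Row, beam.Row]):
  """Hypothetical handler that returns a fixed enrichment value."""
  def __call__(self, request: beam.Row, *args, **kwargs):
    # Handlers return a (request, response) tuple, which the join_fn
    # in Enrichment.expand merges into a single output row.
    return request, beam.Row(category='unknown')


with beam.Pipeline() as pipeline:
  _ = (
      pipeline
      | beam.Create([beam.Row(id='key-1')])
      | Enrichment(
          source_handler=StaticLookupHandler(),
          timeout=30,
          repeater=ExponentialBackOffRepeater(),
          throttler=DefaultThrottler(
              window_ms=5000, bucket_ms=1000, overload_ratio=2)))
```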
44 changes: 29 additions & 15 deletions sdks/python/apache_beam/transforms/enrichment_handlers/bigtable.py
@@ -21,7 +21,9 @@
from typing import Optional

from google.api_core.exceptions import NotFound
from google.cloud import bigtable
from google.cloud.bigtable import Client
from google.cloud.bigtable.row_filters import CellsColumnLimitFilter
from google.cloud.bigtable.row_filters import RowFilter

import apache_beam as beam
@@ -41,12 +43,12 @@ class ExceptionLevel(Enum):
returns an empty row.

Members:
- WARNING_ONLY: Log a warning for exception without raising it.
- RAISE: Raise the exception.
- WARN: Log a warning for exception without raising it.
- QUIET: Neither log nor raise the exception.
"""
- WARNING_ONLY = 0
- RAISE = 1
+ RAISE = 0
+ WARN = 1
QUIET = 2


@@ -63,32 +65,43 @@ class EnrichWithBigTable(EnrichmentSourceHandler[beam.Row, beam.Row]):
to use as `row_key` for BigTable querying.
row_filter: a ``:class:`google.cloud.bigtable.row_filters.RowFilter``` to
filter data read with ``read_row()``.
Defaults to `CellsColumnLimitFilter(1)`.
app_profile_id (str): App profile ID to use for BigTable.
encoding (str): encoding type to convert the string to bytes and vice-versa
from BigTable. Default is `utf-8`.
exception_level: a `enum.Enum` value from
``apache_beam.transforms.enrichment_handlers.bigtable.ExceptionLevel``
to set the level when an empty row is returned from the BigTable query.
- Defaults to ``ExceptionLevel.QUIET``.
+ Defaults to ``ExceptionLevel.WARN``.
"""
def __init__(
self,
project_id: str,
instance_id: str,
table_id: str,
row_key: str,
- row_filter: Optional[RowFilter] = None,
- exception_level: ExceptionLevel = ExceptionLevel.QUIET,
+ row_filter: Optional[RowFilter] = CellsColumnLimitFilter(1),
+ app_profile_id: str = "",
+ encoding: str = 'utf-8',
+ exception_level: ExceptionLevel = ExceptionLevel.WARN,
):
self._project_id = project_id
self._instance_id = instance_id
self._table_id = table_id
self._row_key = row_key
self._row_filter = row_filter
self._app_profile_id = app_profile_id
self._encoding = encoding
self._exception_level = exception_level

def __enter__(self):
"""connect to the Google BigTable cluster."""
self.client = Client(project=self._project_id)
self.instance = self.client.instance(self._instance_id)
- self._table = self.instance.table(self._table_id)
+ self._table = bigtable.table.Table(
+     table_id=self._table_id,
+     instance=self.instance,
+     app_profile_id=self._app_profile_id)

def __call__(self, request: beam.Row, *args, **kwargs):
"""
@@ -99,27 +112,28 @@ def __call__(self, request: beam.Row, *args, **kwargs):
request: the input `beam.Row` to enrich.
"""
response_dict: Dict[str, Any] = {}
- row_key: str = ""
+ row_key_str: str = ""
try:
request_dict = request._asdict()
- row_key = str(request_dict[self._row_key]).encode()
+ row_key_str = str(request_dict[self._row_key])
+ row_key = row_key_str.encode(self._encoding)
row = self._table.read_row(row_key, filter_=self._row_filter)
if row:
for cf_id, cf_v in row.cells.items():
response_dict[cf_id] = {}
for k, v in cf_v.items():
- response_dict[cf_id][k.decode('utf-8')] = \
-     v[0].value.decode('utf-8')
- elif self._exception_level == ExceptionLevel.WARNING_ONLY:
+ response_dict[cf_id][k.decode(self._encoding)] = \
+     v[0].value.decode(self._encoding)
+ elif self._exception_level == ExceptionLevel.WARN:
_LOGGER.warning(
'no matching row found for row_key: %s '
- 'with row_filter: %s' % (row_key, self._row_filter))
+ 'with row_filter: %s' % (row_key_str, self._row_filter))
elif self._exception_level == ExceptionLevel.RAISE:
raise ValueError(
'no matching row found for row_key: %s '
- 'with row_filter=%s' % (row_key, self._row_filter))
+ 'with row_filter=%s' % (row_key_str, self._row_filter))
except KeyError:
- raise KeyError('row_key %s not found in input PCollection.' % row_key)
+ raise KeyError('row_key %s not found in input PCollection.' % row_key_str)
except NotFound:
raise NotFound(
'GCP BigTable cluster `%s:%s:%s` not found.' %
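Putting the reworked handler together, a sketch of an end-to-end enrichment pipeline; the project, instance, and table IDs are placeholders, while the parameter names and defaults are the ones introduced in this diff:

```python
import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigtable import EnrichWithBigTable
from apache_beam.transforms.enrichment_handlers.bigtable import ExceptionLevel

# Placeholder GCP identifiers; point these at a real BigTable instance.
bigtable_handler = EnrichWithBigTable(
    project_id='my-project',
    instance_id='my-instance',
    table_id='product-catalog',
    row_key='product_id',  # input-row field used as the BigTable row key
    app_profile_id='',     # default app profile, as in the new __enter__
    encoding='utf-8',
    # RAISE fails fast on a missing row instead of the default WARN.
    exception_level=ExceptionLevel.RAISE)

with beam.Pipeline() as pipeline:
  _ = (
      pipeline
      | beam.Create([beam.Row(product_id='sku-123')])
      | Enrichment(source_handler=bigtable_handler))
```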