Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enrichment Transform with BigTable handler #30001

Merged
merged 45 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from 41 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
0ad521c
enrichment v1
riteshghorse Dec 15, 2023
e83bad7
add documentation
riteshghorse Dec 15, 2023
5162960
add doc comment
riteshghorse Dec 15, 2023
a45392d
rerun
riteshghorse Dec 18, 2023
9fdbeb3
update docs, lint
riteshghorse Dec 18, 2023
c541148
update docs, lint
riteshghorse Dec 18, 2023
5c9be0e
add generic type
riteshghorse Dec 18, 2023
9df679c
add generic type
riteshghorse Dec 18, 2023
883ff0d
adjust doc path
riteshghorse Dec 18, 2023
818bb8a
create test row
riteshghorse Dec 18, 2023
e1feeb8
use request type
riteshghorse Dec 18, 2023
40275e9
use request type
riteshghorse Dec 18, 2023
be67a88
change module name
riteshghorse Dec 20, 2023
27ed250
more tests
riteshghorse Jan 2, 2024
4af90f5
remove non-functional params
riteshghorse Jan 3, 2024
041fcd0
lint, doc
riteshghorse Jan 3, 2024
91f58b5
change types for general use
riteshghorse Jan 4, 2024
9fd6813
callable type
riteshghorse Jan 4, 2024
036eceb
dict type
riteshghorse Jan 4, 2024
021f9c4
update signatures
riteshghorse Jan 9, 2024
062b9ef
fix unit test
riteshghorse Jan 9, 2024
b11d3ea
bigtable with column family, ids, rrio-throttler
riteshghorse Jan 11, 2024
46e3a6d
update tests for row filter
riteshghorse Jan 11, 2024
433d5fa
convert handler types from dict to Row
riteshghorse Jan 11, 2024
d18d583
update tests for bigtable
riteshghorse Jan 12, 2024
c5e792c
ran pydocs
riteshghorse Jan 12, 2024
9285a5b
ran pydocs
riteshghorse Jan 12, 2024
641bdf7
mark postcommit
riteshghorse Jan 12, 2024
c36a21e
remove _test file, fix import
riteshghorse Jan 12, 2024
3989c16
enable postcommit
riteshghorse Jan 12, 2024
7102acd
add more tests
riteshghorse Jan 12, 2024
87e32bb
skip tests when dependencies are not installed
riteshghorse Jan 16, 2024
57efa52
add deleted imports from last commit
riteshghorse Jan 16, 2024
282c608
add skip test condition
riteshghorse Jan 16, 2024
deecdbc
fix import order, add TooManyRequests to try-catch
riteshghorse Jan 16, 2024
253633e
make throttler, repeater non-optional
riteshghorse Jan 16, 2024
932fae3
add exception level and tests
riteshghorse Jan 17, 2024
cf88d6f
correct pydoc statement
riteshghorse Jan 17, 2024
5b702d2
add throttle tests
riteshghorse Jan 17, 2024
18d9539
add bigtable improvements
riteshghorse Jan 18, 2024
6e251ce
default app_profile_id
riteshghorse Jan 18, 2024
20b8ba6
add documentation, ignore None assignment
riteshghorse Jan 18, 2024
27974b8
add to changes.md
riteshghorse Jan 18, 2024
7c9a03c
change test structure that throws exception, skip http test for now
riteshghorse Jan 18, 2024
5a626e3
drop postcommit trigger file
riteshghorse Jan 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
408 changes: 408 additions & 0 deletions sdks/python/apache_beam/io/requestresponse.py

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#
import base64
import sys
import typing
import unittest
from dataclasses import dataclass
from typing import Tuple
Expand All @@ -24,13 +25,18 @@
import urllib3

import apache_beam as beam
from apache_beam.io.requestresponseio import Caller
from apache_beam.io.requestresponseio import RequestResponseIO
from apache_beam.io.requestresponseio import UserCodeExecutionException
from apache_beam.io.requestresponseio import UserCodeQuotaException
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.testing.test_pipeline import TestPipeline

# pylint: disable=ungrouped-imports
try:
from apache_beam.io.requestresponse import Caller
from apache_beam.io.requestresponse import RequestResponseIO
from apache_beam.io.requestresponse import UserCodeExecutionException
from apache_beam.io.requestresponse import UserCodeQuotaException
except ImportError:
raise unittest.SkipTest('RequestResponseIO dependencies are not installed.')

_HTTP_PATH = '/v1/echo'
_PAYLOAD = base64.b64encode(bytes('payload', 'utf-8'))
_HTTP_ENDPOINT_ADDRESS_FLAG = '--httpEndpointAddress'
Expand Down Expand Up @@ -61,28 +67,27 @@ def _add_argparse_args(cls, parser) -> None:
help='The ID for an allocated quota that should exceed.')


# TODO(riteshghorse,damondouglas) replace Echo(Request|Response) with proto
# generated classes from .test-infra/mock-apis:
@dataclass
class EchoRequest:
class EchoResponse:
id: str
payload: bytes


@dataclass
class EchoResponse:
# TODO(riteshghorse,damondouglas) replace Echo(Request|Response) with proto
# generated classes from .test-infra/mock-apis:
class Request(typing.NamedTuple):
id: str
payload: bytes


class EchoHTTPCaller(Caller):
class EchoHTTPCaller(Caller[Request, EchoResponse]):
"""Implements ``Caller`` to call the ``EchoServiceGrpc``'s HTTP handler.
The purpose of ``EchoHTTPCaller`` is to support integration tests.
"""
def __init__(self, url: str):
self.url = url + _HTTP_PATH

def __call__(self, request: EchoRequest, *args, **kwargs) -> EchoResponse:
def __call__(self, request: Request, *args, **kwargs) -> EchoResponse:
"""Overrides ``Caller``'s call method invoking the
``EchoServiceGrpc``'s HTTP handler with an ``EchoRequest``, returning
either a successful ``EchoResponse`` or throwing either a
Expand Down Expand Up @@ -129,7 +134,7 @@ def setUpClass(cls) -> None:
def setUp(self) -> None:
client, options = EchoHTTPCallerTestIT._get_client_and_options()

req = EchoRequest(id=options.should_exceed_quota_id, payload=_PAYLOAD)
req = Request(id=options.should_exceed_quota_id, payload=_PAYLOAD)
try:
# The following is needed to exceed the API
client(req)
Expand All @@ -148,7 +153,7 @@ def _get_client_and_options(cls) -> Tuple[EchoHTTPCaller, EchoITOptions]:
def test_given_valid_request_receives_response(self):
client, options = EchoHTTPCallerTestIT._get_client_and_options()

req = EchoRequest(id=options.never_exceed_quota_id, payload=_PAYLOAD)
req = Request(id=options.never_exceed_quota_id, payload=_PAYLOAD)

response: EchoResponse = client(req)

Expand All @@ -158,20 +163,20 @@ def test_given_valid_request_receives_response(self):
def test_given_exceeded_quota_should_raise(self):
client, options = EchoHTTPCallerTestIT._get_client_and_options()

req = EchoRequest(id=options.should_exceed_quota_id, payload=_PAYLOAD)
req = Request(id=options.should_exceed_quota_id, payload=_PAYLOAD)

self.assertRaises(UserCodeQuotaException, lambda: client(req))

def test_not_found_should_raise(self):
client, _ = EchoHTTPCallerTestIT._get_client_and_options()

req = EchoRequest(id='i-dont-exist-quota-id', payload=_PAYLOAD)
req = Request(id='i-dont-exist-quota-id', payload=_PAYLOAD)
self.assertRaisesRegex(
UserCodeExecutionException, "Not Found", lambda: client(req))

def test_request_response_io(self):
client, options = EchoHTTPCallerTestIT._get_client_and_options()
req = EchoRequest(id=options.never_exceed_quota_id, payload=_PAYLOAD)
req = Request(id=options.never_exceed_quota_id, payload=_PAYLOAD)
with TestPipeline(is_integration_test=True) as test_pipeline:
output = (
test_pipeline
Expand Down
156 changes: 156 additions & 0 deletions sdks/python/apache_beam/io/requestresponse_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import time
import unittest

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline

# pylint: disable=ungrouped-imports
try:
from google.api_core.exceptions import TooManyRequests
from apache_beam.io.requestresponse import Caller, DefaultThrottler
from apache_beam.io.requestresponse import RequestResponseIO
from apache_beam.io.requestresponse import UserCodeExecutionException
from apache_beam.io.requestresponse import UserCodeTimeoutException
from apache_beam.io.requestresponse import retry_on_exception
except ImportError:
raise unittest.SkipTest('RequestResponseIO dependencies are not installed.')


class AckCaller(Caller[str, str]):
  """Test double for ``Caller`` that acknowledges every incoming request.

  Calling an instance returns the request prefixed with ``"ACK: "``. The
  context-manager hooks do nothing.
  """
  def __enter__(self):
    pass

  def __call__(self, request: str, *args, **kwargs):
    """Returns the acknowledgement string for ``request``.

    Extra positional and keyword arguments are accepted and ignored so that
    this signature agrees with the subclass overrides in this module, which
    all take ``*args, **kwargs``.
    """
    return f"ACK: {request}"

  def __exit__(self, exc_type, exc_val, exc_tb):
    return None


class CallerWithTimeout(AckCaller):
  """Acknowledging caller that blocks for two seconds before answering.

  The tests use it to exercise the timeout handling of RequestResponseIO.
  """
  def __call__(self, request: str, *args, **kwargs):
    delay_seconds = 2
    time.sleep(delay_seconds)
    acknowledgement = f"ACK: {request}"
    return acknowledgement


class CallerWithRuntimeError(AckCaller):
  """Caller that fails with ``RuntimeError`` when given a falsy request.

  The tests use it to check that RequestResponseIO reports the failure as a
  ``UserCodeExecutionException``. A truthy request produces ``None``.
  """
  def __call__(self, request: str, *args, **kwargs):
    # Only an empty/falsy request triggers the deliberate failure.
    if request:
      return None
    raise RuntimeError("Exception expected, not an error.")


class CallerThatRetries(AckCaller):
  """Caller that fails every invocation with ``TooManyRequests``.

  ``count`` starts at -1 and is incremented on each call before raising, so
  the first call reports ``retries = 0``, the second ``retries = 1``, and so
  on. The tests read the number back from the exception message.
  """
  def __init__(self):
    self.count = -1

  def __call__(self, request: str, *args, **kwargs):
    """Bumps the invocation counter and raises ``TooManyRequests``.

    Raises:
      TooManyRequests: always; the message is ``'retries = <count>'``.
    """
    # The former ``try: pass / except / finally`` wrapper was dead code: an
    # empty ``try`` body cannot raise, and raising from ``finally`` would
    # mask any in-flight exception. Increment-then-raise is equivalent.
    self.count += 1
    raise TooManyRequests('retries = %d' % self.count)


class TestCaller(unittest.TestCase):
  """Tests for RequestResponseIO using the stub callers in this module."""
  def test_valid_call(self):
    """Builds a pipeline around AckCaller and checks output is not None."""
    ack_caller = AckCaller()
    with TestPipeline() as pipeline:
      requests = pipeline | beam.Create(["sample_request"])
      responses = requests | RequestResponseIO(caller=ack_caller)

    self.assertIsNotNone(responses)

  def test_call_timeout(self):
    """A caller sleeping 2s with timeout=1 raises UserCodeTimeoutException."""
    slow_caller = CallerWithTimeout()
    with self.assertRaises(UserCodeTimeoutException):
      with TestPipeline() as pipeline:
        requests = pipeline | beam.Create(["timeout_request"])
        _ = requests | RequestResponseIO(caller=slow_caller, timeout=1)

  def test_call_runtime_error(self):
    """A RuntimeError in the caller raises UserCodeExecutionException."""
    failing_caller = CallerWithRuntimeError()
    with self.assertRaises(UserCodeExecutionException):
      with TestPipeline() as pipeline:
        requests = pipeline | beam.Create([""])
        _ = requests | RequestResponseIO(caller=failing_caller)

  def test_retry_on_exception(self):
    """retry_on_exception is False for RuntimeError, True for HTTP 429."""
    runtime_error_verdict = retry_on_exception(RuntimeError())
    self.assertFalse(runtime_error_verdict)
    quota_error_verdict = retry_on_exception(TooManyRequests("HTTP 429"))
    self.assertTrue(quota_error_verdict)

  def test_caller_backoff_retry_strategy(self):
    """With the default repeater the final error reports 'retries = 2'."""
    retrying_caller = CallerThatRetries()
    with self.assertRaises(TooManyRequests) as raised:
      with TestPipeline() as pipeline:
        requests = pipeline | beam.Create(["sample_request"])
        _ = requests | RequestResponseIO(caller=retrying_caller)
    # Checked after the context exits, once the exception has been captured.
    self.assertRegex(raised.exception.message, 'retries = 2')

  def test_caller_no_retry_strategy(self):
    """With repeater=None the error reports 'retries = 0'."""
    retrying_caller = CallerThatRetries()
    with self.assertRaises(TooManyRequests) as raised:
      with TestPipeline() as pipeline:
        requests = pipeline | beam.Create(["sample_request"])
        _ = requests | RequestResponseIO(
            caller=retrying_caller, repeater=None)
    # Checked after the context exits, once the exception has been captured.
    self.assertRegex(raised.exception.message, 'retries = 0')

  def test_default_throttler(self):
    """Checks the throttling counters after seeding the request count."""
    slow_caller = CallerWithTimeout()
    throttler = DefaultThrottler(
        window_ms=10000, bucket_ms=5000, overload_ratio=1)
    # manually override the number of received requests for testing.
    now_ms = time.time() * 1000
    throttler.throttler._all_requests.add(now_ms, 100)

    pipeline = TestPipeline()
    requests = pipeline | beam.Create(['sample_request'])
    _ = requests | RequestResponseIO(caller=slow_caller, throttler=throttler)
    result = pipeline.run()
    result.wait_until_finish()

    def committed(counter_name):
      # Committed value of the first counter matching ``counter_name``.
      name_filter = beam.metrics.MetricsFilter().with_name(counter_name)
      return result.metrics().query(name_filter)['counters'][0].committed

    self.assertEqual(committed('throttled_requests'), 1)
    self.assertGreater(committed('cumulativeThrottlingSeconds'), 0)
    self.assertEqual(committed('responses'), 1)


if __name__ == '__main__':
unittest.main()
Loading
Loading