diff --git a/gapic/templates/%namespace/%name_%version/%sub/services/%service/transports/rest_asyncio.py.j2 b/gapic/templates/%namespace/%name_%version/%sub/services/%service/transports/rest_asyncio.py.j2 index a36477b7ce..0968e88007 100644 --- a/gapic/templates/%namespace/%name_%version/%sub/services/%service/transports/rest_asyncio.py.j2 +++ b/gapic/templates/%namespace/%name_%version/%sub/services/%service/transports/rest_asyncio.py.j2 @@ -22,6 +22,8 @@ from google.api_core import exceptions as core_exceptions from google.api_core import gapic_v1 from google.api_core import retry_async as retries from google.api_core import rest_helpers +{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2137): raise an import error if an older version of google.api.core is installed. #} +from google.api_core import rest_streaming_async # type: ignore try: from google.api_core import rest_streaming_async # type: ignore @@ -128,30 +130,27 @@ class Async{{service.name}}RestTransport(_Base{{ service.name }}RestTransport): def __hash__(self): return hash("Async{{service.name}}RestTransport.{{method.name}}") - {# TODO(https://github.com/googleapis/gapic-generator-python/issues/2168): Implement server streaming method. #} {# TODO(https://github.com/googleapis/gapic-generator-python/issues/2169): Implement client streaming method. #} {# TODO(https://github.com/googleapis/gapic-generator-python/issues/2170): Implement long running operation method. #} {# TODO(https://github.com/googleapis/gapic-generator-python/issues/2171): Implement pager method. #} - {% if method.http_options and not method.client_streaming and not method.server_streaming and not method.lro and not method.extended_lro and not method.paged_result_field %} + {% if method.http_options and not method.client_streaming and not method.lro and not method.extended_lro and not method.paged_result_field %} {% set body_spec = method.http_options[0].body %} {{ shared_macros.response_method(body_spec, is_async=True)|indent(8) }} - {% endif %}{# method.http_options and not method.client_streaming and not method.server_streaming and not method.lro and not method.extended_lro and not method.paged_result_field #} + {% endif %}{# method.http_options and not method.client_streaming and not method.lro and not method.extended_lro and not method.paged_result_field #} async def __call__(self, request: {{method.input.ident}}, *, retry: OptionalRetry=gapic_v1.method.DEFAULT, timeout: Optional[float]=None, metadata: Sequence[Tuple[str, str]]=(), - {# TODO(https://github.com/googleapis/gapic-generator-python/issues/2168): Update return type for server streaming method. #} {# TODO(https://github.com/googleapis/gapic-generator-python/issues/2169): Update return type for client streaming method. #} {# TODO(https://github.com/googleapis/gapic-generator-python/issues/2170): Update the return type for long running operation method. #} {# TODO(https://github.com/googleapis/gapic-generator-python/issues/2171): Update the return type for pager method. #} - ){% if not method.void %} -> {% if not method.server_streaming %}{{method.output.ident}}{% else %}None{% endif %}{% endif %}: - {# TODO(https://github.com/googleapis/gapic-generator-python/issues/2168): Implement server streaming method. #} + ){% if not method.void %} -> {% if not method.server_streaming %}{{method.output.ident}}{% else %}rest_streaming_async.AsyncResponseIterator{% endif %}{% endif %}: {# TODO(https://github.com/googleapis/gapic-generator-python/issues/2169): Implement client streaming method. #} {# TODO(https://github.com/googleapis/gapic-generator-python/issues/2170): Implement long running operation method. #} {# TODO(https://github.com/googleapis/gapic-generator-python/issues/2171): Implement pager method. #} - {% if method.http_options and not method.client_streaming and not method.server_streaming and not method.lro and not method.extended_lro and not method.paged_result_field %} + {% if method.http_options and not method.client_streaming and not method.lro and not method.extended_lro and not method.paged_result_field %} r"""Call the {{- ' ' -}} {{ (method.name|snake_case).replace('_',' ')|wrap( width=70, offset=45, indent=8) }} @@ -178,14 +177,19 @@ class Async{{service.name}}RestTransport(_Base{{ service.name }}RestTransport): {% if not method.void %} # Return the response + {% if method.server_streaming %} + resp = rest_streaming_async.AsyncResponseIterator(response, {{method.output.ident}}) + {% else %} resp = {{method.output.ident}}() + {# TODO(https://github.com/googleapis/gapic-generator-python/issues/2189): Investigate if the proto-plus conversion below is needed for a streaming response. #} {% if method.output.ident.is_proto_plus_type %} pb_resp = {{method.output.ident}}.pb(resp) {% else %} pb_resp = resp - {% endif %} + {% endif %}{# if method.output.ident.is_proto_plus_type #} content = await response.read() json_format.Parse(content, pb_resp, ignore_unknown_fields=True) + {% endif %}{# if method.server_streaming #} return resp {% endif %}{# method.void #} @@ -194,7 +198,7 @@ class Async{{service.name}}RestTransport(_Base{{ service.name }}RestTransport): raise NotImplementedError( "Method {{ method.name }} is not available over REST transport" ) - {% endif %}{# method.http_options and not method.client_streaming and not method.server_streaming and not method.lro and not method.extended_lro and not method.paged_result_field #} + {% endif %}{# method.http_options and not method.client_streaming and not method.lro and not method.extended_lro and not method.paged_result_field #} {% endfor %} {% for method in service.methods.values()|sort(attribute="name") %} diff --git a/gapic/templates/tests/unit/gapic/%name_%version/%sub/test_%service.py.j2 b/gapic/templates/tests/unit/gapic/%name_%version/%sub/test_%service.py.j2 index 060e2090fc..e8fdb0c8aa 100644 --- a/gapic/templates/tests/unit/gapic/%name_%version/%sub/test_%service.py.j2 +++ b/gapic/templates/tests/unit/gapic/%name_%version/%sub/test_%service.py.j2 @@ -20,7 +20,7 @@ except ImportError: # pragma: NO COVER import grpc from grpc.experimental import aio {% if "rest" in opts.transport %} -from collections.abc import Iterable +from collections.abc import Iterable, AsyncIterable from google.protobuf import json_format import json {% endif %} @@ -114,6 +114,11 @@ from google.iam.v1 import policy_pb2 # type: ignore {% endfilter %} {{ shared_macros.add_google_api_core_version_header_import(service.version) }} +async def mock_async_gen(data, chunk_size=1): + for i in range(0, len(data)): # pragma: NO COVER + chunk = data[i : i + chunk_size] + yield chunk.encode("utf-8") + def client_cert_source_callback(): return b"cert bytes", b"key bytes" diff --git a/gapic/templates/tests/unit/gapic/%name_%version/%sub/test_macros.j2 b/gapic/templates/tests/unit/gapic/%name_%version/%sub/test_macros.j2 index 58a5cb4212..cad0948afb 100644 --- a/gapic/templates/tests/unit/gapic/%name_%version/%sub/test_macros.j2 +++ b/gapic/templates/tests/unit/gapic/%name_%version/%sub/test_macros.j2 @@ -1004,14 +1004,13 @@ def test_{{ method_name }}_raw_page_lro(): {% with method_name = method.safe_name|snake_case + "_unary" if method.extended_lro and not full_extended_lro else method.name|snake_case, method_output = method.extended_lro.operation_type if method.extended_lro and not full_extended_lro else method.output %}{% if method.http_options %} {# TODO(kbandes): remove this if condition when lro and client streaming are supported. #} {% if not method.client_streaming %} -{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2168): Remove unit test for server streaming method. #} {# TODO(https://github.com/googleapis/gapic-generator-python/issues/2170): Remove unit test for long running operation method. #} {# TODO(https://github.com/googleapis/gapic-generator-python/issues/2171): Remove unit test for pager method. #} {# NOTE: This guard is added to avoid generating duplicate tests for methods which are tested elsewhere. As we implement each of the api methods # in the `macro::call_success_test`, the case will be removed from this condition below. # TODO(https://github.com/googleapis/gapic-generator-python/issues/2143): Remove the test `test_{{ method_name }}_rest` from here once the linked issue is resolved. #} -{% if method.server_streaming or method.lro or method.extended_lro or method.paged_result_field %} +{% if method.lro or method.extended_lro or method.paged_result_field %} @pytest.mark.parametrize("request_type", [ {{ method.input.ident }}, dict, @@ -1914,15 +1913,15 @@ def test_unsupported_parameter_rest_asyncio(): {% endmacro %} {# is_rest_unsupported_method renders: - # 'True' if transport is async REST. - # 'True' if transport is sync REST and method is a client streaming method. + # 'True' if transport is async REST and method is one of [client_streaming, lro, extended_lro, paged_result_field]. + # 'True' if transport is sync REST and method is a client_streaming method. # 'False' otherwise. #} {# TODO(https://github.com/googleapis/gapic-generator-python/issues/2152): Update this method as we add support for methods in async REST. # There are no plans to add support for client streaming. #} {% macro is_rest_unsupported_method(method, is_async) %} -{%- if method.client_streaming or (is_async and (method.server_streaming or method.lro or method.extended_lro or method.paged_result_field)) -%} +{%- if method.client_streaming or (is_async and (method.lro or method.extended_lro or method.paged_result_field)) -%} {{'True'}} {%- else -%} {{'False'}} @@ -2062,7 +2061,7 @@ def test_initialize_client_w_{{transport_name}}(): {# call_success_test generates tests for rest methods # when they make a successful request. # NOTE: Currently, this macro does not support the following method - # types: [method.server_streaming, method.lro, method.extended_lro, method.paged_result_field]. + # types: [method.lro, method.extended_lro, method.paged_result_field]. # As support is added for the above methods, the relevant guard can be removed from within the macro # TODO(https://github.com/googleapis/gapic-generator-python/issues/2142): Clean up `rest_required_tests` as we add support for each of the method types metioned above. #} @@ -2076,14 +2075,13 @@ def test_initialize_client_w_{{transport_name}}(): # (method.extended_lro and not full_extended_lro) #} {% set method_output = method.output %} -{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2168): Add unit test for server streaming method. #} {# TODO(https://github.com/googleapis/gapic-generator-python/issues/2170): Add unit test for long running operation method. #} {# TODO(https://github.com/googleapis/gapic-generator-python/issues/2171): Add unit test for pager method. #} {# TODO(https://github.com/googleapis/gapic-generator-python/issues/2143): Update the guard below as we add support for each method, and keep it in sync with the guard in # `rest_required_tests`, which should be the exact opposite. Remove it once we have all the methods supported in async rest transport that are supported in sync rest transport. #} -{% if not (method.server_streaming or method.lro or method.extended_lro or method.paged_result_field)%} -{{async_decorator}} +{% if not (method.lro or method.extended_lro or method.paged_result_field)%} +{{ async_decorator }} @pytest.mark.parametrize("request_type", [ {{ method.input.ident }}, dict, @@ -2232,14 +2230,33 @@ def test_initialize_client_w_{{transport_name}}(): {% endif %}{# method.output.ident.is_proto_plus_type #} json_return_value = json_format.MessageToJson(return_value) {% endif %}{# method.void #} + {% if method.server_streaming %} + json_return_value = "[{}]".format(json_return_value) + {% if is_async %} + response_value.content.return_value = mock_async_gen(json_return_value) + {% else %}{# not is_async #} + response_value.iter_content = mock.Mock(return_value=iter(json_return_value)) + {% endif %}{# is_async #} + {% else %}{# not method.streaming #} {% if is_async %} response_value.read = mock.AsyncMock(return_value=json_return_value.encode('UTF-8')) - {% else %}{# is_async #} + {% else %}{# not is_async #} response_value.content = json_return_value.encode('UTF-8') {% endif %}{# is_async #} + {% endif %}{# method.server_streaming #} req.return_value = response_value response = {{ await_prefix }}client.{{ method_name }}(request) - + + {% if method.server_streaming %} + {% if is_async %} + assert isinstance(response, AsyncIterable) + response = await response.__anext__() + {% else %} + assert isinstance(response, Iterable) + response = next(response) + {% endif %} + {% endif %} + # Establish that the response is the type that we expect. {% if method.void %} assert response is None diff --git a/tests/integration/goldens/asset/tests/unit/gapic/asset_v1/test_asset_service.py b/tests/integration/goldens/asset/tests/unit/gapic/asset_v1/test_asset_service.py index bf59bfe5fd..42085b1e26 100755 --- a/tests/integration/goldens/asset/tests/unit/gapic/asset_v1/test_asset_service.py +++ b/tests/integration/goldens/asset/tests/unit/gapic/asset_v1/test_asset_service.py @@ -23,7 +23,7 @@ import grpc from grpc.experimental import aio -from collections.abc import Iterable +from collections.abc import Iterable, AsyncIterable from google.protobuf import json_format import json import math @@ -71,6 +71,11 @@ import google.auth +async def mock_async_gen(data, chunk_size=1): + for i in range(0, len(data)): # pragma: NO COVER + chunk = data[i : i + chunk_size] + yield chunk.encode("utf-8") + def client_cert_source_callback(): return b"cert bytes", b"key bytes" diff --git a/tests/integration/goldens/credentials/tests/unit/gapic/credentials_v1/test_iam_credentials.py b/tests/integration/goldens/credentials/tests/unit/gapic/credentials_v1/test_iam_credentials.py index d619524de2..bd3c89eda9 100755 --- a/tests/integration/goldens/credentials/tests/unit/gapic/credentials_v1/test_iam_credentials.py +++ b/tests/integration/goldens/credentials/tests/unit/gapic/credentials_v1/test_iam_credentials.py @@ -23,7 +23,7 @@ import grpc from grpc.experimental import aio -from collections.abc import Iterable +from collections.abc import Iterable, AsyncIterable from google.protobuf import json_format import json import math @@ -61,6 +61,11 @@ import google.auth +async def mock_async_gen(data, chunk_size=1): + for i in range(0, len(data)): # pragma: NO COVER + chunk = data[i : i + chunk_size] + yield chunk.encode("utf-8") + def client_cert_source_callback(): return b"cert bytes", b"key bytes" diff --git a/tests/integration/goldens/eventarc/tests/unit/gapic/eventarc_v1/test_eventarc.py b/tests/integration/goldens/eventarc/tests/unit/gapic/eventarc_v1/test_eventarc.py index de351f98ee..fa6e719880 100755 --- a/tests/integration/goldens/eventarc/tests/unit/gapic/eventarc_v1/test_eventarc.py +++ b/tests/integration/goldens/eventarc/tests/unit/gapic/eventarc_v1/test_eventarc.py @@ -23,7 +23,7 @@ import grpc from grpc.experimental import aio -from collections.abc import Iterable +from collections.abc import Iterable, AsyncIterable from google.protobuf import json_format import json import math @@ -81,6 +81,11 @@ import google.auth +async def mock_async_gen(data, chunk_size=1): + for i in range(0, len(data)): # pragma: NO COVER + chunk = data[i : i + chunk_size] + yield chunk.encode("utf-8") + def client_cert_source_callback(): return b"cert bytes", b"key bytes" diff --git a/tests/integration/goldens/logging/tests/unit/gapic/logging_v2/test_config_service_v2.py b/tests/integration/goldens/logging/tests/unit/gapic/logging_v2/test_config_service_v2.py index 07991caf30..80b4f75e6e 100755 --- a/tests/integration/goldens/logging/tests/unit/gapic/logging_v2/test_config_service_v2.py +++ b/tests/integration/goldens/logging/tests/unit/gapic/logging_v2/test_config_service_v2.py @@ -61,6 +61,11 @@ import google.auth +async def mock_async_gen(data, chunk_size=1): + for i in range(0, len(data)): # pragma: NO COVER + chunk = data[i : i + chunk_size] + yield chunk.encode("utf-8") + def client_cert_source_callback(): return b"cert bytes", b"key bytes" diff --git a/tests/integration/goldens/logging/tests/unit/gapic/logging_v2/test_logging_service_v2.py b/tests/integration/goldens/logging/tests/unit/gapic/logging_v2/test_logging_service_v2.py index dffd6bebbf..bb21903760 100755 --- a/tests/integration/goldens/logging/tests/unit/gapic/logging_v2/test_logging_service_v2.py +++ b/tests/integration/goldens/logging/tests/unit/gapic/logging_v2/test_logging_service_v2.py @@ -62,6 +62,11 @@ import google.auth +async def mock_async_gen(data, chunk_size=1): + for i in range(0, len(data)): # pragma: NO COVER + chunk = data[i : i + chunk_size] + yield chunk.encode("utf-8") + def client_cert_source_callback(): return b"cert bytes", b"key bytes" diff --git a/tests/integration/goldens/logging/tests/unit/gapic/logging_v2/test_metrics_service_v2.py b/tests/integration/goldens/logging/tests/unit/gapic/logging_v2/test_metrics_service_v2.py index 4dea5183c1..a80079b399 100755 --- a/tests/integration/goldens/logging/tests/unit/gapic/logging_v2/test_metrics_service_v2.py +++ b/tests/integration/goldens/logging/tests/unit/gapic/logging_v2/test_metrics_service_v2.py @@ -60,6 +60,11 @@ import google.auth +async def mock_async_gen(data, chunk_size=1): + for i in range(0, len(data)): # pragma: NO COVER + chunk = data[i : i + chunk_size] + yield chunk.encode("utf-8") + def client_cert_source_callback(): return b"cert bytes", b"key bytes" diff --git a/tests/integration/goldens/redis/google/cloud/redis_v1/services/cloud_redis/transports/rest_asyncio.py b/tests/integration/goldens/redis/google/cloud/redis_v1/services/cloud_redis/transports/rest_asyncio.py index 277c815b8a..cdd7c50c88 100755 --- a/tests/integration/goldens/redis/google/cloud/redis_v1/services/cloud_redis/transports/rest_asyncio.py +++ b/tests/integration/goldens/redis/google/cloud/redis_v1/services/cloud_redis/transports/rest_asyncio.py @@ -27,6 +27,7 @@ from google.api_core import gapic_v1 from google.api_core import retry_async as retries from google.api_core import rest_helpers +from google.api_core import rest_streaming_async # type: ignore try: from google.api_core import rest_streaming_async # type: ignore diff --git a/tests/integration/goldens/redis/tests/unit/gapic/redis_v1/test_cloud_redis.py b/tests/integration/goldens/redis/tests/unit/gapic/redis_v1/test_cloud_redis.py index dd0401d527..70f57f83c3 100755 --- a/tests/integration/goldens/redis/tests/unit/gapic/redis_v1/test_cloud_redis.py +++ b/tests/integration/goldens/redis/tests/unit/gapic/redis_v1/test_cloud_redis.py @@ -23,7 +23,7 @@ import grpc from grpc.experimental import aio -from collections.abc import Iterable +from collections.abc import Iterable, AsyncIterable from google.protobuf import json_format import json import math @@ -84,6 +84,11 @@ import google.auth +async def mock_async_gen(data, chunk_size=1): + for i in range(0, len(data)): # pragma: NO COVER + chunk = data[i : i + chunk_size] + yield chunk.encode("utf-8") + def client_cert_source_callback(): return b"cert bytes", b"key bytes" diff --git a/tests/system/test_streams.py b/tests/system/test_streams.py index 1e89d46059..b4adc6ee51 100644 --- a/tests/system/test_streams.py +++ b/tests/system/test_streams.py @@ -114,50 +114,45 @@ def test_stream_stream_passing_dict(echo): @pytest.mark.asyncio async def test_async_unary_stream_reader(async_echo): - # TODO(https://github.com/googleapis/gapic-generator-python/issues/2168): Add test for async rest server-streaming. - if "rest" in str(async_echo.transport).lower(): - with pytest.raises(NotImplementedError): - call = await async_echo.expand() - return - content = 'The hail in Wales falls mainly on the snails.' - call = await async_echo.expand({ + stream = await async_echo.expand({ 'content': content, }, metadata=_METADATA) + # Note: gRPC exposes `read`, REST exposes `__anext__` to read + # a chunk of response from the stream. + response_attr = '__anext__' if "rest" in str( + async_echo.transport).lower() else 'read' + # Consume the response and ensure it matches what we expect. - # with pytest.raises(exceptions.NotFound) as exc: for ground_truth in content.split(' '): - response = await call.read() + response = await getattr(stream, response_attr)() assert response.content == ground_truth assert ground_truth == 'snails.' - trailing_metadata = await call.trailing_metadata() - assert _METADATA[0] in trailing_metadata.items() + # Note: trailing metadata is part of a gRPC response. + if "grpc" in str(async_echo.transport).lower(): + trailing_metadata = await stream.trailing_metadata() + assert _METADATA[0] in trailing_metadata.items() @pytest.mark.asyncio async def test_async_unary_stream_async_generator(async_echo): - # TODO(https://github.com/googleapis/gapic-generator-python/issues/2168): Add test for async rest server-streaming. - if "rest" in str(async_echo.transport).lower(): - with pytest.raises(NotImplementedError): - call = await async_echo.expand() - return - content = 'The hail in Wales falls mainly on the snails.' - call = await async_echo.expand({ + stream = await async_echo.expand({ 'content': content, }, metadata=_METADATA) # Consume the response and ensure it matches what we expect. - # with pytest.raises(exceptions.NotFound) as exc: tokens = iter(content.split(' ')) - async for response in call: + async for response in stream: ground_truth = next(tokens) assert response.content == ground_truth assert ground_truth == 'snails.' - trailing_metadata = await call.trailing_metadata() - assert _METADATA[0] in trailing_metadata.items() + # Note: trailing metadata is part of a gRPC response. + if "grpc" in str(async_echo.transport).lower(): + trailing_metadata = await stream.trailing_metadata() + assert _METADATA[0] in trailing_metadata.items() @pytest.mark.asyncio async def test_async_stream_unary_iterable(async_echo):