Skip to content

Commit 6da1186

Browse files
authored
feat(zb-experimental): implement send & recv (#1615)
feat(zb-experimental): implement send & recv
1 parent 7a53221 commit 6da1186

File tree

2 files changed

+80
-13
lines changed

2 files changed

+80
-13
lines changed

google/cloud/storage/_experimental/asyncio/async_write_object_stream.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -160,9 +160,9 @@ async def send(
160160
The request message to send. This is typically used to specify
161161
the read offset and limit.
162162
"""
163-
raise NotImplementedError(
164-
"send() is not implemented yet in _AsyncWriteObjectStream"
165-
)
163+
if not self._is_stream_open:
164+
raise ValueError("Stream is not open")
165+
await self.socket_like_rpc.send(bidi_write_object_request)
166166

167167
async def recv(self) -> _storage_v2.BidiWriteObjectResponse:
168168
"""Receives a response from the stream.
@@ -174,9 +174,9 @@ async def recv(self) -> _storage_v2.BidiWriteObjectResponse:
174174
:class:`~google.cloud._storage_v2.types.BidiWriteObjectResponse`:
175175
The response message from the server.
176176
"""
177-
raise NotImplementedError(
178-
"recv() is not implemented yet in _AsyncWriteObjectStream"
179-
)
177+
if not self._is_stream_open:
178+
raise ValueError("Stream is not open")
179+
return await self.socket_like_rpc.recv()
180180

181181
@property
182182
def is_stream_open(self) -> bool:

tests/unit/asyncio/test_async_write_object_stream.py

Lines changed: 74 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ async def instantiate_write_obj_stream(mock_client, mock_cls_async_bidi_rpc, ope
4949
socket_like_rpc = AsyncMock()
5050
mock_cls_async_bidi_rpc.return_value = socket_like_rpc
5151
socket_like_rpc.open = AsyncMock()
52+
socket_like_rpc.send = AsyncMock()
5253
socket_like_rpc.close = AsyncMock()
5354

5455
mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse)
@@ -313,11 +314,77 @@ async def test_close_without_open_should_raise_error(
313314
@mock.patch(
314315
"google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc"
315316
)
316-
async def test_unimplemented_methods_raise_error(mock_async_bidi_rpc, mock_client):
317-
"""Test that unimplemented methods (send, recv) raise NotImplementedError."""
318-
stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT)
319-
with pytest.raises(NotImplementedError):
320-
await stream.send(_storage_v2.BidiWriteObjectRequest())
317+
async def test_send(mock_cls_async_bidi_rpc, mock_client):
318+
"""Test that send calls the underlying rpc's send method."""
319+
# Arrange
320+
write_obj_stream = await instantiate_write_obj_stream(
321+
mock_client, mock_cls_async_bidi_rpc, open=True
322+
)
323+
324+
# Act
325+
bidi_write_object_request = _storage_v2.BidiWriteObjectRequest()
326+
await write_obj_stream.send(bidi_write_object_request)
327+
328+
# Assert
329+
write_obj_stream.socket_like_rpc.send.assert_called_once_with(
330+
bidi_write_object_request
331+
)
332+
333+
334+
@pytest.mark.asyncio
335+
@mock.patch(
336+
"google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc"
337+
)
338+
async def test_send_without_open_should_raise_error(
339+
mock_cls_async_bidi_rpc, mock_client
340+
):
341+
"""Test that sending on a stream that is not open raises a ValueError."""
342+
# Arrange
343+
write_obj_stream = await instantiate_write_obj_stream(
344+
mock_client, mock_cls_async_bidi_rpc, open=False
345+
)
346+
347+
# Act & Assert
348+
with pytest.raises(ValueError, match="Stream is not open"):
349+
await write_obj_stream.send(_storage_v2.BidiWriteObjectRequest())
350+
351+
352+
@pytest.mark.asyncio
353+
@mock.patch(
354+
"google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc"
355+
)
356+
async def test_recv(mock_cls_async_bidi_rpc, mock_client):
357+
"""Test that recv calls the underlying rpc's recv method."""
358+
# Arrange
359+
write_obj_stream = await instantiate_write_obj_stream(
360+
mock_client, mock_cls_async_bidi_rpc, open=True
361+
)
362+
bidi_write_object_response = _storage_v2.BidiWriteObjectResponse()
363+
write_obj_stream.socket_like_rpc.recv = AsyncMock(
364+
return_value=bidi_write_object_response
365+
)
366+
367+
# Act
368+
response = await write_obj_stream.recv()
369+
370+
# Assert
371+
write_obj_stream.socket_like_rpc.recv.assert_called_once()
372+
assert response == bidi_write_object_response
373+
321374

322-
with pytest.raises(NotImplementedError):
323-
await stream.recv()
375+
@pytest.mark.asyncio
376+
@mock.patch(
377+
"google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc"
378+
)
379+
async def test_recv_without_open_should_raise_error(
380+
mock_cls_async_bidi_rpc, mock_client
381+
):
382+
"""Test that receiving on a stream that is not open raises a ValueError."""
383+
# Arrange
384+
write_obj_stream = await instantiate_write_obj_stream(
385+
mock_client, mock_cls_async_bidi_rpc, open=False
386+
)
387+
388+
# Act & Assert
389+
with pytest.raises(ValueError, match="Stream is not open"):
390+
await write_obj_stream.recv()

0 commit comments

Comments
 (0)