Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,9 @@ async def send(
The request message to send. This is typically used to specify
the read offset and limit.
"""
raise NotImplementedError(
"send() is not implemented yet in _AsyncWriteObjectStream"
)
if not self._is_stream_open:
raise ValueError("Stream is not open")
await self.socket_like_rpc.send(bidi_write_object_request)

async def recv(self) -> _storage_v2.BidiWriteObjectResponse:
"""Receives a response from the stream.
Expand All @@ -174,9 +174,9 @@ async def recv(self) -> _storage_v2.BidiWriteObjectResponse:
:class:`~google.cloud._storage_v2.types.BidiWriteObjectResponse`:
The response message from the server.
"""
raise NotImplementedError(
"recv() is not implemented yet in _AsyncWriteObjectStream"
)
if not self._is_stream_open:
raise ValueError("Stream is not open")
return await self.socket_like_rpc.recv()

@property
def is_stream_open(self) -> bool:
Expand Down
81 changes: 74 additions & 7 deletions tests/unit/asyncio/test_async_write_object_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ async def instantiate_write_obj_stream(mock_client, mock_cls_async_bidi_rpc, ope
socket_like_rpc = AsyncMock()
mock_cls_async_bidi_rpc.return_value = socket_like_rpc
socket_like_rpc.open = AsyncMock()
socket_like_rpc.send = AsyncMock()
socket_like_rpc.close = AsyncMock()

mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse)
Expand Down Expand Up @@ -313,11 +314,77 @@ async def test_close_without_open_should_raise_error(
@mock.patch(
"google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc"
)
async def test_unimplemented_methods_raise_error(mock_async_bidi_rpc, mock_client):
"""Test that unimplemented methods (send, recv) raise NotImplementedError."""
stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT)
with pytest.raises(NotImplementedError):
await stream.send(_storage_v2.BidiWriteObjectRequest())
async def test_send(mock_cls_async_bidi_rpc, mock_client):
"""Test that send calls the underlying rpc's send method."""
# Arrange
write_obj_stream = await instantiate_write_obj_stream(
mock_client, mock_cls_async_bidi_rpc, open=True
)

# Act
bidi_write_object_request = _storage_v2.BidiWriteObjectRequest()
await write_obj_stream.send(bidi_write_object_request)

# Assert
write_obj_stream.socket_like_rpc.send.assert_called_once_with(
bidi_write_object_request
)


@pytest.mark.asyncio
@mock.patch(
"google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc"
)
async def test_send_without_open_should_raise_error(
mock_cls_async_bidi_rpc, mock_client
):
"""Test that sending on a stream that is not open raises a ValueError."""
# Arrange
write_obj_stream = await instantiate_write_obj_stream(
mock_client, mock_cls_async_bidi_rpc, open=False
)

# Act & Assert
with pytest.raises(ValueError, match="Stream is not open"):
await write_obj_stream.send(_storage_v2.BidiWriteObjectRequest())


@pytest.mark.asyncio
@mock.patch(
"google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc"
)
async def test_recv(mock_cls_async_bidi_rpc, mock_client):
"""Test that recv calls the underlying rpc's recv method."""
# Arrange
write_obj_stream = await instantiate_write_obj_stream(
mock_client, mock_cls_async_bidi_rpc, open=True
)
bidi_write_object_response = _storage_v2.BidiWriteObjectResponse()
write_obj_stream.socket_like_rpc.recv = AsyncMock(
return_value=bidi_write_object_response
)

# Act
response = await write_obj_stream.recv()

# Assert
write_obj_stream.socket_like_rpc.recv.assert_called_once()
assert response == bidi_write_object_response


with pytest.raises(NotImplementedError):
await stream.recv()
@pytest.mark.asyncio
@mock.patch(
"google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc"
)
async def test_recv_without_open_should_raise_error(
mock_cls_async_bidi_rpc, mock_client
):
"""Test that receiving on a stream that is not open raises a ValueError."""
# Arrange
write_obj_stream = await instantiate_write_obj_stream(
mock_client, mock_cls_async_bidi_rpc, open=False
)

# Act & Assert
with pytest.raises(ValueError, match="Stream is not open"):
await write_obj_stream.recv()