Skip to content

Commit 786af55

Browse files
authored
feat(zb-experimental): implement "open" for write_object_stream (#1613)
feat(zb-experimental): implement "open" for write_object_stream
1 parent 0b70a28 commit 786af55

File tree

2 files changed

+211
-5
lines changed

2 files changed

+211
-5
lines changed

google/cloud/storage/_experimental/asyncio/async_write_object_stream.py

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,10 +95,54 @@ async def open(self) -> None:
9595
"""Opening an object for write , should do it's state lookup
9696
to know what's the persisted size is.
9797
"""
98-
raise NotImplementedError(
99-
"open() is not implemented yet in _AsyncWriteObjectStream"
98+
if self._is_stream_open:
99+
raise ValueError("Stream is already open")
100+
101+
# Create a new object or overwrite existing one if generation_number
102+
# is None. This makes it consistent with GCS JSON API behavior.
103+
# Created object type would be Appendable Object.
104+
if self.generation_number is None:
105+
self.first_bidi_write_req = _storage_v2.BidiWriteObjectRequest(
106+
write_object_spec=_storage_v2.WriteObjectSpec(
107+
resource=_storage_v2.Object(
108+
name=self.object_name, bucket=self._full_bucket_name
109+
),
110+
appendable=True,
111+
),
112+
)
113+
else:
114+
self.first_bidi_write_req = _storage_v2.BidiWriteObjectRequest(
115+
append_object_spec=_storage_v2.AppendObjectSpec(
116+
bucket=self._full_bucket_name,
117+
object=self.object_name,
118+
generation=self.generation_number,
119+
),
120+
state_lookup=True,
121+
)
122+
123+
self.socket_like_rpc = AsyncBidiRpc(
124+
self.rpc, initial_request=self.first_bidi_write_req, metadata=self.metadata
100125
)
101126

127+
await self.socket_like_rpc.open() # this is actually 1 send
128+
response = await self.socket_like_rpc.recv()
129+
self._is_stream_open = True
130+
131+
if not response.resource:
132+
raise ValueError(
133+
"Failed to obtain object resource after opening the stream"
134+
)
135+
if not response.resource.generation:
136+
raise ValueError(
137+
"Failed to obtain object generation after opening the stream"
138+
)
139+
self.generation_number = response.resource.generation
140+
141+
if not response.write_handle:
142+
raise ValueError("Failed to obtain write_handle after opening the stream")
143+
144+
self.write_handle = response.write_handle
145+
102146
async def close(self) -> None:
103147
"""Closes the bidi-gRPC connection."""
104148
raise NotImplementedError(
@@ -132,3 +176,7 @@ async def recv(self) -> _storage_v2.BidiWriteObjectResponse:
132176
raise NotImplementedError(
133177
"recv() is not implemented yet in _AsyncWriteObjectStream"
134178
)
179+
180+
@property
181+
def is_stream_open(self) -> bool:
182+
return self._is_stream_open

tests/unit/asyncio/test_async_write_object_stream.py

Lines changed: 161 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222

2323
BUCKET = "my-bucket"
2424
OBJECT = "my-object"
25+
GENERATION = 12345
26+
WRITE_HANDLE = b"test-handle"
2527

2628

2729
@pytest.fixture
@@ -91,13 +93,169 @@ def test_async_write_object_stream_init_raises_value_error():
9193

9294

9395
@pytest.mark.asyncio
94-
async def test_unimplemented_methods_raise_error(mock_client):
95-
"""Test that unimplemented methods raise NotImplementedError."""
96+
@mock.patch(
97+
"google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc"
98+
)
99+
async def test_open_for_new_object(mock_async_bidi_rpc, mock_client):
100+
"""Test opening a stream for a new object."""
101+
# Arrange
102+
socket_like_rpc = mock.AsyncMock()
103+
mock_async_bidi_rpc.return_value = socket_like_rpc
104+
socket_like_rpc.open = mock.AsyncMock()
105+
106+
mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse)
107+
mock_response.resource = mock.MagicMock(spec=_storage_v2.Object)
108+
mock_response.resource.generation = GENERATION
109+
mock_response.write_handle = WRITE_HANDLE
110+
socket_like_rpc.recv = mock.AsyncMock(return_value=mock_response)
111+
96112
stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT)
97113

98-
with pytest.raises(NotImplementedError):
114+
# Act
115+
await stream.open()
116+
117+
# Assert
118+
assert stream._is_stream_open
119+
socket_like_rpc.open.assert_called_once()
120+
socket_like_rpc.recv.assert_called_once()
121+
assert stream.generation_number == GENERATION
122+
assert stream.write_handle == WRITE_HANDLE
123+
124+
125+
@pytest.mark.asyncio
126+
@mock.patch(
127+
"google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc"
128+
)
129+
async def test_open_for_existing_object(mock_async_bidi_rpc, mock_client):
130+
"""Test opening a stream for an existing object."""
131+
# Arrange
132+
socket_like_rpc = mock.AsyncMock()
133+
mock_async_bidi_rpc.return_value = socket_like_rpc
134+
socket_like_rpc.open = mock.AsyncMock()
135+
136+
mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse)
137+
mock_response.resource = mock.MagicMock(spec=_storage_v2.Object)
138+
mock_response.resource.generation = GENERATION
139+
mock_response.write_handle = WRITE_HANDLE
140+
socket_like_rpc.recv = mock.AsyncMock(return_value=mock_response)
141+
142+
stream = _AsyncWriteObjectStream(
143+
mock_client, BUCKET, OBJECT, generation_number=GENERATION
144+
)
145+
146+
# Act
147+
await stream.open()
148+
149+
# Assert
150+
assert stream._is_stream_open
151+
socket_like_rpc.open.assert_called_once()
152+
socket_like_rpc.recv.assert_called_once()
153+
assert stream.generation_number == GENERATION
154+
assert stream.write_handle == WRITE_HANDLE
155+
156+
157+
@pytest.mark.asyncio
158+
@mock.patch(
159+
"google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc"
160+
)
161+
async def test_open_when_already_open_raises_error(mock_async_bidi_rpc, mock_client):
162+
"""Test that opening an already open stream raises a ValueError."""
163+
# Arrange
164+
socket_like_rpc = mock.AsyncMock()
165+
mock_async_bidi_rpc.return_value = socket_like_rpc
166+
socket_like_rpc.open = mock.AsyncMock()
167+
168+
mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse)
169+
mock_response.resource = mock.MagicMock(spec=_storage_v2.Object)
170+
mock_response.resource.generation = GENERATION
171+
mock_response.write_handle = WRITE_HANDLE
172+
socket_like_rpc.recv = mock.AsyncMock(return_value=mock_response)
173+
174+
stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT)
175+
await stream.open()
176+
177+
# Act & Assert
178+
with pytest.raises(ValueError, match="Stream is already open"):
99179
await stream.open()
100180

181+
182+
@pytest.mark.asyncio
183+
@mock.patch(
184+
"google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc"
185+
)
186+
async def test_open_raises_error_on_missing_object_resource(
187+
mock_async_bidi_rpc, mock_client
188+
):
189+
"""Test that open raises ValueError if object_resource is not in the response."""
190+
socket_like_rpc = mock.AsyncMock()
191+
mock_async_bidi_rpc.return_value = socket_like_rpc
192+
193+
mock_reponse = mock.AsyncMock()
194+
type(mock_reponse).resource = mock.PropertyMock(return_value=None)
195+
socket_like_rpc.recv.return_value = mock_reponse
196+
197+
# Note: Don't use below code as unittest library automatically assigns an
198+
# `AsyncMock` object to an attribute, if not set.
199+
# socket_like_rpc.recv.return_value = mock.AsyncMock(
200+
# return_value=_storage_v2.BidiWriteObjectResponse(resource=None)
201+
# )
202+
203+
stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT)
204+
with pytest.raises(
205+
ValueError, match="Failed to obtain object resource after opening the stream"
206+
):
207+
await stream.open()
208+
209+
210+
@pytest.mark.asyncio
211+
@mock.patch(
212+
"google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc"
213+
)
214+
async def test_open_raises_error_on_missing_generation(
215+
mock_async_bidi_rpc, mock_client
216+
):
217+
"""Test that open raises ValueError if generation is not in the response."""
218+
socket_like_rpc = mock.AsyncMock()
219+
mock_async_bidi_rpc.return_value = socket_like_rpc
220+
221+
# Configure the mock response object
222+
mock_response = mock.AsyncMock()
223+
type(mock_response.resource).generation = mock.PropertyMock(return_value=None)
224+
socket_like_rpc.recv.return_value = mock_response
225+
226+
stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT)
227+
with pytest.raises(
228+
ValueError, match="Failed to obtain object generation after opening the stream"
229+
):
230+
await stream.open()
231+
# assert stream.generation_number is None
232+
233+
234+
@pytest.mark.asyncio
235+
@mock.patch(
236+
"google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc"
237+
)
238+
async def test_open_raises_error_on_missing_write_handle(
239+
mock_async_bidi_rpc, mock_client
240+
):
241+
"""Test that open raises ValueError if write_handle is not in the response."""
242+
socket_like_rpc = mock.AsyncMock()
243+
mock_async_bidi_rpc.return_value = socket_like_rpc
244+
socket_like_rpc.recv = mock.AsyncMock(
245+
return_value=_storage_v2.BidiWriteObjectResponse(
246+
resource=_storage_v2.Object(generation=GENERATION), write_handle=None
247+
)
248+
)
249+
stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT)
250+
with pytest.raises(ValueError, match="Failed to obtain write_handle"):
251+
await stream.open()
252+
253+
254+
@pytest.mark.asyncio
255+
async def test_unimplemented_methods_raise_error(mock_client):
256+
"""Test that unimplemented methods raise NotImplementedError."""
257+
stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT)
258+
101259
with pytest.raises(NotImplementedError):
102260
await stream.close()
103261

0 commit comments

Comments
 (0)