Skip to content

Commit b413405

Browse files
authored
feat(zb-experimental): implement flush, close and finalize (#1619)
feat(zb-experimental): implement flush, close and finalize
1 parent 361235a commit b413405

File tree

2 files changed

+125
-16
lines changed

2 files changed

+125
-16
lines changed

google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,11 @@ def __init__(
114114
self.persisted_size: Optional[int] = None
115115

116116
async def state_lookup(self) -> int:
117-
"""Returns the persisted_size."""
117+
"""Returns the persisted_size
118+
119+
:rtype: int
120+
:returns: persisted size.
121+
"""
118122
await self.write_obj_stream.send(
119123
_storage_v2.BidiWriteObjectRequest(
120124
state_lookup=True,
@@ -142,18 +146,44 @@ async def append(self, data: bytes):
142146
raise NotImplementedError("append is not implemented yet.")
143147

144148
async def flush(self) -> int:
145-
"""Returns persisted_size"""
146-
raise NotImplementedError("flush is not implemented yet.")
149+
"""Flushes the data to the server.
150+
151+
:rtype: int
152+
:returns: The persisted size after flush.
153+
"""
154+
await self.write_obj_stream.send(
155+
_storage_v2.BidiWriteObjectRequest(
156+
flush=True,
157+
state_lookup=True,
158+
)
159+
)
160+
response = await self.write_obj_stream.recv()
161+
self.persisted_size = response.persisted_size
162+
self.offset = self.persisted_size
163+
return self.persisted_size
147164

148165
async def close(self, finalize_on_close=False) -> int:
149166
"""Returns persisted_size"""
150-
raise NotImplementedError("close is not implemented yet.")
167+
if finalize_on_close:
168+
await self.finalize()
169+
170+
await self.write_obj_stream.close()
171+
self._is_stream_open = False
172+
self.offset = None
173+
174+
async def finalize(self) -> _storage_v2.Object:
175+
"""Finalizes the Appendable Object.
151176
152-
async def finalize(self) -> int:
153-
"""Returns persisted_size
154177
Note: Once finalized no more data can be appended.
178+
179+
rtype: google.cloud.storage_v2.types.Object
180+
returns: The finalized object resource.
155181
"""
156-
raise NotImplementedError("finalize is not implemented yet.")
182+
await self.write_obj_stream.send(
183+
_storage_v2.BidiWriteObjectRequest(finish_write=True)
184+
)
185+
response = await self.write_obj_stream.recv()
186+
self.object_resource = response.resource
157187

158188
# helper methods.
159189
async def append_from_string(self, data: str):

tests/unit/asyncio/test_async_appendable_object_writer.py

Lines changed: 88 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -171,15 +171,6 @@ async def test_unimplemented_methods_raise_error(mock_client):
171171
with pytest.raises(NotImplementedError):
172172
await writer.append(b"data")
173173

174-
with pytest.raises(NotImplementedError):
175-
await writer.flush()
176-
177-
with pytest.raises(NotImplementedError):
178-
await writer.close()
179-
180-
with pytest.raises(NotImplementedError):
181-
await writer.finalize()
182-
183174
with pytest.raises(NotImplementedError):
184175
await writer.append_from_string("data")
185176

@@ -188,3 +179,91 @@ async def test_unimplemented_methods_raise_error(mock_client):
188179

189180
with pytest.raises(NotImplementedError):
190181
await writer.append_from_file("file.txt")
182+
183+
184+
@pytest.mark.asyncio
185+
@mock.patch(
186+
"google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
187+
)
188+
async def test_flush(mock_write_object_stream, mock_client):
189+
"""Test that flush sends the correct request and updates state."""
190+
writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
191+
mock_stream = mock_write_object_stream.return_value
192+
mock_stream.send = mock.AsyncMock()
193+
mock_stream.recv = mock.AsyncMock(
194+
return_value=_storage_v2.BidiWriteObjectResponse(persisted_size=1024)
195+
)
196+
197+
persisted_size = await writer.flush()
198+
199+
expected_request = _storage_v2.BidiWriteObjectRequest(flush=True, state_lookup=True)
200+
mock_stream.send.assert_awaited_once_with(expected_request)
201+
mock_stream.recv.assert_awaited_once()
202+
assert writer.persisted_size == 1024
203+
assert writer.offset == 1024
204+
assert persisted_size == 1024
205+
206+
207+
@pytest.mark.asyncio
208+
@mock.patch(
209+
"google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
210+
)
211+
async def test_close_without_finalize(mock_write_object_stream, mock_client):
212+
"""Test close without finalizing."""
213+
writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
214+
writer._is_stream_open = True
215+
writer.offset = 1024
216+
mock_stream = mock_write_object_stream.return_value
217+
mock_stream.close = mock.AsyncMock()
218+
writer.finalize = mock.AsyncMock()
219+
220+
await writer.close(finalize_on_close=False)
221+
222+
writer.finalize.assert_not_awaited()
223+
mock_stream.close.assert_awaited_once()
224+
assert not writer._is_stream_open
225+
assert writer.offset is None
226+
227+
228+
@pytest.mark.asyncio
229+
@mock.patch(
230+
"google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
231+
)
232+
async def test_close_with_finalize(mock_write_object_stream, mock_client):
233+
"""Test close with finalizing."""
234+
writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
235+
writer._is_stream_open = True
236+
writer.offset = 1024
237+
mock_stream = mock_write_object_stream.return_value
238+
mock_stream.close = mock.AsyncMock()
239+
writer.finalize = mock.AsyncMock()
240+
241+
await writer.close(finalize_on_close=True)
242+
243+
writer.finalize.assert_awaited_once()
244+
mock_stream.close.assert_awaited_once()
245+
assert not writer._is_stream_open
246+
assert writer.offset is None
247+
248+
249+
@pytest.mark.asyncio
250+
@mock.patch(
251+
"google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
252+
)
253+
async def test_finalize(mock_write_object_stream, mock_client):
254+
"""Test that finalize sends the correct request and updates state."""
255+
writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
256+
mock_stream = mock_write_object_stream.return_value
257+
mock_stream.send = mock.AsyncMock()
258+
mock_resource = _storage_v2.Object(name=OBJECT, bucket=BUCKET)
259+
mock_stream.recv = mock.AsyncMock(
260+
return_value=_storage_v2.BidiWriteObjectResponse(resource=mock_resource)
261+
)
262+
263+
await writer.finalize()
264+
265+
mock_stream.send.assert_awaited_once_with(
266+
_storage_v2.BidiWriteObjectRequest(finish_write=True)
267+
)
268+
mock_stream.recv.assert_awaited_once()
269+
assert writer.object_resource == mock_resource

0 commit comments

Comments
 (0)