Skip to content
Merged
Show file tree
Hide file tree
Changes from 36 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
a38c396
feat(zb-experimental): add async write object stream
chandra-siri Nov 13, 2025
aaee2f3
remove unused import and add license info
chandra-siri Nov 13, 2025
4db8bf1
remove unwanted test
chandra-siri Nov 13, 2025
8dbd158
feat(zb-experimental): implement "open" for write_object_stream
chandra-siri Nov 14, 2025
03f1fde
remove unused import
chandra-siri Nov 14, 2025
f0d3439
feat(zb-experimental): implement close
chandra-siri Nov 14, 2025
e3b6f9e
feat(zb-experimental): implement send & recv
chandra-siri Nov 14, 2025
b24535f
feat(zb-experimental): Add Async_appendable_object_writer.py
chandra-siri Nov 14, 2025
eae1b36
feat(zb-experimental): implement state_lookup
chandra-siri Nov 14, 2025
788848a
implement tests for state_lookup
chandra-siri Nov 14, 2025
07c9b44
feat(zb-experimental): implement open in writer
chandra-siri Nov 14, 2025
3930823
add type annotation for state_lookup
chandra-siri Nov 14, 2025
222aef2
state_lookup should return persisted_size instead of proto
chandra-siri Nov 14, 2025
3eae403
Merge branch 'bidi-writes-6' of github.com:googleapis/python-storage …
chandra-siri Nov 14, 2025
79835fa
persisted size changes
chandra-siri Nov 14, 2025
cc5e12d
feat(zb-experimental): implement flush, close and finalize
chandra-siri Nov 16, 2025
57aa63e
Update doc strings
chandra-siri Nov 16, 2025
55e0e0d
feat(zb-experimental): implement append
chandra-siri Nov 16, 2025
55d495d
add test_append_data_two_times
chandra-siri Nov 16, 2025
6fa5e11
add doc string for AsyncAppendableObjectWriter
chandra-siri Nov 16, 2025
a8eba4f
Merge branch 'bidi-writes-5' of github.com:googleapis/python-storage …
chandra-siri Nov 16, 2025
d8859ee
Merge branch 'bidi-writes-6' of github.com:googleapis/python-storage …
chandra-siri Nov 16, 2025
c1cbd86
Merge branch 'bidi-writes-7' of github.com:googleapis/python-storage …
chandra-siri Nov 16, 2025
150556b
Merge branch 'bidi-writes-8' of github.com:googleapis/python-storage …
chandra-siri Nov 16, 2025
c4c79e6
correct comment size
chandra-siri Nov 16, 2025
96ff8dc
update the append logic to reduce copying data
chandra-siri Nov 17, 2025
244635f
Merge branch 'main' into bidi-writes-4
chandra-siri Nov 17, 2025
1981418
add missed-out test after merge conflict resolution
chandra-siri Nov 17, 2025
ebd5c10
Merge branch 'bidi-writes-4' of github.com:googleapis/python-storage …
chandra-siri Nov 17, 2025
2a22cf7
Merge branch 'bidi-writes-5' of github.com:googleapis/python-storage …
chandra-siri Nov 17, 2025
aee1feb
Merge branch 'bidi-writes-6' of github.com:googleapis/python-storage …
chandra-siri Nov 17, 2025
cdaa25f
Merge branch 'bidi-writes-7' of github.com:googleapis/python-storage …
chandra-siri Nov 17, 2025
2766c46
Merge branch 'bidi-writes-8' of github.com:googleapis/python-storage …
chandra-siri Nov 17, 2025
62c8fd7
remove local testing code
chandra-siri Nov 18, 2025
ad9e4b9
Merge branch 'main' of github.com:googleapis/python-storage into bidi…
chandra-siri Nov 19, 2025
de4b74d
Merge branch 'main' into bidi-writes-9
chandra-siri Nov 20, 2025
9ede745
address comments and improve doc string
chandra-siri Nov 20, 2025
a18bf87
raise ValueError if stream is not open
chandra-siri Nov 20, 2025
dc2df55
Merge branch 'main' into bidi-writes-9
chandra-siri Nov 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@
)


# Upper bound on the payload placed in a single BidiWriteObjectRequest by append().
_MAX_CHUNK_SIZE_BYTES = 2 * 1024 * 1024 # 2 MiB
# Number of bytes append() sends before it awaits flush().
# NOTE(review): the trailing comment used to read "16 MiB", but the expression
# evaluates to 150 MiB -- confirm which value was actually intended.
_MAX_BUFFER_SIZE_BYTES = 150 * 1024 * 1024 # 150 MiB


class AsyncAppendableObjectWriter:
"""Class for appending data to a GCS Appendable Object asynchronously."""

Expand Down Expand Up @@ -143,7 +147,35 @@ async def open(self) -> None:
_ = await self.state_lookup()

async def append(self, data: bytes):
    """Send ``data`` over the open write stream in bounded-size chunks.

    The payload is split into pieces of at most ``_MAX_CHUNK_SIZE_BYTES``
    bytes; each piece is sent as one ``BidiWriteObjectRequest`` whose
    ``write_offset`` is the writer's current ``offset``. After every piece
    the offset advances by the piece's length. Whenever the bytes sent
    since the last flush *within this call* reach
    ``_MAX_BUFFER_SIZE_BYTES``, ``flush()`` is awaited and the counter
    restarts from zero.

    Args:
        data: The bytes to append. An empty value is a no-op.

    Raises:
        ValueError: If the stream has not been opened.
    """
    if not self._is_stream_open:
        raise ValueError("Stream is not open. Call open() before append().")

    total_bytes = len(data)
    if total_bytes == 0:
        # TODO: add warning.
        return

    # First append on this writer: continue from the size already persisted.
    if self.offset is None:
        assert self.persisted_size is not None
        self.offset = self.persisted_size

    unflushed_bytes = 0
    for chunk_start in range(0, total_bytes, _MAX_CHUNK_SIZE_BYTES):
        # Slicing clamps at the end of ``data``, so the last chunk may be short.
        chunk = data[chunk_start : chunk_start + _MAX_CHUNK_SIZE_BYTES]
        request = _storage_v2.BidiWriteObjectRequest(
            write_offset=self.offset,
            checksummed_data=_storage_v2.ChecksummedData(content=chunk),
        )
        await self.write_obj_stream.send(request)

        sent = len(chunk)
        self.offset += sent
        unflushed_bytes += sent

        if unflushed_bytes >= _MAX_BUFFER_SIZE_BYTES:
            await self.flush()
            unflushed_bytes = 0

async def flush(self) -> int:
"""Flushes the data to the server.
Expand Down
141 changes: 138 additions & 3 deletions tests/unit/asyncio/test_async_appendable_object_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,6 @@ async def test_unimplemented_methods_raise_error(mock_client):
"""Test that all currently unimplemented methods raise NotImplementedError."""
writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)

with pytest.raises(NotImplementedError):
await writer.append(b"data")

with pytest.raises(NotImplementedError):
await writer.append_from_string("data")

Expand Down Expand Up @@ -267,3 +264,141 @@ async def test_finalize(mock_write_object_stream, mock_client):
)
mock_stream.recv.assert_awaited_once()
assert writer.object_resource == mock_resource


@pytest.mark.asyncio
@mock.patch(
    "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
)
async def test_append_raises_error_if_not_open(mock_write_object_stream, mock_client):
    """append() must reject a call made on a writer that was never opened."""
    unopened_writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)

    # open() is deliberately not called, so append() is expected to refuse.
    expecting_not_open = pytest.raises(ValueError, match="Stream is not open")
    with expecting_not_open:
        await unopened_writer.append(b"some data")


@pytest.mark.asyncio
@mock.patch(
    "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
)
async def test_append_with_empty_data(mock_write_object_stream, mock_client):
    """An empty payload must not cause anything to be sent on the stream."""
    writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
    send_mock = mock.AsyncMock()
    mock_write_object_stream.return_value.send = send_mock
    # Pretend the stream is open so the empty-data branch is the one exercised.
    writer._is_stream_open = True

    await writer.append(b"")

    send_mock.assert_not_awaited()


@pytest.mark.asyncio
@mock.patch(
    "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
)
async def test_append_sends_data_in_chunks(mock_write_object_stream, mock_client):
    """A payload one byte over the chunk limit is sent as two requests."""
    from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
        _MAX_CHUNK_SIZE_BYTES,
    )

    starting_size = 100
    writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
    writer._is_stream_open = True
    writer.persisted_size = starting_size
    send_mock = mock.AsyncMock()
    mock_write_object_stream.return_value.send = send_mock
    writer.flush = mock.AsyncMock()

    payload = b"a" * (_MAX_CHUNK_SIZE_BYTES + 1)
    await writer.append(payload)

    assert send_mock.await_count == 2
    # Each awaited call carries the request as its only positional argument.
    first_request, second_request = (
        awaited.args[0] for awaited in send_mock.await_args_list
    )

    # The first request starts at the persisted size and carries a full chunk.
    assert first_request.write_offset == starting_size
    assert len(first_request.checksummed_data.content) == _MAX_CHUNK_SIZE_BYTES

    # The second request continues right after it with the single leftover byte.
    assert second_request.write_offset == starting_size + _MAX_CHUNK_SIZE_BYTES
    assert len(second_request.checksummed_data.content) == 1

    assert writer.offset == starting_size + len(payload)
    writer.flush.assert_not_awaited()


@pytest.mark.asyncio
@mock.patch(
    "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
)
async def test_append_flushes_when_buffer_is_full(
    mock_write_object_stream, mock_client
):
    """Appending exactly one buffer's worth of bytes flushes exactly once."""
    from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
        _MAX_BUFFER_SIZE_BYTES,
    )

    writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
    writer._is_stream_open = True
    writer.persisted_size = 0
    mock_write_object_stream.return_value.send = mock.AsyncMock()
    # flush() is replaced so that only the number of invocations is observed.
    flush_mock = mock.AsyncMock()
    writer.flush = flush_mock

    payload = b"a" * _MAX_BUFFER_SIZE_BYTES
    await writer.append(payload)

    flush_mock.assert_awaited_once()


@pytest.mark.asyncio
@mock.patch(
    "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
)
async def test_append_handles_large_data(mock_write_object_stream, mock_client):
    """A payload just over two buffers in size triggers two flushes."""
    from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
        _MAX_BUFFER_SIZE_BYTES,
    )

    writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
    writer._is_stream_open = True
    writer.persisted_size = 0
    mock_write_object_stream.return_value.send = mock.AsyncMock()
    flush_mock = mock.AsyncMock()
    writer.flush = flush_mock

    # Two full buffers plus one trailing byte that never reaches the threshold.
    oversized_payload = b"a" * (_MAX_BUFFER_SIZE_BYTES * 2 + 1)
    await writer.append(oversized_payload)

    assert flush_mock.await_count == 2


@pytest.mark.asyncio
@mock.patch(
    "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
)
async def test_append_data_two_times(mock_write_object_stream, mock_client):
    """The writer's offset accumulates across consecutive append() calls."""
    from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
        _MAX_CHUNK_SIZE_BYTES,
    )

    writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
    writer._is_stream_open = True
    writer.persisted_size = 0
    mock_write_object_stream.return_value.send = mock.AsyncMock()
    writer.flush = mock.AsyncMock()

    # Both payloads are slightly larger than a single chunk.
    first_payload = b"a" * (_MAX_CHUNK_SIZE_BYTES + 10)
    await writer.append(first_payload)

    second_payload = b"b" * (_MAX_CHUNK_SIZE_BYTES + 20)
    await writer.append(second_payload)

    expected_offset = len(first_payload) + len(second_payload)
    assert writer.offset == expected_offset