Skip to content

Commit fdd0a50

Browse files
authored
feat(zb-experimental): Add Async_appendable_object_writer.py (#1616)
feat(zb-experimental): Add Async_appendable_object_writer.py
1 parent 6da1186 commit fdd0a50

File tree

2 files changed

+273
-0
lines changed

2 files changed

+273
-0
lines changed
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
"""
15+
NOTE:
16+
This is _experimental module for upcoming support for Rapid Storage.
17+
(https://cloud.google.com/blog/products/storage-data-transfer/high-performance-storage-innovations-for-ai-hpc#:~:text=your%20AI%20workloads%3A-,Rapid%20Storage,-%3A%20A%20new)
18+
19+
APIs may not work as intended and are not stable yet. Feature is not
20+
GA(Generally Available) yet, please contact your TAM (Technical Account Manager)
21+
if you want to use these Rapid Storage APIs.
22+
23+
"""
24+
from typing import Optional
25+
from google.cloud.storage._experimental.asyncio.async_grpc_client import (
26+
AsyncGrpcClient,
27+
)
28+
from google.cloud.storage._experimental.asyncio.async_write_object_stream import (
29+
_AsyncWriteObjectStream,
30+
)
31+
32+
33+
class AsyncAppendableObjectWriter:
34+
"""Class for appending data to a GCS Appendable Object asynchronously."""
35+
36+
def __init__(
37+
self,
38+
client: AsyncGrpcClient.grpc_client,
39+
bucket_name: str,
40+
object_name: str,
41+
generation=None,
42+
write_handle=None,
43+
):
44+
"""
45+
Class for appending data to a GCS Appendable Object.
46+
47+
Example usage:
48+
49+
```
50+
51+
from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
52+
from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import AsyncAppendableObjectWriter
53+
import asyncio
54+
55+
client = AsyncGrpcClient().grpc_client
56+
bucket_name = "my-bucket"
57+
object_name = "my-appendable-object"
58+
59+
# instantiate the writer
60+
writer = AsyncAppendableObjectWriter(client, bucket_name, object_name)
61+
# open the writer, (underlying gRPC bidi-stream will be opened)
62+
await writer.open()
63+
64+
# append data, it can be called multiple times.
65+
await writer.append(b"hello world")
66+
await writer.append(b"some more data")
67+
68+
# optionally flush data to persist.
69+
await writer.flush()
70+
71+
# close the gRPC stream.
72+
# Please note closing the program will also close the stream,
73+
# however it's recommended to close the stream if no more data to append
74+
# to clean up gRPC connection (which means CPU/memory/network resources)
75+
await writer.close()
76+
```
77+
78+
:type client: :class:`~google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client`
79+
:param client: async grpc client to use for making API requests.
80+
81+
:type bucket_name: str
82+
:param bucket_name: The name of the GCS bucket containing the object.
83+
84+
:type object_name: str
85+
:param object_name: The name of the GCS Appendable Object to be written.
86+
87+
:type generation: int
88+
:param generation: (Optional) If present, selects a specific revision of
89+
that object.
90+
If None, a new object is created.
91+
If None and Object already exists then it'll will be
92+
overwritten.
93+
94+
:type write_handle: bytes
95+
:param write_handle: (Optional) An existing handle for writing the object.
96+
If provided, opening the bidi-gRPC connection will be faster.
97+
"""
98+
self.client = client
99+
self.bucket_name = bucket_name
100+
self.object_name = object_name
101+
self.write_handle = write_handle
102+
self.generation = generation
103+
104+
self.write_obj_stream = _AsyncWriteObjectStream(
105+
client=self.client,
106+
bucket_name=self.bucket_name,
107+
object_name=self.object_name,
108+
generation_number=self.generation,
109+
write_handle=self.write_handle,
110+
)
111+
self._is_stream_open: bool = False
112+
self.offset: Optional[int] = None
113+
self.persisted_size: Optional[int] = None
114+
115+
async def state_lookup(self):
116+
"""Returns the persisted_size."""
117+
raise NotImplementedError("state_lookup is not implemented yet.")
118+
119+
async def open(self) -> None:
120+
"""Opens the underlying bidi-gRPC stream."""
121+
raise NotImplementedError("open is not implemented yet.")
122+
123+
async def append(self, data: bytes):
124+
raise NotImplementedError("append is not implemented yet.")
125+
126+
async def flush(self) -> int:
127+
"""Returns persisted_size"""
128+
raise NotImplementedError("flush is not implemented yet.")
129+
130+
async def close(self, finalize_on_close=False) -> int:
131+
"""Returns persisted_size"""
132+
raise NotImplementedError("close is not implemented yet.")
133+
134+
async def finalize(self) -> int:
135+
"""Returns persisted_size
136+
Note: Once finalized no more data can be appended.
137+
"""
138+
raise NotImplementedError("finalize is not implemented yet.")
139+
140+
# helper methods.
141+
async def append_from_string(self, data: str):
142+
"""
143+
str data will be encoded to bytes using utf-8 encoding calling
144+
145+
self.append(data.encode("utf-8"))
146+
"""
147+
raise NotImplementedError("append_from_string is not implemented yet.")
148+
149+
async def append_from_stream(self, stream_obj):
150+
"""
151+
At a time read a chunk of data (16MiB) from `stream_obj`
152+
and call self.append(chunk)
153+
"""
154+
raise NotImplementedError("append_from_stream is not implemented yet.")
155+
156+
async def append_from_file(self, file_path: str):
157+
"""Create a file object from `file_path` and call append_from_stream(file_obj)"""
158+
raise NotImplementedError("append_from_file is not implemented yet.")
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import pytest
16+
from unittest import mock
17+
18+
from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
19+
AsyncAppendableObjectWriter,
20+
)
21+
22+
BUCKET = "test-bucket"
23+
OBJECT = "test-object"
24+
GENERATION = 123
25+
WRITE_HANDLE = b"test-write-handle"
26+
27+
28+
@pytest.fixture
29+
def mock_client():
30+
"""Mock the async gRPC client."""
31+
return mock.AsyncMock()
32+
33+
34+
@mock.patch(
35+
"google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
36+
)
37+
def test_init(mock_write_object_stream, mock_client):
38+
"""Test the constructor of AsyncAppendableObjectWriter."""
39+
writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
40+
41+
assert writer.client == mock_client
42+
assert writer.bucket_name == BUCKET
43+
assert writer.object_name == OBJECT
44+
assert writer.generation is None
45+
assert writer.write_handle is None
46+
assert not writer._is_stream_open
47+
assert writer.offset is None
48+
assert writer.persisted_size is None
49+
50+
mock_write_object_stream.assert_called_once_with(
51+
client=mock_client,
52+
bucket_name=BUCKET,
53+
object_name=OBJECT,
54+
generation_number=None,
55+
write_handle=None,
56+
)
57+
assert writer.write_obj_stream == mock_write_object_stream.return_value
58+
59+
60+
@mock.patch(
61+
"google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
62+
)
63+
def test_init_with_optional_args(mock_write_object_stream, mock_client):
64+
"""Test the constructor with optional arguments."""
65+
writer = AsyncAppendableObjectWriter(
66+
mock_client,
67+
BUCKET,
68+
OBJECT,
69+
generation=GENERATION,
70+
write_handle=WRITE_HANDLE,
71+
)
72+
73+
assert writer.generation == GENERATION
74+
assert writer.write_handle == WRITE_HANDLE
75+
76+
mock_write_object_stream.assert_called_once_with(
77+
client=mock_client,
78+
bucket_name=BUCKET,
79+
object_name=OBJECT,
80+
generation_number=GENERATION,
81+
write_handle=WRITE_HANDLE,
82+
)
83+
84+
85+
@pytest.mark.asyncio
86+
async def test_unimplemented_methods_raise_error(mock_client):
87+
"""Test that all currently unimplemented methods raise NotImplementedError."""
88+
writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
89+
90+
with pytest.raises(NotImplementedError):
91+
await writer.state_lookup()
92+
93+
with pytest.raises(NotImplementedError):
94+
await writer.open()
95+
96+
with pytest.raises(NotImplementedError):
97+
await writer.append(b"data")
98+
99+
with pytest.raises(NotImplementedError):
100+
await writer.flush()
101+
102+
with pytest.raises(NotImplementedError):
103+
await writer.close()
104+
105+
with pytest.raises(NotImplementedError):
106+
await writer.finalize()
107+
108+
with pytest.raises(NotImplementedError):
109+
await writer.append_from_string("data")
110+
111+
with pytest.raises(NotImplementedError):
112+
await writer.append_from_stream(mock.Mock())
113+
114+
with pytest.raises(NotImplementedError):
115+
await writer.append_from_file("file.txt")

0 commit comments

Comments
 (0)