Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
NOTE:
This is _experimental module for upcoming support for Rapid Storage.
(https://cloud.google.com/blog/products/storage-data-transfer/high-performance-storage-innovations-for-ai-hpc#:~:text=your%20AI%20workloads%3A-,Rapid%20Storage,-%3A%20A%20new)

APIs may not work as intended and are not stable yet. Feature is not
GA(Generally Available) yet, please contact your TAM (Technical Account Manager)
if you want to use these Rapid Storage APIs.

"""
from typing import Optional
from google.cloud.storage._experimental.asyncio.async_grpc_client import (
AsyncGrpcClient,
)
from google.cloud.storage._experimental.asyncio.async_write_object_stream import (
_AsyncWriteObjectStream,
)


class AsyncAppendableObjectWriter:
"""Class for appending data to a GCS Appendable Object asynchronously."""

def __init__(
self,
client: AsyncGrpcClient.grpc_client,
bucket_name: str,
object_name: str,
generation=None,
write_handle=None,
):
"""
Class for appending data to a GCS Appendable Object.

Example usage:

```

from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import AsyncAppendableObjectWriter
import asyncio

client = AsyncGrpcClient().grpc_client
bucket_name = "my-bucket"
object_name = "my-appendable-object"

# instantiate the writer
writer = AsyncAppendableObjectWriter(client, bucket_name, object_name)
# open the writer, (underlying gRPC bidi-stream will be opened)
await writer.open()

# append data, it can be called multiple times.
await writer.append(b"hello world")
await writer.append(b"some more data")

# optionally flush data to persist.
await writer.flush()

# close the gRPC stream.
# Please note closing the program will also close the stream,
# however it's recommended to close the stream if no more data to append
# to clean up gRPC connection (which means CPU/memory/network resources)
await writer.close()
```

:type client: :class:`~google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client`
:param client: async grpc client to use for making API requests.

:type bucket_name: str
:param bucket_name: The name of the GCS bucket containing the object.

:type object_name: str
:param object_name: The name of the GCS Appendable Object to be written.

:type generation: int
:param generation: (Optional) If present, selects a specific revision of
that object.
If None, a new object is created.
If None and Object already exists then it'll will be
overwritten.

:type write_handle: bytes
:param write_handle: (Optional) An existing handle for writing the object.
If provided, opening the bidi-gRPC connection will be faster.
"""
self.client = client
self.bucket_name = bucket_name
self.object_name = object_name
self.write_handle = write_handle
self.generation = generation

self.write_obj_stream = _AsyncWriteObjectStream(
client=self.client,
bucket_name=self.bucket_name,
object_name=self.object_name,
generation_number=self.generation,
write_handle=self.write_handle,
)
self._is_stream_open: bool = False
self.offset: Optional[int] = None
self.persisted_size: Optional[int] = None

async def state_lookup(self):
"""Returns the persisted_size."""
raise NotImplementedError("state_lookup is not implemented yet.")

async def open(self) -> None:
"""Opens the underlying bidi-gRPC stream."""
raise NotImplementedError("open is not implemented yet.")

async def append(self, data: bytes):
raise NotImplementedError("append is not implemented yet.")

async def flush(self) -> int:
"""Returns persisted_size"""
raise NotImplementedError("flush is not implemented yet.")

async def close(self, finalize_on_close=False) -> int:
"""Returns persisted_size"""
raise NotImplementedError("close is not implemented yet.")

async def finalize(self) -> int:
"""Returns persisted_size
Note: Once finalized no more data can be appended.
"""
raise NotImplementedError("finalize is not implemented yet.")

# helper methods.
async def append_from_string(self, data: str):
"""
str data will be encoded to bytes using utf-8 encoding calling

self.append(data.encode("utf-8"))
"""
raise NotImplementedError("append_from_string is not implemented yet.")

async def append_from_stream(self, stream_obj):
"""
At a time read a chunk of data (16MiB) from `stream_obj`
and call self.append(chunk)
"""
raise NotImplementedError("append_from_stream is not implemented yet.")

async def append_from_file(self, file_path: str):
"""Create a file object from `file_path` and call append_from_stream(file_obj)"""
raise NotImplementedError("append_from_file is not implemented yet.")
115 changes: 115 additions & 0 deletions tests/unit/asyncio/test_async_appendable_object_writer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest
from unittest import mock

from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
AsyncAppendableObjectWriter,
)

BUCKET = "test-bucket"
OBJECT = "test-object"
GENERATION = 123
WRITE_HANDLE = b"test-write-handle"


@pytest.fixture
def mock_client():
"""Mock the async gRPC client."""
return mock.AsyncMock()


@mock.patch(
"google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
)
def test_init(mock_write_object_stream, mock_client):
"""Test the constructor of AsyncAppendableObjectWriter."""
writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)

assert writer.client == mock_client
assert writer.bucket_name == BUCKET
assert writer.object_name == OBJECT
assert writer.generation is None
assert writer.write_handle is None
assert not writer._is_stream_open
assert writer.offset is None
assert writer.persisted_size is None

mock_write_object_stream.assert_called_once_with(
client=mock_client,
bucket_name=BUCKET,
object_name=OBJECT,
generation_number=None,
write_handle=None,
)
assert writer.write_obj_stream == mock_write_object_stream.return_value


@mock.patch(
"google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
)
def test_init_with_optional_args(mock_write_object_stream, mock_client):
"""Test the constructor with optional arguments."""
writer = AsyncAppendableObjectWriter(
mock_client,
BUCKET,
OBJECT,
generation=GENERATION,
write_handle=WRITE_HANDLE,
)

assert writer.generation == GENERATION
assert writer.write_handle == WRITE_HANDLE

mock_write_object_stream.assert_called_once_with(
client=mock_client,
bucket_name=BUCKET,
object_name=OBJECT,
generation_number=GENERATION,
write_handle=WRITE_HANDLE,
)


@pytest.mark.asyncio
async def test_unimplemented_methods_raise_error(mock_client):
"""Test that all currently unimplemented methods raise NotImplementedError."""
writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)

with pytest.raises(NotImplementedError):
await writer.state_lookup()

with pytest.raises(NotImplementedError):
await writer.open()

with pytest.raises(NotImplementedError):
await writer.append(b"data")

with pytest.raises(NotImplementedError):
await writer.flush()

with pytest.raises(NotImplementedError):
await writer.close()

with pytest.raises(NotImplementedError):
await writer.finalize()

with pytest.raises(NotImplementedError):
await writer.append_from_string("data")

with pytest.raises(NotImplementedError):
await writer.append_from_stream(mock.Mock())

with pytest.raises(NotImplementedError):
await writer.append_from_file("file.txt")