Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.
6 changes: 6 additions & 0 deletions .kokoro/presubmit/system-3.9.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,10 @@
env_vars: {
key: "NOX_SESSION"
value: "system-3.9"
}

# Credentials needed to test universe domain.
env_vars: {
key: "SECRET_MANAGER_KEYS"
value: "client-library-test-universe-domain-credential"
}
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ def __init__(
self._read_id_to_writable_buffer_dict = {}
self._read_id_to_download_ranges_id = {}
self._download_ranges_id_to_pending_read_ids = {}
self.persisted_size: Optional[int] = None # updated after opening the stream

async def open(self) -> None:
"""Opens the bidi-gRPC connection to read from the object.
Expand All @@ -206,6 +207,8 @@ async def open(self) -> None:
if self.generation_number is None:
self.generation_number = self.read_obj_str.generation_number
self.read_handle = self.read_obj_str.read_handle
if self.read_obj_str.persisted_size is not None:
self.persisted_size = self.read_obj_str.persisted_size
return

async def download_ranges(
Expand Down Expand Up @@ -339,6 +342,7 @@ async def close(self):
if not self._is_stream_open:
raise ValueError("Underlying bidi-gRPC stream is not open")
await self.read_obj_str.close()
self.read_obj_str = None
self._is_stream_open = False

@property
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,10 @@ def __init__(
self.rpc = self.client._client._transport._wrapped_methods[
self.client._client._transport.bidi_read_object
]
self.first_bidi_read_req = _storage_v2.BidiReadObjectRequest(
read_object_spec=_storage_v2.BidiReadObjectSpec(
bucket=self._full_bucket_name, object=object_name
),
)
self.metadata = (("x-goog-request-params", f"bucket={self._full_bucket_name}"),)
self.socket_like_rpc: Optional[AsyncBidiRpc] = None
self._is_stream_open: bool = False
self.persisted_size: Optional[int] = None

async def open(self) -> None:
"""Opens the bidi-gRPC connection to read from the object.
Expand All @@ -101,13 +97,25 @@ async def open(self) -> None:
"""
if self._is_stream_open:
raise ValueError("Stream is already open")
self.first_bidi_read_req = _storage_v2.BidiReadObjectRequest(
read_object_spec=_storage_v2.BidiReadObjectSpec(
bucket=self._full_bucket_name,
object=self.object_name,
read_handle=self.read_handle,
),
)
self.socket_like_rpc = AsyncBidiRpc(
self.rpc, initial_request=self.first_bidi_read_req, metadata=self.metadata
)
await self.socket_like_rpc.open() # this is actually 1 send
response = await self.socket_like_rpc.recv()
if self.generation_number is None:
self.generation_number = response.metadata.generation
# populated only in the first response of bidi-stream and when opened
# without using `read_handle`
if hasattr(response, "metadata") and response.metadata:
if self.generation_number is None:
self.generation_number = response.metadata.generation
# update persisted size
self.persisted_size = response.metadata.size

self.read_handle = response.read_handle

Expand Down
77 changes: 77 additions & 0 deletions tests/system/test_zonal.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# py standard imports
import asyncio
import os
import uuid
from io import BytesIO
Expand Down Expand Up @@ -27,6 +28,36 @@
_BYTES_TO_UPLOAD = b"dummy_bytes_to_write_read_and_delete_appendable_object"


async def write_one_appendable_object(
bucket_name: str,
object_name: str,
data: bytes,
) -> None:
"""Helper to write an appendable object."""
grpc_client = AsyncGrpcClient(attempt_direct_path=True).grpc_client
writer = AsyncAppendableObjectWriter(grpc_client, bucket_name, object_name)
await writer.open()
await writer.append(data)
await writer.close()


@pytest.fixture(scope="function")
def appendable_object(storage_client, blobs_to_delete):
"""Fixture to create and cleanup an appendable object."""
object_name = f"appendable_obj_for_mrd-{str(uuid.uuid4())[:4]}"
asyncio.run(
write_one_appendable_object(
_ZONAL_BUCKET,
object_name,
_BYTES_TO_UPLOAD,
)
)
Comment thread
chandra-siri marked this conversation as resolved.
yield object_name

# Clean up; use json client (i.e. `storage_client` fixture) to delete.
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))


@pytest.mark.asyncio
@pytest.mark.parametrize(
"attempt_direct_path",
Expand Down Expand Up @@ -58,6 +89,52 @@ async def test_basic_wrd(storage_client, blobs_to_delete, attempt_direct_path):
await mrd.download_ranges([(0, 0, buffer)])
await mrd.close()
assert buffer.getvalue() == _BYTES_TO_UPLOAD
assert mrd.persisted_size == len(_BYTES_TO_UPLOAD)

# Clean up; use json client (i.e. `storage_client` fixture) to delete.
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))


@pytest.mark.asyncio
async def test_read_unfinalized_appendable_object(storage_client, blobs_to_delete):
object_name = f"read_unfinalized_appendable_object-{str(uuid.uuid4())[:4]}"
grpc_client = AsyncGrpcClient(attempt_direct_path=True).grpc_client

writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name)
await writer.open()
await writer.append(_BYTES_TO_UPLOAD)
await writer.flush()

mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name)
buffer = BytesIO()
await mrd.open()
assert mrd.persisted_size == len(_BYTES_TO_UPLOAD)
# (0, 0) means read the whole object
await mrd.download_ranges([(0, 0, buffer)])
await mrd.close()
assert buffer.getvalue() == _BYTES_TO_UPLOAD

# Clean up; use json client (i.e. `storage_client` fixture) to delete.
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))


@pytest.mark.asyncio
async def test_mrd_open_with_read_handle(appendable_object):
grpc_client = AsyncGrpcClient(attempt_direct_path=True).grpc_client

mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, appendable_object)
await mrd.open()
read_handle = mrd.read_handle
await mrd.close()

# Open a new MRD using the `read_handle` obtained above
new_mrd = AsyncMultiRangeDownloader(
grpc_client, _ZONAL_BUCKET, appendable_object, read_handle=read_handle
)
await new_mrd.open()
# persisted_size not set when opened with read_handle
assert new_mrd.persisted_size is None
buffer = BytesIO()
await new_mrd.download_ranges([(0, 0, buffer)])
await new_mrd.close()
assert buffer.getvalue() == _BYTES_TO_UPLOAD
3 changes: 3 additions & 0 deletions tests/unit/asyncio/test_async_multi_range_downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

_TEST_BUCKET_NAME = "test-bucket"
_TEST_OBJECT_NAME = "test-object"
_TEST_OBJECT_SIZE = 1024 * 1024 # 1 MiB
_TEST_GENERATION_NUMBER = 123456789
_TEST_READ_HANDLE = b"test-handle"

Expand Down Expand Up @@ -57,6 +58,7 @@ async def _make_mock_mrd(
mock_stream = mock_cls_async_read_object_stream.return_value
mock_stream.open = AsyncMock()
mock_stream.generation_number = _TEST_GENERATION_NUMBER
mock_stream.persisted_size = _TEST_OBJECT_SIZE
mock_stream.read_handle = _TEST_READ_HANDLE

mrd = await AsyncMultiRangeDownloader.create_mrd(
Expand Down Expand Up @@ -106,6 +108,7 @@ async def test_create_mrd(
assert mrd.object_name == _TEST_OBJECT_NAME
assert mrd.generation_number == _TEST_GENERATION_NUMBER
assert mrd.read_handle == _TEST_READ_HANDLE
assert mrd.persisted_size == _TEST_OBJECT_SIZE
assert mrd.is_stream_open

@mock.patch(
Expand Down
61 changes: 54 additions & 7 deletions tests/unit/asyncio/test_async_read_object_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
_TEST_BUCKET_NAME = "test-bucket"
_TEST_OBJECT_NAME = "test-object"
_TEST_GENERATION_NUMBER = 12345
_TEST_OBJECT_SIZE = 1024 * 1024 # 1 MiB
_TEST_READ_HANDLE = b"test-read-handle"
_TEST_READ_HANDLE_NEW = b"test-read-handle-new"


async def instantiate_read_obj_stream(mock_client, mock_cls_async_bidi_rpc, open=True):
Expand All @@ -37,6 +39,7 @@ async def instantiate_read_obj_stream(mock_client, mock_cls_async_bidi_rpc, open
recv_response = mock.MagicMock(spec=_storage_v2.BidiReadObjectResponse)
recv_response.metadata = mock.MagicMock(spec=_storage_v2.Object)
recv_response.metadata.generation = _TEST_GENERATION_NUMBER
recv_response.metadata.size = _TEST_OBJECT_SIZE
recv_response.read_handle = _TEST_READ_HANDLE
socket_like_rpc.recv = AsyncMock(return_value=recv_response)

Expand All @@ -52,6 +55,30 @@ async def instantiate_read_obj_stream(mock_client, mock_cls_async_bidi_rpc, open
return read_obj_stream


async def instantiate_read_obj_stream_with_read_handle(
mock_client, mock_cls_async_bidi_rpc, open=True
):
"""Helper to create an instance of _AsyncReadObjectStream and open it by default."""
socket_like_rpc = AsyncMock()
mock_cls_async_bidi_rpc.return_value = socket_like_rpc
socket_like_rpc.open = AsyncMock()

recv_response = mock.MagicMock(spec=_storage_v2.BidiReadObjectResponse)
recv_response.read_handle = _TEST_READ_HANDLE_NEW
socket_like_rpc.recv = AsyncMock(return_value=recv_response)

read_obj_stream = _AsyncReadObjectStream(
client=mock_client,
bucket_name=_TEST_BUCKET_NAME,
object_name=_TEST_OBJECT_NAME,
)

if open:
await read_obj_stream.open()

return read_obj_stream


@mock.patch(
"google.cloud.storage._experimental.asyncio.async_read_object_stream.AsyncBidiRpc"
)
Expand All @@ -65,12 +92,6 @@ def test_init_with_bucket_object_generation(mock_client, mock_async_bidi_rpc):
mock_client._client._transport._wrapped_methods = {
"bidi_read_object_rpc": rpc_sentinel,
}
full_bucket_name = f"projects/_/buckets/{_TEST_BUCKET_NAME}"
first_bidi_read_req = _storage_v2.BidiReadObjectRequest(
read_object_spec=_storage_v2.BidiReadObjectSpec(
bucket=full_bucket_name, object=_TEST_OBJECT_NAME
),
)

# Act
read_obj_stream = _AsyncReadObjectStream(
Expand All @@ -86,7 +107,6 @@ def test_init_with_bucket_object_generation(mock_client, mock_async_bidi_rpc):
assert read_obj_stream.object_name == _TEST_OBJECT_NAME
assert read_obj_stream.generation_number == _TEST_GENERATION_NUMBER
assert read_obj_stream.read_handle == _TEST_READ_HANDLE
assert read_obj_stream.first_bidi_read_req == first_bidi_read_req
assert read_obj_stream.rpc == rpc_sentinel


Expand All @@ -112,6 +132,33 @@ async def test_open(mock_client, mock_cls_async_bidi_rpc):

assert read_obj_stream.generation_number == _TEST_GENERATION_NUMBER
assert read_obj_stream.read_handle == _TEST_READ_HANDLE
assert read_obj_stream.persisted_size == _TEST_OBJECT_SIZE
assert read_obj_stream.is_stream_open


@mock.patch(
"google.cloud.storage._experimental.asyncio.async_read_object_stream.AsyncBidiRpc"
)
@mock.patch(
"google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
)
@pytest.mark.asyncio
async def test_open_with_read_handle(mock_client, mock_cls_async_bidi_rpc):
# arrange
read_obj_stream = await instantiate_read_obj_stream_with_read_handle(
mock_client, mock_cls_async_bidi_rpc, open=False
)

# act
await read_obj_stream.open()

# assert
read_obj_stream.socket_like_rpc.open.assert_called_once()
read_obj_stream.socket_like_rpc.recv.assert_called_once()

assert read_obj_stream.generation_number is None
assert read_obj_stream.persisted_size is None
assert read_obj_stream.read_handle == _TEST_READ_HANDLE_NEW
assert read_obj_stream.is_stream_open


Expand Down