diff --git a/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py
index c808cb52a..6ec1e7315 100644
--- a/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py
+++ b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py
@@ -22,10 +22,19 @@
 """
+from __future__ import annotations
+
 from io import BufferedReader
-from typing import Optional, Union
+import asyncio
+import io
+import logging
+from typing import List, Optional, Tuple, Union
 
-from google_crc32c import Checksum
 from google.api_core import exceptions
+from google.api_core.retry_async import AsyncRetry
+from google.rpc import status_pb2
+from google.cloud._storage_v2.types import BidiWriteObjectRedirectedError
+from google.cloud._storage_v2.types.storage import BidiWriteObjectRequest
+
 from ._utils import raise_if_no_fast_crc32c
 from google.cloud import _storage_v2
@@ -35,10 +44,71 @@
 from google.cloud.storage._experimental.asyncio.async_write_object_stream import (
     _AsyncWriteObjectStream,
 )
+from google.cloud.storage._experimental.asyncio.retry.bidi_stream_retry_manager import (
+    _BidiStreamRetryManager,
+)
+from google.cloud.storage._experimental.asyncio.retry.writes_resumption_strategy import (
+    _WriteResumptionStrategy,
+    _WriteState,
+)
+from google.cloud.storage._experimental.asyncio.retry._helpers import (
+    _extract_bidi_writes_redirect_proto,
+)
 
 _MAX_CHUNK_SIZE_BYTES = 2 * 1024 * 1024  # 2 MiB
 _DEFAULT_FLUSH_INTERVAL_BYTES = 16 * 1024 * 1024  # 16 MiB
+_BIDI_WRITE_REDIRECTED_TYPE_URL = (
+    "type.googleapis.com/google.storage.v2.BidiWriteObjectRedirectedError"
+)
+logger = logging.getLogger(__name__)
+
+
+def _is_write_retryable(exc):
+    """Predicate to determine if a write operation should be retried."""
+
+    if isinstance(
+        exc,
+        (
+            exceptions.InternalServerError,
+            exceptions.ServiceUnavailable,
+            exceptions.DeadlineExceeded,
+            exceptions.TooManyRequests,
+            BidiWriteObjectRedirectedError,
+        ),
+    ):
+        logger.info(f"Retryable write exception encountered: {exc}")
+        return True
+
+    grpc_error = None
+    if isinstance(exc, exceptions.Aborted) and exc.errors:
+        grpc_error = exc.errors[0]
+    if isinstance(grpc_error, BidiWriteObjectRedirectedError):
+        return True
+    if grpc_error is None or not hasattr(grpc_error, "trailing_metadata"):
+        return False
+
+    trailers = grpc_error.trailing_metadata()
+    if not trailers:
+        return False
+
+    status_details_bin = None
+    for key, value in trailers:
+        if key == "grpc-status-details-bin":
+            status_details_bin = value
+            break
+
+    if status_details_bin:
+        status_proto = status_pb2.Status()
+        try:
+            status_proto.ParseFromString(status_details_bin)
+            for detail in status_proto.details:
+                if detail.type_url == _BIDI_WRITE_REDIRECTED_TYPE_URL:
+                    return True
+        except Exception:
+            logger.error("Error unpacking redirect details from gRPC error.")
+            return False
+    return False
 
 
 class AsyncAppendableObjectWriter:
@@ -114,13 +184,7 @@ def __init__(
         self.write_handle = write_handle
         self.generation = generation
 
-        self.write_obj_stream = _AsyncWriteObjectStream(
-            client=self.client,
-            bucket_name=self.bucket_name,
-            object_name=self.object_name,
-            generation_number=self.generation,
-            write_handle=self.write_handle,
-        )
+        self.write_obj_stream: Optional[_AsyncWriteObjectStream] = None
         self._is_stream_open: bool = False
         # `offset` is the latest size of the object without staleless.
         self.offset: Optional[int] = None
@@ -143,6 +207,9 @@ def __init__(
                 f"flush_interval must be a multiple of {_MAX_CHUNK_SIZE_BYTES}, but provided {self.flush_interval}"
             )
         self.bytes_appended_since_last_flush = 0
+        self._lock = asyncio.Lock()
+        self._routing_token: Optional[str] = None
+        self.object_resource: Optional[_storage_v2.Object] = None
 
     async def state_lookup(self) -> int:
         """Returns the persisted_size
@@ -156,16 +223,32 @@ async def state_lookup(self) -> int:
         if not self._is_stream_open:
             raise ValueError("Stream is not open. Call open() before state_lookup().")
 
-        await self.write_obj_stream.send(
-            _storage_v2.BidiWriteObjectRequest(
-                state_lookup=True,
+        async with self._lock:
+            await self.write_obj_stream.send(
+                _storage_v2.BidiWriteObjectRequest(
+                    state_lookup=True,
+                )
             )
-        )
-        response = await self.write_obj_stream.recv()
-        self.persisted_size = response.persisted_size
-        return self.persisted_size
-
-    async def open(self) -> None:
+            response = await self.write_obj_stream.recv()
+            self.persisted_size = response.persisted_size
+            return self.persisted_size
+
+    def _on_open_error(self, exc):
+        """Extracts routing token and write handle on redirect error during open."""
+        redirect_proto = _extract_bidi_writes_redirect_proto(exc)
+        if redirect_proto:
+            if redirect_proto.routing_token:
+                self._routing_token = redirect_proto.routing_token
+            if redirect_proto.write_handle:
+                self.write_handle = redirect_proto.write_handle
+            if redirect_proto.generation:
+                self.generation = redirect_proto.generation
+
+    async def open(
+        self,
+        retry_policy: Optional[AsyncRetry] = None,
+        metadata: Optional[List[Tuple[str, str]]] = None,
+    ) -> None:
         """Opens the underlying bidi-gRPC stream.
 
         :raises ValueError: If the stream is already open.
@@ -174,15 +257,77 @@ async def open(self) -> None:
         if self._is_stream_open:
             raise ValueError("Underlying bidi-gRPC stream is already open")
 
-        await self.write_obj_stream.open()
-        self._is_stream_open = True
-        if self.generation is None:
-            self.generation = self.write_obj_stream.generation_number
-        self.write_handle = self.write_obj_stream.write_handle
-        self.persisted_size = self.write_obj_stream.persisted_size
+        if retry_policy is None:
+            retry_policy = AsyncRetry(
+                predicate=_is_write_retryable, on_error=self._on_open_error
+            )
+        else:
+            original_on_error = retry_policy._on_error
+
+            def combined_on_error(exc):
+                self._on_open_error(exc)
+                if original_on_error:
+                    original_on_error(exc)
+
+            retry_policy = AsyncRetry(
+                predicate=_is_write_retryable,
+                initial=retry_policy._initial,
+                maximum=retry_policy._maximum,
+                multiplier=retry_policy._multiplier,
+                deadline=retry_policy._deadline,
+                on_error=combined_on_error,
+            )
+
+        async def _do_open():
+            current_metadata = list(metadata) if metadata else []
+
+            # Cleanup stream from previous failed attempt, if any.
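+            # A failed or redirected attempt can leave a half-open gRPC stream
+            # behind; it is torn down best-effort (close errors are swallowed)
+            # before a fresh stream is built below.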
+            if self.write_obj_stream:
+                if self._is_stream_open:
+                    try:
+                        await self.write_obj_stream.close()
+                    except Exception:  # ignore cleanup errors
+                        pass
+                self.write_obj_stream = None
+                self._is_stream_open = False
+
+            self.write_obj_stream = _AsyncWriteObjectStream(
+                client=self.client,
+                bucket_name=self.bucket_name,
+                object_name=self.object_name,
+                generation_number=self.generation,
+                write_handle=self.write_handle,
+                routing_token=self._routing_token,
+            )
+
+            if self._routing_token:
+                current_metadata.append(
+                    ("x-goog-request-params", f"routing_token={self._routing_token}")
+                )
+
+            await self.write_obj_stream.open(
+                metadata=current_metadata if current_metadata else None
+            )
+
+            if self.write_obj_stream.generation_number:
+                self.generation = self.write_obj_stream.generation_number
+            if self.write_obj_stream.write_handle:
+                self.write_handle = self.write_obj_stream.write_handle
+            if self.write_obj_stream.persisted_size is not None:
+                self.persisted_size = self.write_obj_stream.persisted_size
 
-    async def append(self, data: bytes) -> None:
-        """Appends data to the Appendable object.
+            self._is_stream_open = True
+            self._routing_token = None
+
+        await retry_policy(_do_open)()
+
+    async def append(
+        self,
+        data: bytes,
+        retry_policy: Optional[AsyncRetry] = None,
+        metadata: Optional[List[Tuple[str, str]]] = None,
+    ) -> None:
+        """Appends data to the Appendable object with automatic retries.
 
         calling `self.append` will append bytes at the end of the current size
         ie. `self.offset` bytes relative to the begining of the object.
@@ -195,55 +340,113 @@
 
         :type data: bytes
         :param data: The bytes to append to the object.
 
-        :rtype: None
+        :type retry_policy: :class:`~google.api_core.retry_async.AsyncRetry`
+        :param retry_policy: (Optional) The retry policy to use for the operation.
 
-        :raises ValueError: If the stream is not open (i.e., `open()` has not
-            been called).
-        """
+        :type metadata: List[Tuple[str, str]]
+        :param metadata: (Optional) The metadata to be sent with the request.
 
+        :raises ValueError: If the stream is not open.
+        """
         if not self._is_stream_open:
             raise ValueError("Stream is not open. Call open() before append().")
 
-        total_bytes = len(data)
-        if total_bytes == 0:
-            # TODO: add warning.
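+        # An empty payload is a no-op; returning early avoids sending a
+        # zero-length BidiWriteObjectRequest.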
+        if not data:
             return
 
-        if self.offset is None:
-            assert self.persisted_size is not None
-            self.offset = self.persisted_size
-
-        start_idx = 0
-        while start_idx < total_bytes:
-            end_idx = min(start_idx + _MAX_CHUNK_SIZE_BYTES, total_bytes)
-            data_chunk = data[start_idx:end_idx]
-            is_last_chunk = end_idx == total_bytes
-            chunk_size = end_idx - start_idx
-            await self.write_obj_stream.send(
-                _storage_v2.BidiWriteObjectRequest(
-                    write_offset=self.offset,
-                    checksummed_data=_storage_v2.ChecksummedData(
-                        content=data_chunk,
-                        crc32c=int.from_bytes(Checksum(data_chunk).digest(), "big"),
-                    ),
-                    state_lookup=is_last_chunk,
-                    flush=is_last_chunk
-                    or (
-                        self.bytes_appended_since_last_flush + chunk_size
-                        >= self.flush_interval
-                    ),
-                )
-            )
-            self.offset += chunk_size
-            self.bytes_appended_since_last_flush += chunk_size
-            if self.bytes_appended_since_last_flush >= self.flush_interval:
-                self.bytes_appended_since_last_flush = 0
+        if retry_policy is None:
+            retry_policy = AsyncRetry(predicate=_is_write_retryable)
+
+        strategy = _WriteResumptionStrategy()
+        buffer = io.BytesIO(data)
+        attempt_count = 0
+
+        def send_and_recv_generator(
+            requests: List[BidiWriteObjectRequest],
+            state: dict[str, _WriteState],
+            metadata: Optional[List[Tuple[str, str]]] = None,
+        ):
+            async def generator():
+                nonlocal attempt_count
+                nonlocal requests
+                attempt_count += 1
+                resp = None
+                async with self._lock:
+                    write_state = state["write_state"]
+                    # If this is a retry or redirect, we must re-open the stream
+                    if attempt_count > 1 or write_state.routing_token:
+                        logger.info(
+                            f"Re-opening the stream with attempt_count: {attempt_count}"
+                        )
+                        if (
+                            self.write_obj_stream
+                            and self.write_obj_stream.is_stream_open
+                        ):
+                            await self.write_obj_stream.close()
+
+                        current_metadata = list(metadata) if metadata else []
+                        if write_state.routing_token:
+                            current_metadata.append(
+                                (
+                                    "x-goog-request-params",
+                                    f"routing_token={write_state.routing_token}",
+                                )
+                            )
+                            self._routing_token = write_state.routing_token
+
+                        self._is_stream_open = False
+                        await self.open(metadata=current_metadata)
+
+                        write_state.persisted_size = self.persisted_size
+                        write_state.write_handle = self.write_handle
+                        write_state.routing_token = None
+
+                        write_state.user_buffer.seek(write_state.persisted_size)
+                        write_state.bytes_sent = write_state.persisted_size
+                        write_state.bytes_since_last_flush = 0
+
+                        requests = strategy.generate_requests(state)
+
+                    num_requests = len(requests)
+                    for i, chunk_req in enumerate(requests):
+                        if i == num_requests - 1:
+                            chunk_req.state_lookup = True
+                            chunk_req.flush = True
+                        await self.write_obj_stream.send(chunk_req)
+
+                    resp = await self.write_obj_stream.recv()
+                    if resp:
+                        if resp.persisted_size is not None:
+                            self.persisted_size = resp.persisted_size
+                            state["write_state"].persisted_size = resp.persisted_size
+                            self.offset = self.persisted_size
+                        if resp.write_handle:
+                            self.write_handle = resp.write_handle
+                            state["write_state"].write_handle = resp.write_handle
+                        self.bytes_appended_since_last_flush = 0
+
+                yield resp
+
+            return generator()
+
+        # State initialization
+        write_state = _WriteState(_MAX_CHUNK_SIZE_BYTES, buffer, self.flush_interval)
+        write_state.write_handle = self.write_handle
+        write_state.persisted_size = self.persisted_size
+        write_state.bytes_sent = self.persisted_size
+        write_state.bytes_since_last_flush = self.bytes_appended_since_last_flush
+
+        retry_manager = _BidiStreamRetryManager(
+            _WriteResumptionStrategy(),
+            lambda r, s: send_and_recv_generator(r, s, metadata),
+        )
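+        # execute() is expected to drive the retry loop here: it consumes the
+        # generator above and, on a retryable failure, invokes the strategy's
+        # recover_state_on_failure (which rewinds the buffer to persisted_size)
+        # before regenerating and replaying the remaining requests.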
+        await retry_manager.execute({"write_state": write_state}, retry_policy)
 
-            if is_last_chunk:
-                response = await self.write_obj_stream.recv()
-                self.persisted_size = response.persisted_size
-                self.offset = self.persisted_size
-                self.bytes_appended_since_last_flush = 0
-            start_idx = end_idx
+        # Sync local markers
+        self.write_obj_stream.persisted_size = write_state.persisted_size
+        self.write_obj_stream.write_handle = write_state.write_handle
+        self.bytes_appended_since_last_flush = write_state.bytes_since_last_flush
+        self.persisted_size = write_state.persisted_size
+        self.offset = write_state.persisted_size
 
     async def simple_flush(self) -> None:
         """Flushes the data to the server.
@@ -257,11 +460,13 @@ async def simple_flush(self) -> None:
         if not self._is_stream_open:
             raise ValueError("Stream is not open. Call open() before simple_flush().")
 
-        await self.write_obj_stream.send(
-            _storage_v2.BidiWriteObjectRequest(
-                flush=True,
+        async with self._lock:
+            await self.write_obj_stream.send(
+                _storage_v2.BidiWriteObjectRequest(
+                    flush=True,
+                )
             )
-        )
+        self.bytes_appended_since_last_flush = 0
 
     async def flush(self) -> int:
         """Flushes the data to the server.
@@ -275,16 +480,18 @@ async def flush(self) -> int:
         if not self._is_stream_open:
             raise ValueError("Stream is not open. Call open() before flush().")
 
-        await self.write_obj_stream.send(
-            _storage_v2.BidiWriteObjectRequest(
-                flush=True,
-                state_lookup=True,
+        async with self._lock:
+            await self.write_obj_stream.send(
+                _storage_v2.BidiWriteObjectRequest(
+                    flush=True,
+                    state_lookup=True,
+                )
             )
-        )
-        response = await self.write_obj_stream.recv()
-        self.persisted_size = response.persisted_size
-        self.offset = self.persisted_size
-        return self.persisted_size
+            response = await self.write_obj_stream.recv()
+            self.persisted_size = response.persisted_size
+            self.offset = self.persisted_size
+            self.bytes_appended_since_last_flush = 0
+            return self.persisted_size
 
     async def close(self, finalize_on_close=False) -> Union[int, _storage_v2.Object]:
         """Closes the underlying bidi-gRPC stream.
diff --git a/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py
index 183a8eeb1..256911073 100644
--- a/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py
+++ b/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py
@@ -21,7 +21,7 @@
 if you want to use these Rapid Storage APIs.
""" -from typing import Optional +from typing import List, Optional, Tuple from google.cloud import _storage_v2 from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient from google.cloud.storage._experimental.asyncio.async_abstract_object_stream import ( @@ -62,6 +62,7 @@ def __init__( object_name: str, generation_number: Optional[int] = None, # None means new object write_handle: Optional[bytes] = None, + routing_token: Optional[str] = None, ) -> None: if client is None: raise ValueError("client must be provided") @@ -77,6 +78,7 @@ def __init__( ) self.client: AsyncGrpcClient.grpc_client = client self.write_handle: Optional[bytes] = write_handle + self.routing_token: Optional[str] = routing_token self._full_bucket_name = f"projects/_/buckets/{self.bucket_name}" @@ -91,13 +93,15 @@ def __init__( self.persisted_size = 0 self.object_resource: Optional[_storage_v2.Object] = None - async def open(self) -> None: + async def open(self, metadata: Optional[List[Tuple[str, str]]] = None) -> None: """Opening an object for write , should do it's state lookup to know what's the persisted size is. """ if self._is_stream_open: raise ValueError("Stream is already open") + write_handle = self.write_handle if self.write_handle else None + # Create a new object or overwrite existing one if generation_number # is None. This makes it consistent with GCS JSON API behavior. # Created object type would be Appendable Object. @@ -116,37 +120,47 @@ async def open(self) -> None: bucket=self._full_bucket_name, object=self.object_name, generation=self.generation_number, + write_handle=write_handle, + routing_token=self.routing_token if self.routing_token else None, ), ) + request_params = [f"bucket={self._full_bucket_name}"] + other_metadata = [] + if metadata: + for key, value in metadata: + if key == "x-goog-request-params": + request_params.append(value) + else: + other_metadata.append((key, value)) + + current_metadata = other_metadata + current_metadata.append(("x-goog-request-params", ",".join(request_params))) + self.socket_like_rpc = AsyncBidiRpc( - self.rpc, initial_request=self.first_bidi_write_req, metadata=self.metadata + self.rpc, + initial_request=self.first_bidi_write_req, + metadata=current_metadata, ) await self.socket_like_rpc.open() # this is actually 1 send response = await self.socket_like_rpc.recv() self._is_stream_open = True - if not response.resource: - raise ValueError( - "Failed to obtain object resource after opening the stream" - ) - if not response.resource.generation: - raise ValueError( - "Failed to obtain object generation after opening the stream" - ) + if response.persisted_size: + self.persisted_size = response.persisted_size - if not response.write_handle: - raise ValueError("Failed to obtain write_handle after opening the stream") + if response.resource: + if not response.resource.size: + # Appending to a 0 byte appendable object. + self.persisted_size = 0 + else: + self.persisted_size = response.resource.size - if not response.resource.size: - # Appending to a 0 byte appendable object. 
-            self.persisted_size = 0
-        else:
-            self.persisted_size = response.resource.size
+                self.persisted_size = 0
+            else:
+                self.persisted_size = response.resource.size
+
+            self.generation_number = response.resource.generation
 
-        self.generation_number = response.resource.generation
-        self.write_handle = response.write_handle
+        if response.write_handle:
+            self.write_handle = response.write_handle
 
     async def close(self) -> None:
         """Closes the bidi-gRPC connection."""
@@ -181,7 +195,16 @@ async def recv(self) -> _storage_v2.BidiWriteObjectResponse:
         """
         if not self._is_stream_open:
             raise ValueError("Stream is not open")
-        return await self.socket_like_rpc.recv()
+        response = await self.socket_like_rpc.recv()
+        # Update write_handle if present in response
+        if response:
+            if response.write_handle:
+                self.write_handle = response.write_handle
+            if response.persisted_size is not None:
+                self.persisted_size = response.persisted_size
+            if response.resource and response.resource.size:
+                self.persisted_size = response.resource.size
+        return response
 
     @property
     def is_stream_open(self) -> bool:
diff --git a/google/cloud/storage/_experimental/asyncio/retry/_helpers.py b/google/cloud/storage/_experimental/asyncio/retry/_helpers.py
index 627bf5944..d9ad2462e 100644
--- a/google/cloud/storage/_experimental/asyncio/retry/_helpers.py
+++ b/google/cloud/storage/_experimental/asyncio/retry/_helpers.py
@@ -18,12 +18,19 @@
 from typing import Tuple, Optional
 
 from google.api_core import exceptions
-from google.cloud._storage_v2.types import BidiReadObjectRedirectedError
+from google.cloud._storage_v2.types import (
+    BidiReadObjectRedirectedError,
+    BidiWriteObjectRedirectedError,
+)
 from google.rpc import status_pb2
 
 _BIDI_READ_REDIRECTED_TYPE_URL = (
     "type.googleapis.com/google.storage.v2.BidiReadObjectRedirectedError"
)
+_BIDI_WRITE_REDIRECTED_TYPE_URL = (
+    "type.googleapis.com/google.storage.v2.BidiWriteObjectRedirectedError"
+)
+logger = logging.getLogger(__name__)
 
 
 def _handle_redirect(
@@ -78,6 +85,41 @@ def _handle_redirect(
                         read_handle = redirect_proto.read_handle
                     break
         except Exception as e:
-            logging.ERROR(f"Error unpacking redirect: {e}")
+            logger.error(f"Error unpacking redirect: {e}")
 
     return routing_token, read_handle
+
+
+def _extract_bidi_writes_redirect_proto(exc: Exception):
+    """Returns the BidiWriteObjectRedirectedError carried by ``exc``, if any."""
+    grpc_error = None
+    if isinstance(exc, exceptions.Aborted) and exc.errors:
+        grpc_error = exc.errors[0]
+
+    if grpc_error:
+        if isinstance(grpc_error, BidiWriteObjectRedirectedError):
+            return grpc_error
+
+        if hasattr(grpc_error, "trailing_metadata"):
+            trailers = grpc_error.trailing_metadata()
+            if not trailers:
+                return None
+
+            status_details_bin = None
+            for key, value in trailers:
+                if key == "grpc-status-details-bin":
+                    status_details_bin = value
+                    break
+
+            if status_details_bin:
+                status_proto = status_pb2.Status()
+                try:
+                    status_proto.ParseFromString(status_details_bin)
+                    for detail in status_proto.details:
+                        if detail.type_url == _BIDI_WRITE_REDIRECTED_TYPE_URL:
+                            redirect_proto = BidiWriteObjectRedirectedError.deserialize(
+                                detail.value
+                            )
+                            return redirect_proto
+                except Exception:
+                    logger.error("Error unpacking redirect details from gRPC error.")
+    return None
diff --git a/google/cloud/storage/_experimental/asyncio/retry/writes_resumption_strategy.py b/google/cloud/storage/_experimental/asyncio/retry/writes_resumption_strategy.py
index c6ae36339..bef72ce64 100644
--- a/google/cloud/storage/_experimental/asyncio/retry/writes_resumption_strategy.py
+++ b/google/cloud/storage/_experimental/asyncio/retry/writes_resumption_strategy.py
@@ -12,40 +12,50 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from typing import Any, Dict, IO, Iterable, Optional, Union
+from typing import Any, Dict, IO, List, Optional, Union
 
 import google_crc32c
+from google.api_core import exceptions
 from google.cloud._storage_v2.types import storage as storage_type
 from google.cloud._storage_v2.types.storage import BidiWriteObjectRedirectedError
 from google.cloud.storage._experimental.asyncio.retry.base_strategy import (
     _BaseResumptionStrategy,
 )
+from google.cloud.storage._experimental.asyncio.retry._helpers import (
+    _extract_bidi_writes_redirect_proto,
+)
+
+
+_BIDI_WRITE_REDIRECTED_TYPE_URL = (
+    "type.googleapis.com/google.storage.v2.BidiWriteObjectRedirectedError"
+)
 
 
 class _WriteState:
     """A helper class to track the state of a single upload operation.
 
-    :type spec: :class:`google.cloud.storage_v2.types.AppendObjectSpec`
-    :param spec: The specification for the object to write.
-
     :type chunk_size: int
     :param chunk_size: The size of chunks to write to the server.
 
     :type user_buffer: IO[bytes]
     :param user_buffer: The data source.
+
+    :type flush_interval: Optional[int]
+    :param flush_interval: (Optional) Number of appended bytes after which a
+        flush is requested.
     """
 
     def __init__(
         self,
-        spec: Union[storage_type.AppendObjectSpec, storage_type.WriteObjectSpec],
         chunk_size: int,
         user_buffer: IO[bytes],
+        flush_interval: Optional[int] = None,
     ):
-        self.spec = spec
         self.chunk_size = chunk_size
         self.user_buffer = user_buffer
         self.persisted_size: int = 0
         self.bytes_sent: int = 0
+        self.bytes_since_last_flush: int = 0
+        self.flush_interval: Optional[int] = flush_interval
         self.write_handle: Union[bytes, storage_type.BidiWriteHandle, None] = None
         self.routing_token: Optional[str] = None
         self.is_finalized: bool = False
@@ -56,31 +66,14 @@ class _WriteResumptionStrategy(_BaseResumptionStrategy):
 
     def generate_requests(
         self, state: Dict[str, Any]
-    ) -> Iterable[storage_type.BidiWriteObjectRequest]:
+    ) -> List[storage_type.BidiWriteObjectRequest]:
         """Generates BidiWriteObjectRequests to resume or continue the upload.
 
-        For Appendable Objects, every stream opening should send an
-        AppendObjectSpec. If resuming, the `write_handle` is added to that spec.
         This method is not applicable for `open` methods.
         """
         write_state: _WriteState = state["write_state"]
-        initial_request = storage_type.BidiWriteObjectRequest()
-
-        # Determine if we need to send WriteObjectSpec or AppendObjectSpec
-        if isinstance(write_state.spec, storage_type.WriteObjectSpec):
-            initial_request.write_object_spec = write_state.spec
-        else:
-            if write_state.write_handle:
-                write_state.spec.write_handle = write_state.write_handle
-
-            if write_state.routing_token:
-                write_state.spec.routing_token = write_state.routing_token
-            initial_request.append_object_spec = write_state.spec
-
-        yield initial_request
-
+        requests = []
         # The buffer should already be seeked to the correct position (persisted_size)
         # by the `recover_state_on_failure` method before this is called.
         while not write_state.is_finalized:
@@ -88,7 +81,7 @@
 
             # End of File detection
             if not chunk:
-                return
+                break
 
             checksummed_data = storage_type.ChecksummedData(content=chunk)
             checksum = google_crc32c.Checksum(chunk)
@@ -98,16 +91,28 @@
                 write_offset=write_state.bytes_sent,
                 checksummed_data=checksummed_data,
             )
-            write_state.bytes_sent += len(chunk)
+            chunk_len = len(chunk)
+            write_state.bytes_sent += chunk_len
+            write_state.bytes_since_last_flush += chunk_len
 
-            yield request
+            if (
+                write_state.flush_interval
+                and write_state.bytes_since_last_flush >= write_state.flush_interval
+            ):
+                request.flush = True
+                # reset counter after marking flush
+                write_state.bytes_since_last_flush = 0
+
+            requests.append(request)
+        return requests
 
     def update_state_from_response(
         self, response: storage_type.BidiWriteObjectResponse, state: Dict[str, Any]
     ) -> None:
         """Processes a server response and updates the write state."""
         write_state: _WriteState = state["write_state"]
-
+        if response is None:
+            return
         if response.persisted_size:
             write_state.persisted_size = response.persisted_size
 
@@ -129,18 +134,23 @@
         last confirmed 'persisted_size' from the server.
         """
         write_state: _WriteState = state["write_state"]
-        cause = getattr(error, "cause", error)
+        redirect_proto = None
+
+        if isinstance(error, BidiWriteObjectRedirectedError):
+            redirect_proto = error
+        else:
+            redirect_proto = _extract_bidi_writes_redirect_proto(error)
 
-        # Extract routing token and potentially a new write handle for redirection.
-        if isinstance(cause, BidiWriteObjectRedirectedError):
-            if cause.routing_token:
-                write_state.routing_token = cause.routing_token
-
-            redirect_handle = getattr(cause, "write_handle", None)
-            if redirect_handle:
-                write_state.write_handle = redirect_handle
+        # Extract routing token and potentially a new write handle for redirection.
+        if redirect_proto:
+            if redirect_proto.routing_token:
+                write_state.routing_token = redirect_proto.routing_token
+            if redirect_proto.write_handle:
+                write_state.write_handle = redirect_proto.write_handle
 
         # We must assume any data sent beyond 'persisted_size' was lost.
         # Reset the user buffer to the last known good byte confirmed by the server.
         write_state.user_buffer.seek(write_state.persisted_size)
         write_state.bytes_sent = write_state.persisted_size
+        write_state.bytes_since_last_flush = 0
diff --git a/tests/conformance/test_bidi_writes.py b/tests/conformance/test_bidi_writes.py
new file mode 100644
index 000000000..90dfaf5f8
--- /dev/null
+++ b/tests/conformance/test_bidi_writes.py
@@ -0,0 +1,267 @@
+import asyncio
+import uuid
+import grpc
+import requests
+
+from google.api_core import exceptions
+from google.auth import credentials as auth_credentials
+from google.cloud import _storage_v2 as storage_v2
+
+from google.api_core.retry_async import AsyncRetry
+from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
+    AsyncAppendableObjectWriter,
+)
+
+# --- Configuration ---
+PROJECT_NUMBER = "12345"  # A dummy project number is fine for the testbench.
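+# The endpoints below assume a locally running storage-testbench (gRPC on
+# 8888, HTTP control plane on 9000). A minimal happy-path lifecycle for the
+# writer under test looks like this sketch:
+#
+#     writer = AsyncAppendableObjectWriter(gapic_client, bucket, object_name)
+#     await writer.open()
+#     await writer.append(b"payload")
+#     await writer.close(finalize_on_close=True)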
+GRPC_ENDPOINT = "localhost:8888"
+HTTP_ENDPOINT = "http://localhost:9000"
+CONTENT = b"A" * 1024 * 10  # 10 KB
+
+
+def _is_retryable(exc):
+    return isinstance(
+        exc,
+        (
+            exceptions.InternalServerError,
+            exceptions.ServiceUnavailable,
+            exceptions.DeadlineExceeded,
+            exceptions.TooManyRequests,
+            exceptions.Aborted,  # For Redirects
+        ),
+    )
+
+
+async def run_test_scenario(
+    gapic_client,
+    http_client,
+    bucket_name,
+    object_name,
+    scenario,
+):
+    """Runs a single fault-injection test scenario."""
+    print(f"\n--- RUNNING SCENARIO: {scenario['name']} ---")
+    retry_count = 0
+
+    def on_retry_error(exc):
+        nonlocal retry_count
+        retry_count += 1
+        print(f"Retry attempt {retry_count} triggered by: {type(exc).__name__}")
+
+    custom_retry = AsyncRetry(
+        predicate=_is_retryable,
+        on_error=on_retry_error,
+        initial=0.1,  # Short backoff for fast tests
+        multiplier=1.0,
+    )
+
+    use_default = scenario.get("use_default_policy", False)
+    policy_to_pass = None if use_default else custom_retry
+
+    retry_test_id = None
+    try:
+        # 1. Create a Retry Test resource on the testbench.
+        retry_test_config = {
+            "instructions": {scenario["method"]: [scenario["instruction"]]},
+            "transport": "GRPC",
+        }
+        resp = http_client.post(f"{HTTP_ENDPOINT}/retry_test", json=retry_test_config)
+        resp.raise_for_status()
+        retry_test_id = resp.json()["id"]
+
+        # 2. Set up writer and metadata for fault injection.
+        writer = AsyncAppendableObjectWriter(
+            gapic_client,
+            bucket_name,
+            object_name,
+        )
+        fault_injection_metadata = (("x-retry-test-id", retry_test_id),)
+
+        # 3. Execute the write and assert the outcome.
+        try:
+            await writer.open(
+                metadata=fault_injection_metadata, retry_policy=policy_to_pass
+            )
+            await writer.append(
+                CONTENT, metadata=fault_injection_metadata, retry_policy=policy_to_pass
+            )
+            # await writer.finalize()
+            await writer.close(finalize_on_close=True)
+
+            # If an exception was expected, this line should not be reached.
+            if scenario["expected_error"] is not None:
+                raise AssertionError(
+                    f"Expected exception {scenario['expected_error']} was not raised."
+                )
+
+            # 4. Verify the object content.
+            read_request = storage_v2.ReadObjectRequest(
+                bucket=f"projects/_/buckets/{bucket_name}",
+                object=object_name,
+            )
+            read_stream = await gapic_client.read_object(request=read_request)
+            data = b""
+            async for chunk in read_stream:
+                data += chunk.checksummed_data.content
+            assert data == CONTENT
+            if scenario["expected_error"] is None:
+                # Scenarios like 503, 500, smarter resumption, and redirects
+                # SHOULD trigger at least one retry attempt.
+                if not use_default:
+                    assert (
+                        retry_count > 0
+                    ), f"Test passed but no retry was actually triggered for {scenario['name']}!"
+                else:
+                    print("Successfully recovered using library's default policy.")
+            print(f"Success: {scenario['name']}")
+
+        except Exception as e:
+            if scenario["expected_error"] is None or not isinstance(
+                e, scenario["expected_error"]
+            ):
+                raise
+
+            if not use_default:
+                assert (
+                    retry_count == 0
+                ), f"Retry was incorrectly triggered for non-retriable error in {scenario['name']}!"
+            print(f"Success: caught expected exception for {scenario['name']}: {e}")
+
+    finally:
+        # 5. Clean up the Retry Test resource.
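+        # The testbench scopes injected faults to the x-retry-test-id header,
+        # so deleting the resource keeps later scenarios isolated.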
+        if retry_test_id:
+            http_client.delete(f"{HTTP_ENDPOINT}/retry_test/{retry_test_id}")
+
+
+async def main():
+    """Main function to set up resources and run all test scenarios."""
+    channel = grpc.aio.insecure_channel(GRPC_ENDPOINT)
+    creds = auth_credentials.AnonymousCredentials()
+    transport = storage_v2.services.storage.transports.StorageGrpcAsyncIOTransport(
+        channel=channel,
+        credentials=creds,
+    )
+    gapic_client = storage_v2.StorageAsyncClient(transport=transport)
+    http_client = requests.Session()
+
+    bucket_name = f"grpc-test-bucket-{uuid.uuid4().hex[:8]}"
+    object_name_prefix = "retry-test-object-"
+
+    # Define all test scenarios
+    test_scenarios = [
+        {
+            "name": "Retry on Service Unavailable (503)",
+            "method": "storage.objects.insert",
+            "instruction": "return-503",
+            "expected_error": None,
+        },
+        {
+            "name": "Retry on 500",
+            "method": "storage.objects.insert",
+            "instruction": "return-500",
+            "expected_error": None,
+        },
+        {
+            "name": "Retry on 504",
+            "method": "storage.objects.insert",
+            "instruction": "return-504",
+            "expected_error": None,
+        },
+        {
+            "name": "Retry on 429",
+            "method": "storage.objects.insert",
+            "instruction": "return-429",
+            "expected_error": None,
+        },
+        {
+            "name": "Smarter Resumption: Retry 503 after partial data",
+            "method": "storage.objects.insert",
+            "instruction": "return-503-after-2K",
+            "expected_error": None,
+        },
+        {
+            "name": "Retry on BidiWriteObjectRedirectedError",
+            "method": "storage.objects.insert",
+            "instruction": "redirect-send-handle-and-token-tokenval",
+            "expected_error": None,
+        },
+        {
+            "name": "Fail on 401",
+            "method": "storage.objects.insert",
+            "instruction": "return-401",
+            "expected_error": exceptions.Unauthorized,
+        },
+        {
+            "name": "Default Policy: Retry on 503",
+            "method": "storage.objects.insert",
+            "instruction": "return-503",
+            "expected_error": None,
+            "use_default_policy": True,
+        },
+        {
+            "name": "Default Policy: Retry on 500",
+            "method": "storage.objects.insert",
+            "instruction": "return-500",
+            "expected_error": None,
+            "use_default_policy": True,
+        },
+        {
+            "name": "Default Policy: Retry on BidiWriteObjectRedirectedError",
+            "method": "storage.objects.insert",
+            "instruction": "redirect-send-handle-and-token-tokenval",
+            "expected_error": None,
+            "use_default_policy": True,
+        },
+        {
+            "name": "Default Policy: Smarter Resumption",
+            "method": "storage.objects.insert",
+            "instruction": "return-503-after-2K",
+            "expected_error": None,
+            "use_default_policy": True,
+        },
+    ]
+
+    try:
+        bucket_resource = storage_v2.Bucket(project=f"projects/{PROJECT_NUMBER}")
+        create_bucket_request = storage_v2.CreateBucketRequest(
+            parent="projects/_", bucket_id=bucket_name, bucket=bucket_resource
+        )
+        await gapic_client.create_bucket(request=create_bucket_request)
+
+        for i, scenario in enumerate(test_scenarios):
+            object_name = f"{object_name_prefix}{i}"
+            await run_test_scenario(
+                gapic_client,
+                http_client,
+                bucket_name,
+                object_name,
+                scenario,
+            )
+
+    except Exception:
+        import traceback
+
+        traceback.print_exc()
+    finally:
+        # Clean up the test bucket.
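+        # Objects are deleted before the bucket because GCS refuses to delete
+        # a non-empty bucket.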
+        try:
+            list_objects_req = storage_v2.ListObjectsRequest(
+                parent=f"projects/_/buckets/{bucket_name}",
+            )
+            list_objects_res = await gapic_client.list_objects(request=list_objects_req)
+            async for obj in list_objects_res:
+                delete_object_req = storage_v2.DeleteObjectRequest(
+                    bucket=f"projects/_/buckets/{bucket_name}", object=obj.name
+                )
+                await gapic_client.delete_object(request=delete_object_req)
+
+            delete_bucket_req = storage_v2.DeleteBucketRequest(
+                name=f"projects/_/buckets/{bucket_name}"
+            )
+            await gapic_client.delete_bucket(request=delete_bucket_req)
+        except Exception as e:
+            print(f"Warning: Cleanup failed: {e}")
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/tests/unit/asyncio/retry/test_writes_resumption_strategy.py b/tests/unit/asyncio/retry/test_writes_resumption_strategy.py
index 7d8b7934e..138035e3b 100644
--- a/tests/unit/asyncio/retry/test_writes_resumption_strategy.py
+++ b/tests/unit/asyncio/retry/test_writes_resumption_strategy.py
@@ -13,12 +13,13 @@
 # limitations under the License.
 
 import io
-import unittest
 import unittest.mock as mock
 from datetime import datetime
 
 import pytest
 import google_crc32c
+from google.rpc import status_pb2
+from google.api_core import exceptions
 
 from google.cloud._storage_v2.types import storage as storage_type
 from google.cloud.storage._experimental.asyncio.retry.writes_resumption_strategy import (
@@ -28,136 +29,174 @@
 from google.cloud._storage_v2.types.storage import BidiWriteObjectRedirectedError
 
 
-class TestWriteResumptionStrategy(unittest.TestCase):
-    def _get_target_class(self):
-        return _WriteResumptionStrategy
+@pytest.fixture
+def strategy():
+    """Fixture to provide a WriteResumptionStrategy instance."""
+    return _WriteResumptionStrategy()
 
-    def _make_one(self, *args, **kwargs):
-        return self._get_target_class()(*args, **kwargs)
 
-    def test_ctor(self):
-        strategy = self._make_one()
-        self.assertIsInstance(strategy, self._get_target_class())
+class TestWriteResumptionStrategy:
+    """Test suite for WriteResumptionStrategy."""
 
-    def test_generate_requests_initial_new_object(self):
-        """
-        Verify the initial request sequence for a new upload (WriteObjectSpec).
- """ - strategy = self._make_one() - mock_buffer = io.BytesIO(b"0123456789") - # Use WriteObjectSpec for new objects - mock_spec = storage_type.WriteObjectSpec( - resource=storage_type.Object(name="test-object") - ) - state = { - "write_state": _WriteState( - mock_spec, chunk_size=4, user_buffer=mock_buffer - ), - } + # ------------------------------------------------------------------------- + # Tests for generate_requests + # ------------------------------------------------------------------------- - requests = list(strategy.generate_requests(state)) + def test_generate_requests_initial_chunking(self, strategy): + """Verify initial data generation starts at offset 0 and chunks correctly.""" + mock_buffer = io.BytesIO(b"abcdefghij") + write_state = _WriteState(chunk_size=3, user_buffer=mock_buffer) + state = {"write_state": write_state} + + requests = strategy.generate_requests(state) + + # Expected: 4 requests (3, 3, 3, 1) + assert len(requests) == 4 - # Check first request (Spec) - self.assertEqual(requests[0].write_object_spec, mock_spec) - self.assertFalse(requests[0].state_lookup) + # Verify Request 1 + assert requests[0].write_offset == 0 + assert requests[0].checksummed_data.content == b"abc" - # Check data chunks - self.assertEqual(requests[1].write_offset, 0) - self.assertEqual(requests[1].checksummed_data.content, b"0123") - self.assertEqual(requests[2].write_offset, 4) - self.assertEqual(requests[2].checksummed_data.content, b"4567") - self.assertEqual(requests[3].write_offset, 8) - self.assertEqual(requests[3].checksummed_data.content, b"89") + # Verify Request 2 + assert requests[1].write_offset == 3 + assert requests[1].checksummed_data.content == b"def" - # Total requests: 1 Spec + 3 Chunks - self.assertEqual(len(requests), 4) + # Verify Request 3 + assert requests[2].write_offset == 6 + assert requests[2].checksummed_data.content == b"ghi" - def test_generate_requests_initial_existing_object(self): + # Verify Request 4 + assert requests[3].write_offset == 9 + assert requests[3].checksummed_data.content == b"j" + + def test_generate_requests_resumption(self, strategy): """ - Verify the initial request sequence for appending to an existing object (AppendObjectSpec). + Verify request generation when resuming. + The strategy should generate chunks starting from the current 'bytes_sent'. 
""" - strategy = self._make_one() - mock_buffer = io.BytesIO(b"0123") - # Use AppendObjectSpec for existing objects - mock_spec = storage_type.AppendObjectSpec( - object_="test-object", bucket="test-bucket" - ) - state = { - "write_state": _WriteState( - mock_spec, chunk_size=4, user_buffer=mock_buffer - ), - } + mock_buffer = io.BytesIO(b"0123456789") + write_state = _WriteState(chunk_size=4, user_buffer=mock_buffer) + + # Simulate resumption state: 4 bytes already sent/persisted + write_state.persisted_size = 4 + write_state.bytes_sent = 4 + # Buffer must be seeked to 4 before calling generate + mock_buffer.seek(4) - requests = list(strategy.generate_requests(state)) + state = {"write_state": write_state} - # Check first request (Spec) - self.assertEqual(requests[0].append_object_spec, mock_spec) - self.assertFalse(requests[0].state_lookup) + requests = strategy.generate_requests(state) - # Check data chunk - self.assertEqual(requests[1].write_offset, 0) - self.assertEqual(requests[1].checksummed_data.content, b"0123") + # Since 4 bytes are done, we expect remaining 6 bytes: [4 bytes, 2 bytes] + assert len(requests) == 2 - def test_generate_requests_empty_file(self): - """ - Verify request sequence for an empty file. Should just be the Spec. - """ - strategy = self._make_one() + # Check first generated request starts at offset 4 + assert requests[0].write_offset == 4 + assert requests[0].checksummed_data.content == b"4567" + + # Check second generated request starts at offset 8 + assert requests[1].write_offset == 8 + assert requests[1].checksummed_data.content == b"89" + + def test_generate_requests_empty_file(self, strategy): + """Verify request sequence for an empty file.""" mock_buffer = io.BytesIO(b"") - mock_spec = storage_type.AppendObjectSpec(object_="test-object") - state = { - "write_state": _WriteState( - mock_spec, chunk_size=4, user_buffer=mock_buffer - ), - } + write_state = _WriteState(chunk_size=4, user_buffer=mock_buffer) + state = {"write_state": write_state} - requests = list(strategy.generate_requests(state)) + requests = strategy.generate_requests(state) - self.assertEqual(len(requests), 1) - self.assertEqual(requests[0].append_object_spec, mock_spec) + assert len(requests) == 0 - def test_generate_requests_resumption(self): - """ - Verify request sequence when resuming an upload. 
- """ - strategy = self._make_one() - mock_buffer = io.BytesIO(b"0123456789") - mock_spec = storage_type.AppendObjectSpec(object_="test-object") + def test_generate_requests_checksum_verification(self, strategy): + """Verify CRC32C is calculated correctly for each chunk.""" + chunk_data = b"test_data" + mock_buffer = io.BytesIO(chunk_data) + write_state = _WriteState(chunk_size=10, user_buffer=mock_buffer) + state = {"write_state": write_state} - write_state = _WriteState(mock_spec, chunk_size=4, user_buffer=mock_buffer) - write_state.persisted_size = 4 - write_state.bytes_sent = 4 - write_state.write_handle = storage_type.BidiWriteHandle(handle=b"test-handle") - mock_buffer.seek(4) + requests = strategy.generate_requests(state) + expected_crc = google_crc32c.Checksum(chunk_data).digest() + expected_int = int.from_bytes(expected_crc, "big") + assert requests[0].checksummed_data.crc32c == expected_int + + def test_generate_requests_flush_logic_exact_interval(self, strategy): + """Verify the flush bit is set exactly when the interval is reached.""" + mock_buffer = io.BytesIO(b"A" * 12) + # 2 byte chunks, flush every 4 bytes + write_state = _WriteState( + chunk_size=2, user_buffer=mock_buffer, flush_interval=4 + ) state = {"write_state": write_state} - requests = list(strategy.generate_requests(state)) + requests = strategy.generate_requests(state) + + # Request index 1 (4 bytes total) should have flush=True + assert requests[0].flush == False + assert requests[1].flush == True + + # Request index 2 (8 bytes total) should have flush=True + assert requests[2].flush == False + assert requests[3].flush == True + + # Request index 3 (12 bytes total) should have flush=True + assert requests[4].flush == False + assert requests[5].flush == True - # Check first request has handle and lookup - self.assertEqual( - requests[0].append_object_spec.write_handle.handle, b"test-handle" + # Verify counter reset in state + assert write_state.bytes_since_last_flush == 0 + + def test_generate_requests_flush_logic_none_interval(self, strategy): + """Verify flush is never set if interval is None.""" + mock_buffer = io.BytesIO(b"A" * 10) + write_state = _WriteState( + chunk_size=2, user_buffer=mock_buffer, flush_interval=None ) + state = {"write_state": write_state} - # Check data starts from offset 4 - self.assertEqual(requests[1].write_offset, 4) - self.assertEqual(requests[1].checksummed_data.content, b"4567") - self.assertEqual(requests[2].write_offset, 8) - self.assertEqual(requests[2].checksummed_data.content, b"89") + requests = strategy.generate_requests(state) + + for req in requests: + assert req.flush == False + + def test_generate_requests_flush_logic_data_less_than_interval(self, strategy): + """Verify flush is not set if data sent is less than interval.""" + mock_buffer = io.BytesIO(b"A" * 5) + # Flush every 10 bytes + write_state = _WriteState( + chunk_size=2, user_buffer=mock_buffer, flush_interval=10 + ) + state = {"write_state": write_state} + + requests = strategy.generate_requests(state) + + # Total 5 bytes < 10 bytes interval + for req in requests: + assert req.flush == False + + assert write_state.bytes_since_last_flush == 5 + + def test_generate_requests_honors_finalized_state(self, strategy): + """If state is already finalized, no requests should be generated.""" + mock_buffer = io.BytesIO(b"data") + write_state = _WriteState(chunk_size=4, user_buffer=mock_buffer) + write_state.is_finalized = True + state = {"write_state": write_state} + + requests = strategy.generate_requests(state) + 
+        assert len(requests) == 0
 
     @pytest.mark.asyncio
-    async def test_generate_requests_after_failure_and_recovery(self):
+    async def test_generate_requests_after_failure_and_recovery(self, strategy):
         """
-        Verify recovery and resumption flow.
+        Verify recovery and resumption flow (Integration of recover + generate).
         """
-        strategy = self._make_one()
-        mock_buffer = io.BytesIO(b"0123456789abcdef")
-        mock_spec = storage_type.AppendObjectSpec(object_="test-object")
-        state = {
-            "write_state": _WriteState(mock_spec, chunk_size=4, user_buffer=mock_buffer)
-        }
-        write_state = state["write_state"]
+        mock_buffer = io.BytesIO(b"0123456789abcdef")  # 16 bytes
+        write_state = _WriteState(chunk_size=4, user_buffer=mock_buffer)
+        state = {"write_state": write_state}
 
+        # Simulate initial progress: sent 8 bytes
         write_state.bytes_sent = 8
         mock_buffer.seek(8)
@@ -169,122 +208,147 @@ async def test_generate_requests_after_failure_and_recovery(self):
             state,
         )
 
+        # Simulate Failure Triggering Recovery
         await strategy.recover_state_on_failure(Exception("network error"), state)
 
-        self.assertEqual(mock_buffer.tell(), 4)
-        self.assertEqual(write_state.bytes_sent, 4)
+        # Assertions after recovery
+        # 1. Buffer should rewind to persisted_size (4)
+        assert mock_buffer.tell() == 4
+        # 2. bytes_sent should track persisted_size (4)
+        assert write_state.bytes_sent == 4
 
-        requests = list(strategy.generate_requests(state))
+        requests = strategy.generate_requests(state)
 
-        self.assertTrue(requests[0].state_lookup)
-        self.assertEqual(
-            requests[0].append_object_spec.write_handle.handle, b"handle-1"
-        )
+        # Remaining data from offset 4 to 16 (12 bytes total)
+        # Chunks: [4-8], [8-12], [12-16]
+        assert len(requests) == 3
 
-        self.assertEqual(requests[1].write_offset, 4)
-        self.assertEqual(requests[1].checksummed_data.content, b"4567")
+        # Verify resumption offset
+        assert requests[0].write_offset == 4
+        assert requests[0].checksummed_data.content == b"4567"
 
-    def test_update_state_from_response(self):
-        """Verify state updates from server responses."""
-        strategy = self._make_one()
-        mock_buffer = io.BytesIO(b"0123456789")
-        mock_spec = storage_type.AppendObjectSpec(object_="test-object")
-        state = {
-            "write_state": _WriteState(
-                mock_spec, chunk_size=4, user_buffer=mock_buffer
-            ),
-        }
-        write_state = state["write_state"]
+    # -------------------------------------------------------------------------
+    # Tests for update_state_from_response
+    # -------------------------------------------------------------------------
 
-        response1 = storage_type.BidiWriteObjectResponse(
-            write_handle=storage_type.BidiWriteHandle(handle=b"handle-1")
-        )
-        strategy.update_state_from_response(response1, state)
-        self.assertEqual(write_state.write_handle.handle, b"handle-1")
+    def test_update_state_from_response_all_fields(self, strategy):
+        """Verify all fields from a BidiWriteObjectResponse update the state."""
+        write_state = _WriteState(chunk_size=4, user_buffer=io.BytesIO())
+        state = {"write_state": write_state}
 
-        response2 = storage_type.BidiWriteObjectResponse(persisted_size=1024)
-        strategy.update_state_from_response(response2, state)
-        self.assertEqual(write_state.persisted_size, 1024)
+        # 1. Update persisted_size
+        strategy.update_state_from_response(
+            storage_type.BidiWriteObjectResponse(persisted_size=123), state
+        )
+        assert write_state.persisted_size == 123
 
-        final_resource = storage_type.Object(
-            name="test-object", bucket="b", size=2048, finalize_time=datetime.now()
+        # 2. Update write_handle
+        handle = storage_type.BidiWriteHandle(handle=b"new-handle")
+        strategy.update_state_from_response(
+            storage_type.BidiWriteObjectResponse(write_handle=handle), state
         )
-        response3 = storage_type.BidiWriteObjectResponse(resource=final_resource)
-        strategy.update_state_from_response(response3, state)
-        self.assertEqual(write_state.persisted_size, 2048)
-        self.assertTrue(write_state.is_finalized)
+        assert write_state.write_handle == handle
 
-    @pytest.mark.asyncio
-    async def test_recover_state_on_failure_handles_redirect(self):
-        """
-        Verify redirection error handling.
-        """
-        strategy = self._make_one()
-        mock_buffer = mock.MagicMock(spec=io.BytesIO)
-        mock_spec = storage_type.AppendObjectSpec(object_="test-object")
+        # 3. Update from Resource (finalization)
+        resource = storage_type.Object(size=1000, finalize_time=datetime.now())
+        strategy.update_state_from_response(
+            storage_type.BidiWriteObjectResponse(resource=resource), state
+        )
+        assert write_state.persisted_size == 1000
+        assert write_state.is_finalized
 
-        write_state = _WriteState(mock_spec, chunk_size=4, user_buffer=mock_buffer)
+    def test_update_state_from_response_none(self, strategy):
+        """Verify None response doesn't crash."""
+        write_state = _WriteState(chunk_size=4, user_buffer=io.BytesIO())
         state = {"write_state": write_state}
+        strategy.update_state_from_response(None, state)
+        assert write_state.persisted_size == 0
 
-        redirect_error = BidiWriteObjectRedirectedError(routing_token="new-token-123")
-        wrapped_error = Exception("RPC error")
-        wrapped_error.cause = redirect_error
-
-        await strategy.recover_state_on_failure(wrapped_error, state)
-
-        self.assertEqual(write_state.routing_token, "new-token-123")
-        mock_buffer.seek.assert_called_with(0)
+    # -------------------------------------------------------------------------
+    # Tests for recover_state_on_failure
+    # -------------------------------------------------------------------------
 
     @pytest.mark.asyncio
-    async def test_recover_state_on_failure_handles_redirect_with_handle(self):
-        """Verify redirection that includes a write handle."""
-        strategy = self._make_one()
-        mock_buffer = mock.MagicMock(spec=io.BytesIO)
-        mock_spec = storage_type.AppendObjectSpec(object_="test-object")
+    async def test_recover_state_on_failure_rewind_logic(self, strategy):
+        """Verify buffer seek and counter resets on generic failure (Non-redirect)."""
+        mock_buffer = io.BytesIO(b"0123456789")
+        write_state = _WriteState(chunk_size=2, user_buffer=mock_buffer)
 
-        write_state = _WriteState(mock_spec, chunk_size=4, user_buffer=mock_buffer)
-        state = {"write_state": write_state}
+        # Simulate progress: sent 8 bytes, but server only persisted 4
+        write_state.bytes_sent = 8
+        write_state.persisted_size = 4
+        write_state.bytes_since_last_flush = 2
+        mock_buffer.seek(8)
 
-        redirect_error = BidiWriteObjectRedirectedError(
-            routing_token="new-token-456", write_handle=b"redirect-handle"
+        # Simulate generic 503 error without trailers
+        await strategy.recover_state_on_failure(
+            exceptions.ServiceUnavailable("busy"), {"write_state": write_state}
         )
-        wrapped_error = Exception("RPC error")
-        wrapped_error.cause = redirect_error
 
-        await strategy.recover_state_on_failure(wrapped_error, state)
+        # Buffer must be seeked back to 4
+        assert mock_buffer.tell() == 4
+        assert write_state.bytes_sent == 4
+        # Flush counter must be reset to avoid incorrect firing after resume
+        assert write_state.bytes_since_last_flush == 0
 
-        self.assertEqual(write_state.routing_token, "new-token-456")
-
-        self.assertEqual(write_state.write_handle, b"redirect-handle")
+    @pytest.mark.asyncio
+    async def test_recover_state_on_failure_direct_redirect(self, strategy):
+        """Verify handling when the error is a BidiWriteObjectRedirectedError."""
+        write_state = _WriteState(chunk_size=4, user_buffer=io.BytesIO())
+        state = {"write_state": write_state}
 
-        mock_buffer.seek.assert_called_with(0)
+        redirect = BidiWriteObjectRedirectedError(
+            routing_token="tok-1",
+            write_handle=storage_type.BidiWriteHandle(handle=b"h-1"),
+        )
 
-    def test_generate_requests_sends_crc32c_checksum(self):
-        strategy = self._make_one()
-        mock_buffer = io.BytesIO(b"0123")
-        mock_spec = storage_type.AppendObjectSpec(object_="test-object")
-        state = {
-            "write_state": _WriteState(
-                mock_spec, chunk_size=4, user_buffer=mock_buffer
-            ),
-        }
+        await strategy.recover_state_on_failure(redirect, state)
 
-        requests = list(strategy.generate_requests(state))
+        assert write_state.routing_token == "tok-1"
+        assert write_state.write_handle.handle == b"h-1"
 
-        expected_crc = google_crc32c.Checksum(b"0123")
-        expected_crc_int = int.from_bytes(expected_crc.digest(), "big")
-        self.assertEqual(requests[1].checksummed_data.crc32c, expected_crc_int)
+    @pytest.mark.asyncio
+    async def test_recover_state_on_failure_wrapped_redirect(self, strategy):
+        """Verify handling when RedirectedError is inside Aborted.errors."""
+        write_state = _WriteState(chunk_size=4, user_buffer=io.BytesIO())
 
-    def test_generate_requests_with_routing_token(self):
-        strategy = self._make_one()
-        mock_buffer = io.BytesIO(b"")
-        mock_spec = storage_type.AppendObjectSpec(object_="test-object")
+        redirect = BidiWriteObjectRedirectedError(routing_token="tok-wrapped")
+        # google-api-core Aborted often wraps multiple errors
+        error = exceptions.Aborted("conflict", errors=[redirect])
 
-        write_state = _WriteState(mock_spec, chunk_size=4, user_buffer=mock_buffer)
-        write_state.routing_token = "redirected-token"
-        state = {"write_state": write_state}
+        await strategy.recover_state_on_failure(error, {"write_state": write_state})
 
-        requests = list(strategy.generate_requests(state))
+        assert write_state.routing_token == "tok-wrapped"
 
-        self.assertEqual(
-            requests[0].append_object_spec.routing_token, "redirected-token"
-        )
+    @pytest.mark.asyncio
+    async def test_recover_state_on_failure_trailer_metadata_redirect(self, strategy):
+        """Verify complex parsing from 'grpc-status-details-bin' in trailers."""
+        write_state = _WriteState(chunk_size=4, user_buffer=io.BytesIO())
+
+        redirect_proto = BidiWriteObjectRedirectedError(routing_token="metadata-token")
+        status = status_pb2.Status()
+        detail = status.details.add()
+        detail.type_url = (
+            "type.googleapis.com/google.storage.v2.BidiWriteObjectRedirectedError"
        )
+        detail.value = BidiWriteObjectRedirectedError.serialize(redirect_proto)
+
+        # No spec= here, because Aborted doesn't define trailing_metadata in
+        # its base definition.
+        mock_error = mock.MagicMock()
+        mock_error.errors = []
+        mock_error.trailing_metadata.return_value = [
+            ("grpc-status-details-bin", status.SerializeToString())
+        ]
+
+        with mock.patch(
+            "google.cloud.storage._experimental.asyncio.retry.writes_resumption_strategy._extract_bidi_writes_redirect_proto",
+            return_value=redirect_proto,
+        ):
+            await strategy.recover_state_on_failure(
+                mock_error, {"write_state": write_state}
+            )
+
+        assert write_state.routing_token == "metadata-token"
+
+    def test_write_state_initialization(self):
+        """Verify WriteState starts with clean counters."""
+        buffer = io.BytesIO(b"test")
+        ws = _WriteState(chunk_size=10, user_buffer=buffer, flush_interval=100)
+
+        assert ws.persisted_size == 0
+        assert ws.bytes_sent == 0
+        assert ws.bytes_since_last_flush == 0
+        assert ws.flush_interval == 100
+        assert not ws.is_finalized
diff --git a/tests/unit/asyncio/test_async_appendable_object_writer.py b/tests/unit/asyncio/test_async_appendable_object_writer.py
index 07f7047d8..9f8fc3c1e 100644
--- a/tests/unit/asyncio/test_async_appendable_object_writer.py
+++ b/tests/unit/asyncio/test_async_appendable_object_writer.py
@@ -12,23 +12,24 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from io import BytesIO
+import io
+import unittest
+import unittest.mock as mock
+from unittest.mock import AsyncMock, MagicMock
 
 import pytest
-from unittest import mock
-
-from google_crc32c import Checksum
 from google.api_core import exceptions
+from google.rpc import status_pb2
+from google.cloud._storage_v2.types import storage as storage_type
+from google.cloud._storage_v2.types.storage import BidiWriteObjectRedirectedError
 
 from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
     AsyncAppendableObjectWriter,
-)
-from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
+    _is_write_retryable,
     _MAX_CHUNK_SIZE_BYTES,
     _DEFAULT_FLUSH_INTERVAL_BYTES,
 )
-from google.cloud import _storage_v2
-
 
+# Constants
 BUCKET = "test-bucket"
 OBJECT = "test-object"
 GENERATION = 123
@@ -37,641 +38,387 @@
 EIGHT_MIB = 8 * 1024 * 1024
 
 
-@pytest.fixture
-def mock_client():
-    """Mock the async gRPC client."""
-    return mock.AsyncMock()
-
-
-@mock.patch(
-    "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
-)
-def test_init(mock_write_object_stream, mock_client):
-    """Test the constructor of AsyncAppendableObjectWriter."""
-    writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
-
-    assert writer.client == mock_client
-    assert writer.bucket_name == BUCKET
-    assert writer.object_name == OBJECT
-    assert writer.generation is None
-    assert writer.write_handle is None
-    assert not writer._is_stream_open
-    assert writer.offset is None
-    assert writer.persisted_size is None
-    assert writer.bytes_appended_since_last_flush == 0
-
-    mock_write_object_stream.assert_called_once_with(
-        client=mock_client,
-        bucket_name=BUCKET,
-        object_name=OBJECT,
-        generation_number=None,
-        write_handle=None,
-    )
-    assert writer.write_obj_stream == mock_write_object_stream.return_value
-
-
-@mock.patch(
-    "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
-)
-def test_init_with_optional_args(mock_write_object_stream, mock_client):
-    """Test the constructor with optional arguments."""
-    writer = AsyncAppendableObjectWriter(
-        mock_client,
BUCKET, - OBJECT, - writer_options={"FLUSH_INTERVAL_BYTES": EIGHT_MIB}, - ) - - assert writer.flush_interval == EIGHT_MIB - assert writer.bytes_appended_since_last_flush == 0 - - mock_write_object_stream.assert_called_once_with( - client=mock_client, - bucket_name=BUCKET, - object_name=OBJECT, - generation_number=None, - write_handle=None, - ) - - -@mock.patch( - "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" -) -def test_init_with_flush_interval_less_than_chunk_size_raises_error(mock_client): - """Test that an OutOfRange error is raised if flush_interval is less than the chunk size.""" - - with pytest.raises(exceptions.OutOfRange): - AsyncAppendableObjectWriter( - mock_client, - BUCKET, - OBJECT, - writer_options={"FLUSH_INTERVAL_BYTES": _MAX_CHUNK_SIZE_BYTES - 1}, +class TestIsWriteRetryable: + """Exhaustive tests for retry predicate logic.""" + + def test_standard_transient_errors(self, mock_appendable_writer): + for exc in [ + exceptions.InternalServerError("500"), + exceptions.ServiceUnavailable("503"), + exceptions.DeadlineExceeded("timeout"), + exceptions.TooManyRequests("429"), + ]: + assert _is_write_retryable(exc) + + def test_aborted_with_redirect_proto(self, mock_appendable_writer): + # Direct redirect error wrapped in Aborted + redirect = BidiWriteObjectRedirectedError(routing_token="token") + exc = exceptions.Aborted("aborted", errors=[redirect]) + assert _is_write_retryable(exc) + + def test_aborted_with_trailers(self, mock_appendable_writer): + # Setup Status with Redirect Detail + status = status_pb2.Status() + detail = status.details.add() + detail.type_url = ( + "type.googleapis.com/google.storage.v2.BidiWriteObjectRedirectedError" ) + # Mock error with trailing_metadata method + mock_grpc_error = MagicMock() + mock_grpc_error.trailing_metadata.return_value = [ + ("grpc-status-details-bin", status.SerializeToString()) + ] -@mock.patch( - "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" -) -def test_init_with_flush_interval_not_multiple_of_chunk_size_raises_error(mock_client): - """Test that an OutOfRange error is raised if flush_interval is not a multiple of the chunk size.""" - - with pytest.raises(exceptions.OutOfRange): - AsyncAppendableObjectWriter( - mock_client, - BUCKET, - OBJECT, - writer_options={"FLUSH_INTERVAL_BYTES": _MAX_CHUNK_SIZE_BYTES + 1}, - ) + # Aborted wraps the grpc error + exc = exceptions.Aborted("aborted", errors=[mock_grpc_error]) + assert _is_write_retryable(exc) + def test_aborted_without_metadata(self, mock_appendable_writer): + mock_grpc_error = MagicMock() + mock_grpc_error.trailing_metadata.return_value = [] + exc = exceptions.Aborted("bare aborted", errors=[mock_grpc_error]) + assert not _is_write_retryable(exc) -@mock.patch("google.cloud.storage._experimental.asyncio._utils.google_crc32c") -@mock.patch( - "google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client" -) -def test_init_raises_if_crc32c_c_extension_is_missing( - mock_grpc_client, mock_google_crc32c -): - mock_google_crc32c.implementation = "python" - - with pytest.raises(exceptions.FailedPrecondition) as exc_info: - AsyncAppendableObjectWriter(mock_grpc_client, "bucket", "object") + def test_non_retryable_errors(self, mock_appendable_writer): + assert not _is_write_retryable(exceptions.BadRequest("400")) + assert not _is_write_retryable(exceptions.NotFound("404")) - assert "The google-crc32c package is not installed with C support" in str( - 
exc_info.value - ) - -@pytest.mark.asyncio -@mock.patch( - "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" -) -async def test_state_lookup(mock_write_object_stream, mock_client): - """Test state_lookup method.""" - # Arrange - writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) - writer._is_stream_open = True - mock_stream = mock_write_object_stream.return_value - mock_stream.send = mock.AsyncMock() - mock_stream.recv = mock.AsyncMock( - return_value=_storage_v2.BidiWriteObjectResponse(persisted_size=PERSISTED_SIZE) +@pytest.fixture +def mock_appendable_writer(): + """Fixture to provide a mock AsyncAppendableObjectWriter setup.""" + mock_client = mock.AsyncMock() + # Internal stream class patch + stream_patcher = mock.patch( + "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" ) + mock_stream_cls = stream_patcher.start() + mock_stream = mock_stream_cls.return_value - expected_request = _storage_v2.BidiWriteObjectRequest(state_lookup=True) - - # Act - response = await writer.state_lookup() - - # Assert - mock_stream.send.assert_awaited_once_with(expected_request) - mock_stream.recv.assert_awaited_once() - assert writer.persisted_size == PERSISTED_SIZE - assert response == PERSISTED_SIZE - - -@pytest.mark.asyncio -async def test_state_lookup_without_open_raises_value_error(mock_client): - """Test that state_lookup raises an error if the stream is not open.""" - writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) - with pytest.raises( - ValueError, - match="Stream is not open. Call open\\(\\) before state_lookup\\(\\).", - ): - await writer.state_lookup() - + # Configure all async methods explicitly + mock_stream.open = AsyncMock() + mock_stream.close = AsyncMock() + mock_stream.send = AsyncMock() + mock_stream.recv = AsyncMock() -@pytest.mark.asyncio -@mock.patch( - "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" -) -async def test_open_appendable_object_writer(mock_write_object_stream, mock_client): - """Test the open method.""" - # Arrange - writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) - mock_stream = mock_write_object_stream.return_value - mock_stream.open = mock.AsyncMock() - - mock_stream.generation_number = GENERATION - mock_stream.write_handle = WRITE_HANDLE + # Default mock properties + mock_stream.is_stream_open = False mock_stream.persisted_size = 0 - - # Act - await writer.open() - - # Assert - mock_stream.open.assert_awaited_once() - assert writer._is_stream_open - assert writer.generation == GENERATION - assert writer.write_handle == WRITE_HANDLE - assert writer.persisted_size == 0 - - -@pytest.mark.asyncio -@mock.patch( - "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" -) -async def test_open_appendable_object_writer_existing_object( - mock_write_object_stream, mock_client -): - """Test the open method.""" - # Arrange - writer = AsyncAppendableObjectWriter( - mock_client, BUCKET, OBJECT, generation=GENERATION - ) - mock_stream = mock_write_object_stream.return_value - mock_stream.open = mock.AsyncMock() - mock_stream.generation_number = GENERATION mock_stream.write_handle = WRITE_HANDLE - mock_stream.persisted_size = PERSISTED_SIZE - - # Act - await writer.open() - - # Assert - mock_stream.open.assert_awaited_once() - assert writer._is_stream_open - assert writer.generation == GENERATION - assert writer.write_handle == 
WRITE_HANDLE - assert writer.persisted_size == PERSISTED_SIZE - - -@pytest.mark.asyncio -@mock.patch( - "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" -) -async def test_open_when_already_open_raises_error( - mock_write_object_stream, mock_client -): - """Test that opening an already open writer raises a ValueError.""" - # Arrange - writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) - writer._is_stream_open = True # Manually set to open - - # Act & Assert - with pytest.raises(ValueError, match="Underlying bidi-gRPC stream is already open"): - await writer.open() + yield { + "mock_client": mock_client, + "mock_stream_cls": mock_stream_cls, + "mock_stream": mock_stream, + "stream_patcher": stream_patcher, + } + + stream_patcher.stop() + + +class TestAsyncAppendableObjectWriter: + def _make_one(self, mock_client, **kwargs): + return AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT, **kwargs) + + # ------------------------------------------------------------------------- + # Initialization & Configuration Tests + # ------------------------------------------------------------------------- + + def test_init_defaults(self, mock_appendable_writer): + writer = self._make_one(mock_appendable_writer["mock_client"]) + assert writer.bucket_name == BUCKET + assert writer.object_name == OBJECT + assert writer.persisted_size is None + assert writer.bytes_appended_since_last_flush == 0 + assert writer.flush_interval == _DEFAULT_FLUSH_INTERVAL_BYTES + + def test_init_with_writer_options(self, mock_appendable_writer): + writer = self._make_one(mock_appendable_writer["mock_client"], writer_options={"FLUSH_INTERVAL_BYTES": EIGHT_MIB}) + assert writer.flush_interval == EIGHT_MIB + + def test_init_validation_chunk_size_raises(self, mock_appendable_writer): + with pytest.raises(exceptions.OutOfRange): + self._make_one( + mock_appendable_writer["mock_client"], + writer_options={"FLUSH_INTERVAL_BYTES": _MAX_CHUNK_SIZE_BYTES - 1} + ) + + def test_init_validation_multiple_raises(self, mock_appendable_writer): + with pytest.raises(exceptions.OutOfRange): + self._make_one( + mock_appendable_writer["mock_client"], + writer_options={"FLUSH_INTERVAL_BYTES": _MAX_CHUNK_SIZE_BYTES + 1} + ) + + def test_init_raises_if_crc32c_missing(self, mock_appendable_writer): + with mock.patch( + "google.cloud.storage._experimental.asyncio._utils.google_crc32c" + ) as mock_crc: + mock_crc.implementation = "python" + with pytest.raises(exceptions.FailedPrecondition): + self._make_one(mock_appendable_writer["mock_client"]) + + # ------------------------------------------------------------------------- + # Stream Lifecycle Tests + # ------------------------------------------------------------------------- + + @pytest.mark.asyncio + async def test_state_lookup_success(self, mock_appendable_writer): + writer = self._make_one(mock_appendable_writer['mock_client']) + writer._is_stream_open = True + writer.write_obj_stream = mock_appendable_writer['mock_stream'] + + mock_appendable_writer['mock_stream'].recv.return_value = storage_type.BidiWriteObjectResponse( + persisted_size=100 + ) -@pytest.mark.asyncio -async def test_unimplemented_methods_raise_error(mock_client): - """Test that all currently unimplemented methods raise NotImplementedError.""" - writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + size = await writer.state_lookup() - with pytest.raises(NotImplementedError): - await writer.append_from_string("data") + 
mock_appendable_writer['mock_stream'].send.assert_awaited_once() + assert size == 100 + assert writer.persisted_size == 100 - with pytest.raises(NotImplementedError): - await writer.append_from_stream(mock.Mock()) + @pytest.mark.asyncio + async def test_state_lookup_raises_if_not_open(self, mock_appendable_writer): + writer = self._make_one(mock_appendable_writer['mock_client']) + with pytest.raises(ValueError, match="Stream is not open"): + await writer.state_lookup() + @pytest.mark.asyncio + async def test_open_success(self, mock_appendable_writer): + writer = self._make_one(mock_appendable_writer['mock_client']) + mock_appendable_writer['mock_stream'].generation_number = 456 + mock_appendable_writer['mock_stream'].write_handle = b"new-h" + mock_appendable_writer['mock_stream'].persisted_size = 0 -@pytest.mark.asyncio -@mock.patch( - "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" -) -async def test_flush(mock_write_object_stream, mock_client): - """Test that flush sends the correct request and updates state.""" - writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) - writer._is_stream_open = True - mock_stream = mock_write_object_stream.return_value - mock_stream.send = mock.AsyncMock() - mock_stream.recv = mock.AsyncMock( - return_value=_storage_v2.BidiWriteObjectResponse(persisted_size=1024) - ) - - persisted_size = await writer.flush() + await writer.open() - expected_request = _storage_v2.BidiWriteObjectRequest(flush=True, state_lookup=True) - mock_stream.send.assert_awaited_once_with(expected_request) - mock_stream.recv.assert_awaited_once() - assert writer.persisted_size == 1024 - assert writer.offset == 1024 - assert persisted_size == 1024 + assert writer._is_stream_open + assert writer.generation == 456 + assert writer.write_handle == b"new-h" + mock_appendable_writer['mock_stream'].open.assert_awaited_once() + + @pytest.mark.asyncio + async def test_open_already_open_raises(self, mock_appendable_writer): + writer = self._make_one(mock_appendable_writer['mock_client']) + writer._is_stream_open = True + with pytest.raises(ValueError, match="already open"): + await writer.open() + + def test_on_open_error_redirection(self, mock_appendable_writer): + """Verify redirect info is extracted from helper.""" + writer = self._make_one(mock_appendable_writer['mock_client']) + redirect = BidiWriteObjectRedirectedError( + routing_token="rt1", + write_handle=storage_type.BidiWriteHandle(handle=b"h1"), + generation=777, + ) + with mock.patch( + "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._extract_bidi_writes_redirect_proto", + return_value=redirect, + ): + writer._on_open_error(exceptions.Aborted("redirect")) + + assert writer._routing_token == "rt1" + assert writer.write_handle.handle == b"h1" + assert writer.generation == 777 + + # ------------------------------------------------------------------------- + # Append & Integration Tests + # ------------------------------------------------------------------------- + + @pytest.mark.asyncio + async def test_append_integration_basic(self, mock_appendable_writer): + """Verify append orchestrates manager and drives the internal generator.""" + writer = self._make_one(mock_appendable_writer['mock_client']) + writer._is_stream_open = True + writer.write_obj_stream = mock_appendable_writer['mock_stream'] + writer.persisted_size = 0 + + data = b"test-data" + + with mock.patch( + 
"google.cloud.storage._experimental.asyncio.async_appendable_object_writer._BidiStreamRetryManager" + ) as MockManager: + + async def mock_execute(state, policy): + factory = MockManager.call_args[0][1] + dummy_reqs = [storage_type.BidiWriteObjectRequest()] + gen = factory(dummy_reqs, state) + + mock_appendable_writer['mock_stream'].recv.side_effect = [ + storage_type.BidiWriteObjectResponse( + persisted_size=len(data), + write_handle=storage_type.BidiWriteHandle(handle=b"h2"), + ), + None, + ] + async for _ in gen: + pass + + MockManager.return_value.execute.side_effect = mock_execute + await writer.append(data) + + assert writer.persisted_size == len(data) + sent_req = mock_appendable_writer['mock_stream'].send.call_args[0][0] + assert sent_req.state_lookup + assert sent_req.flush + + @pytest.mark.asyncio + async def test_append_recovery_reopens_stream(self, mock_appendable_writer): + """Verifies re-opening logic on retry.""" + writer = self._make_one(mock_appendable_writer['mock_client'], write_handle=b"h1") + writer._is_stream_open = True + writer.write_obj_stream = mock_appendable_writer['mock_stream'] + # Setup mock to allow close() call + mock_appendable_writer['mock_stream'].is_stream_open = True + + async def mock_open(metadata=None): + writer.write_obj_stream = mock_appendable_writer['mock_stream'] + writer._is_stream_open = True + writer.persisted_size = 5 + writer.write_handle = b"h_recovered" + + with mock.patch.object( + writer, "open", side_effect=mock_open + ) as mock_writer_open: + with mock.patch( + "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._BidiStreamRetryManager" + ) as MockManager: + + async def mock_execute(state, policy): + factory = MockManager.call_args[0][1] + # Simulate Attempt 1 fail + gen1 = factory([], state) + try: + await gen1.__anext__() + except Exception: + pass + # Simulate Attempt 2 + gen2 = factory([], state) + mock_appendable_writer['mock_stream'].recv.return_value = None + async for _ in gen2: + pass + + MockManager.return_value.execute.side_effect = mock_execute + await writer.append(b"0123456789") + + mock_appendable_writer['mock_stream'].close.assert_awaited() + mock_writer_open.assert_awaited() + assert writer.persisted_size == 5 + + @pytest.mark.asyncio + async def test_append_unimplemented_string_raises(self, mock_appendable_writer): + writer = self._make_one(mock_appendable_writer['mock_client']) + with pytest.raises(NotImplementedError): + await writer.append_from_string("test") + + # ------------------------------------------------------------------------- + # Flush, Close, Finalize + # ------------------------------------------------------------------------- + + @pytest.mark.asyncio + async def test_flush_resets_counters(self, mock_appendable_writer): + writer = self._make_one(mock_appendable_writer['mock_client']) + writer._is_stream_open = True + writer.write_obj_stream = mock_appendable_writer['mock_stream'] + writer.bytes_appended_since_last_flush = 100 + + mock_appendable_writer['mock_stream'].recv.return_value = storage_type.BidiWriteObjectResponse( + persisted_size=200 + ) -@pytest.mark.asyncio -async def test_flush_without_open_raises_value_error(mock_client): - """Test that flush raises an error if the stream is not open.""" - writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) - with pytest.raises( - ValueError, match="Stream is not open. Call open\\(\\) before flush\\(\\)." 
- ): await writer.flush() + assert writer.bytes_appended_since_last_flush == 0 + assert writer.persisted_size == 200 -@pytest.mark.asyncio -@mock.patch( - "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" -) -async def test_simple_flush(mock_write_object_stream, mock_client): - """Test that flush sends the correct request and updates state.""" - # Arrange - writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) - writer._is_stream_open = True - mock_stream = mock_write_object_stream.return_value - mock_stream.send = mock.AsyncMock() - - # Act - await writer.simple_flush() - - # Assert - mock_stream.send.assert_awaited_once_with( - _storage_v2.BidiWriteObjectRequest(flush=True) - ) - + @pytest.mark.asyncio + async def test_simple_flush(self, mock_appendable_writer): + writer = self._make_one(mock_appendable_writer['mock_client']) + writer._is_stream_open = True + writer.write_obj_stream = mock_appendable_writer['mock_stream'] + writer.bytes_appended_since_last_flush = 50 -@pytest.mark.asyncio -async def test_simple_flush_without_open_raises_value_error(mock_client): - """Test that flush raises an error if the stream is not open.""" - writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) - with pytest.raises( - ValueError, - match="Stream is not open. Call open\\(\\) before simple_flush\\(\\).", - ): await writer.simple_flush() - -@pytest.mark.asyncio -@mock.patch( - "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" -) -async def test_close(mock_write_object_stream, mock_client): - writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) - writer._is_stream_open = True - writer.offset = 1024 - writer.persisted_size = 1024 - mock_stream = mock_write_object_stream.return_value - mock_stream.send = mock.AsyncMock() - mock_stream.recv = mock.AsyncMock( - return_value=_storage_v2.BidiWriteObjectResponse(persisted_size=1024) - ) - mock_stream.close = mock.AsyncMock() - writer.finalize = mock.AsyncMock() - - persisted_size = await writer.close() - - writer.finalize.assert_not_awaited() - mock_stream.close.assert_awaited_once() - assert writer.offset is None - assert persisted_size == 1024 - assert not writer._is_stream_open - - -@pytest.mark.asyncio -async def test_close_without_open_raises_value_error(mock_client): - """Test that close raises an error if the stream is not open.""" - writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) - with pytest.raises( - ValueError, match="Stream is not open. Call open\\(\\) before close\\(\\)." 
- ): - await writer.close() - - -@pytest.mark.asyncio -@mock.patch( - "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" -) -async def test_finalize_on_close(mock_write_object_stream, mock_client): - """Test close with finalizing.""" - # Arrange - mock_resource = _storage_v2.Object(name=OBJECT, bucket=BUCKET, size=2048) - writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) - writer._is_stream_open = True - writer.offset = 1024 - mock_stream = mock_write_object_stream.return_value - mock_stream.send = mock.AsyncMock() - mock_stream.recv = mock.AsyncMock( - return_value=_storage_v2.BidiWriteObjectResponse(resource=mock_resource) - ) - mock_stream.close = mock.AsyncMock() - - # Act - result = await writer.close(finalize_on_close=True) - - # Assert - mock_stream.close.assert_awaited_once() - assert not writer._is_stream_open - assert writer.offset is None - assert writer.object_resource == mock_resource - assert writer.persisted_size == 2048 - assert result == mock_resource - - -@pytest.mark.asyncio -@mock.patch( - "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" -) -async def test_finalize(mock_write_object_stream, mock_client): - """Test that finalize sends the correct request and updates state.""" - writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) - writer._is_stream_open = True - mock_resource = _storage_v2.Object(name=OBJECT, bucket=BUCKET, size=123) - mock_stream = mock_write_object_stream.return_value - mock_stream.send = mock.AsyncMock() - mock_stream.recv = mock.AsyncMock( - return_value=_storage_v2.BidiWriteObjectResponse(resource=mock_resource) - ) - mock_stream.close = mock.AsyncMock() - - gcs_object = await writer.finalize() - - mock_stream.send.assert_awaited_once_with( - _storage_v2.BidiWriteObjectRequest(finish_write=True) - ) - mock_stream.recv.assert_awaited_once() - mock_stream.close.assert_awaited_once() - assert writer.object_resource == mock_resource - assert writer.persisted_size == 123 - assert gcs_object == mock_resource - assert writer._is_stream_open is False - assert writer.offset is None - - -@pytest.mark.asyncio -async def test_finalize_without_open_raises_value_error(mock_client): - """Test that finalize raises an error if the stream is not open.""" - writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) - with pytest.raises( - ValueError, match="Stream is not open. Call open\\(\\) before finalize\\(\\)." - ): - await writer.finalize() - - -@pytest.mark.asyncio -@mock.patch( - "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" -) -async def test_append_raises_error_if_not_open(mock_write_object_stream, mock_client): - """Test that append raises an error if the stream is not open.""" - writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) - with pytest.raises( - ValueError, match="Stream is not open. Call open\\(\\) before append\\(\\)." 
- ): - await writer.append(b"some data") - - -@pytest.mark.asyncio -@mock.patch( - "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" -) -async def test_append_with_empty_data(mock_write_object_stream, mock_client): - """Test that append does nothing if data is empty.""" - writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) - writer._is_stream_open = True - mock_stream = mock_write_object_stream.return_value - mock_stream.send = mock.AsyncMock() - - await writer.append(b"") - - mock_stream.send.assert_not_awaited() - - -@pytest.mark.asyncio -@mock.patch( - "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" -) -async def test_append_sends_data_in_chunks(mock_write_object_stream, mock_client): - """Test that append sends data in chunks and updates offset.""" - from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import ( - _MAX_CHUNK_SIZE_BYTES, - ) - - writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) - writer._is_stream_open = True - writer.persisted_size = 100 - mock_stream = mock_write_object_stream.return_value - mock_stream.send = mock.AsyncMock() - - data = b"a" * (_MAX_CHUNK_SIZE_BYTES + 1) - mock_stream.recv = mock.AsyncMock( - return_value=_storage_v2.BidiWriteObjectResponse( - persisted_size=100 + len(data) + mock_appendable_writer['mock_stream'].send.assert_awaited_with( + storage_type.BidiWriteObjectRequest(flush=True) ) - ) - - await writer.append(data) - - assert mock_stream.send.await_count == 2 - first_request = mock_stream.send.await_args_list[0].args[0] - second_request = mock_stream.send.await_args_list[1].args[0] - - # First chunk - assert first_request.write_offset == 100 - assert len(first_request.checksummed_data.content) == _MAX_CHUNK_SIZE_BYTES - assert first_request.checksummed_data.crc32c == int.from_bytes( - Checksum(data[:_MAX_CHUNK_SIZE_BYTES]).digest(), byteorder="big" - ) - assert not first_request.flush - assert not first_request.state_lookup - - # Second chunk (last chunk) - assert second_request.write_offset == 100 + _MAX_CHUNK_SIZE_BYTES - assert len(second_request.checksummed_data.content) == 1 - assert second_request.checksummed_data.crc32c == int.from_bytes( - Checksum(data[_MAX_CHUNK_SIZE_BYTES:]).digest(), byteorder="big" - ) - assert second_request.flush - assert second_request.state_lookup - - assert writer.offset == 100 + len(data) - - -@pytest.mark.asyncio -@mock.patch( - "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" -) -async def test_append_flushes_when_buffer_is_full( - mock_write_object_stream, mock_client -): - """Test that append flushes the stream when the buffer size is reached.""" - - writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) - writer._is_stream_open = True - writer.persisted_size = 0 - mock_stream = mock_write_object_stream.return_value - mock_stream.send = mock.AsyncMock() - mock_stream.recv = mock.AsyncMock() - - data = b"a" * _DEFAULT_FLUSH_INTERVAL_BYTES - await writer.append(data) - - num_chunks = _DEFAULT_FLUSH_INTERVAL_BYTES // _MAX_CHUNK_SIZE_BYTES - assert mock_stream.send.await_count == num_chunks - - # All but the last request should not have flush or state_lookup set. - for i in range(num_chunks - 1): - request = mock_stream.send.await_args_list[i].args[0] - assert not request.flush - assert not request.state_lookup - - # The last request should have flush and state_lookup set. 
- last_request = mock_stream.send.await_args_list[-1].args[0] - assert last_request.flush - assert last_request.state_lookup - assert writer.bytes_appended_since_last_flush == 0 - - -@pytest.mark.asyncio -@mock.patch( - "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" -) -async def test_append_handles_large_data(mock_write_object_stream, mock_client): - """Test that append handles data larger than the buffer size.""" - - writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) - writer._is_stream_open = True - writer.persisted_size = 0 - mock_stream = mock_write_object_stream.return_value - mock_stream.send = mock.AsyncMock() - mock_stream.recv = mock.AsyncMock() - - data = b"a" * (_DEFAULT_FLUSH_INTERVAL_BYTES * 2 + 1) - await writer.append(data) - - flushed_requests = [ - call.args[0] for call in mock_stream.send.await_args_list if call.args[0].flush - ] - assert len(flushed_requests) == 3 - - last_request = mock_stream.send.await_args_list[-1].args[0] - assert last_request.state_lookup - - -@pytest.mark.asyncio -@mock.patch( - "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" -) -async def test_append_data_two_times(mock_write_object_stream, mock_client): - """Test that append sends data correctly when called multiple times.""" - from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import ( - _MAX_CHUNK_SIZE_BYTES, - ) - - writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) - writer._is_stream_open = True - writer.persisted_size = 0 - mock_stream = mock_write_object_stream.return_value - mock_stream.send = mock.AsyncMock() - - data1 = b"a" * (_MAX_CHUNK_SIZE_BYTES + 10) - mock_stream.recv = mock.AsyncMock( - return_value=_storage_v2.BidiWriteObjectResponse( - persisted_size= len(data1) + assert writer.bytes_appended_since_last_flush == 0 + + @pytest.mark.asyncio + async def test_close_without_finalize(self, mock_appendable_writer): + writer = self._make_one(mock_appendable_writer['mock_client']) + writer._is_stream_open = True + writer.write_obj_stream = mock_appendable_writer['mock_stream'] + writer.persisted_size = 50 + + size = await writer.close() + + mock_appendable_writer['mock_stream'].close.assert_awaited() + assert not writer._is_stream_open + assert size == 50 + + @pytest.mark.asyncio + async def test_finalize_lifecycle(self, mock_appendable_writer): + writer = self._make_one(mock_appendable_writer['mock_client']) + writer._is_stream_open = True + writer.write_obj_stream = mock_appendable_writer['mock_stream'] + + resource = storage_type.Object(size=999) + mock_appendable_writer['mock_stream'].recv.return_value = storage_type.BidiWriteObjectResponse( + resource=resource ) - ) - await writer.append(data1) - assert mock_stream.send.await_count == 2 - last_request_data1 = mock_stream.send.await_args_list[-1].args[0] - assert last_request_data1.flush - assert last_request_data1.state_lookup + res = await writer.finalize() - data2 = b"b" * (_MAX_CHUNK_SIZE_BYTES + 20) - mock_stream.recv = mock.AsyncMock( - return_value=_storage_v2.BidiWriteObjectResponse( - persisted_size= len(data2) + len(data1) + assert res == resource + assert writer.persisted_size == 999 + mock_appendable_writer['mock_stream'].send.assert_awaited_with( + storage_type.BidiWriteObjectRequest(finish_write=True) ) - ) - await writer.append(data2) - - assert mock_stream.send.await_count == 4 - last_request_data2 = mock_stream.send.await_args_list[-1].args[0] - assert 
last_request_data2.flush - assert last_request_data2.state_lookup - - total_data_length = len(data1) + len(data2) - assert writer.offset == total_data_length - - -@pytest.mark.asyncio -@pytest.mark.parametrize( - "file_size, block_size", - [ - (10, 4 * 1024), - (0, _DEFAULT_FLUSH_INTERVAL_BYTES), - (20 * 1024 * 1024, _DEFAULT_FLUSH_INTERVAL_BYTES), - (16 * 1024 * 1024, _DEFAULT_FLUSH_INTERVAL_BYTES), - ], -) -async def test_append_from_file(file_size, block_size, mock_client): - # arrange - fp = BytesIO(b"a" * file_size) - writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) - writer._is_stream_open = True - writer.append = mock.AsyncMock() - - # act - await writer.append_from_file(fp, block_size=block_size) - - # assert - exepected_calls = ( - file_size // block_size - if file_size % block_size == 0 - else file_size // block_size + 1 - ) - assert writer.append.await_count == exepected_calls + mock_appendable_writer['mock_stream'].close.assert_awaited() + assert not writer._is_stream_open + + @pytest.mark.asyncio + async def test_close_with_finalize_on_close(self, mock_appendable_writer): + writer = self._make_one(mock_appendable_writer['mock_client']) + writer._is_stream_open = True + writer.finalize = AsyncMock() + + await writer.close(finalize_on_close=True) + writer.finalize.assert_awaited_once() + + # ------------------------------------------------------------------------- + # Helper Integration Tests + # ------------------------------------------------------------------------- + + @pytest.mark.asyncio + async def test_append_from_file_integration(self, mock_appendable_writer): + writer = self._make_one(mock_appendable_writer['mock_client']) + writer._is_stream_open = True + writer.append = AsyncMock() + + fp = io.BytesIO(b"a" * 12) + await writer.append_from_file(fp, block_size=4) + + assert writer.append.await_count == 3 + + @pytest.mark.asyncio + async def test_methods_require_open_stream_raises(self, mock_appendable_writer): + writer = self._make_one(mock_appendable_writer['mock_client']) + methods = [ + writer.append(b"data"), + writer.flush(), + writer.simple_flush(), + writer.close(), + writer.finalize(), + writer.state_lookup(), + ] + for coro in methods: + with pytest.raises(ValueError, match="Stream is not open"): + await coro diff --git a/tests/unit/asyncio/test_async_grpc_client.py b/tests/unit/asyncio/test_async_grpc_client.py index eb06ab938..7321f99ad 100644 --- a/tests/unit/asyncio/test_async_grpc_client.py +++ b/tests/unit/asyncio/test_async_grpc_client.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import unittest +import pytest from unittest import mock from google.auth import credentials as auth_credentials from google.auth.credentials import AnonymousCredentials @@ -24,7 +24,7 @@ def _make_credentials(spec=None): return mock.Mock(spec=spec) -class TestAsyncGrpcClient(unittest.TestCase): +class TestAsyncGrpcClient: @mock.patch("google.cloud._storage_v2.StorageAsyncClient") def test_constructor_default_options(self, mock_async_storage_client): from google.cloud.storage._experimental.asyncio import async_grpc_client @@ -111,7 +111,7 @@ def test_grpc_client_property(self, mock_grpc_gapic_client): client_info=mock_client_info, client_options=mock_client_options, ) - self.assertIs(retrieved_client, mock_grpc_gapic_client.return_value) + assert retrieved_client is mock_grpc_gapic_client.return_value @mock.patch("google.cloud._storage_v2.StorageAsyncClient") def test_grpc_client_with_anon_creds(self, mock_grpc_gapic_client): @@ -131,7 +131,7 @@ def test_grpc_client_with_anon_creds(self, mock_grpc_gapic_client): retrieved_client = client.grpc_client # Assert - self.assertIs(retrieved_client, mock_grpc_gapic_client.return_value) + assert retrieved_client is mock_grpc_gapic_client.return_value mock_transport_cls.create_channel.assert_called_once_with( attempt_direct_path=True, credentials=anonymous_creds diff --git a/tests/unit/asyncio/test_async_write_object_stream.py b/tests/unit/asyncio/test_async_write_object_stream.py index c6ea8a8ff..aec0b3794 100644 --- a/tests/unit/asyncio/test_async_write_object_stream.py +++ b/tests/unit/asyncio/test_async_write_object_stream.py @@ -12,10 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +import unittest.mock as mock +from unittest.mock import AsyncMock, MagicMock import pytest -from unittest import mock -from unittest.mock import AsyncMock from google.cloud.storage._experimental.asyncio.async_write_object_stream import ( _AsyncWriteObjectStream, ) @@ -25,372 +25,193 @@ OBJECT = "my-object" GENERATION = 12345 WRITE_HANDLE = b"test-handle" +FULL_BUCKET_PATH = f"projects/_/buckets/{BUCKET}" @pytest.fixture def mock_client(): - """Mock the async gRPC client.""" - mock_transport = mock.AsyncMock() + """Fixture to provide a mock gRPC client.""" + client = MagicMock() + # Mocking transport internal structures + mock_transport = MagicMock() mock_transport.bidi_write_object = mock.sentinel.bidi_write_object mock_transport._wrapped_methods = { mock.sentinel.bidi_write_object: mock.sentinel.wrapped_bidi_write_object } - - mock_gapic_client = mock.AsyncMock() - mock_gapic_client._transport = mock_transport - - client = mock.AsyncMock() - client._client = mock_gapic_client + client._client._transport = mock_transport return client -async def instantiate_write_obj_stream(mock_client, mock_cls_async_bidi_rpc, open=True): - """Helper to create an instance of _AsyncWriteObjectStream and open it by default.""" - socket_like_rpc = AsyncMock() - mock_cls_async_bidi_rpc.return_value = socket_like_rpc - socket_like_rpc.open = AsyncMock() - socket_like_rpc.send = AsyncMock() - socket_like_rpc.close = AsyncMock() - - mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse) - mock_response.resource = mock.MagicMock(spec=_storage_v2.Object) - mock_response.resource.generation = GENERATION - mock_response.resource.size = 0 - mock_response.write_handle = WRITE_HANDLE - socket_like_rpc.recv = AsyncMock(return_value=mock_response) - - write_obj_stream = _AsyncWriteObjectStream(mock_client, BUCKET, 
OBJECT) - - if open: - await write_obj_stream.open() - - return write_obj_stream - - -def test_async_write_object_stream_init(mock_client): - """Test the constructor of _AsyncWriteObjectStream.""" - stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) - - assert stream.client == mock_client - assert stream.bucket_name == BUCKET - assert stream.object_name == OBJECT - assert stream.generation_number is None - assert stream.write_handle is None - assert stream._full_bucket_name == f"projects/_/buckets/{BUCKET}" - assert stream.rpc == mock.sentinel.wrapped_bidi_write_object - assert stream.metadata == ( - ("x-goog-request-params", f"bucket=projects/_/buckets/{BUCKET}"), - ) - assert stream.socket_like_rpc is None - assert not stream._is_stream_open - assert stream.first_bidi_write_req is None - assert stream.persisted_size == 0 - assert stream.object_resource is None - - -def test_async_write_object_stream_init_with_generation_and_handle(mock_client): - """Test the constructor with optional arguments.""" - generation = 12345 - write_handle = b"test-handle" - stream = _AsyncWriteObjectStream( - mock_client, - BUCKET, - OBJECT, - generation_number=generation, - write_handle=write_handle, - ) - - assert stream.generation_number == generation - assert stream.write_handle == write_handle +class TestAsyncWriteObjectStream: + """Test suite for AsyncWriteObjectStream.""" + # ------------------------------------------------------------------------- + # Initialization Tests + # ------------------------------------------------------------------------- -def test_async_write_object_stream_init_raises_value_error(): - """Test that the constructor raises ValueError for missing arguments.""" - with pytest.raises(ValueError, match="client must be provided"): - _AsyncWriteObjectStream(None, BUCKET, OBJECT) + def test_init_basic(self, mock_client): + stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) + assert stream.bucket_name == BUCKET + assert stream.object_name == OBJECT + assert stream._full_bucket_name == FULL_BUCKET_PATH + assert stream.metadata == (("x-goog-request-params", f"bucket={FULL_BUCKET_PATH}"),) + assert not stream.is_stream_open - with pytest.raises(ValueError, match="bucket_name must be provided"): - _AsyncWriteObjectStream(mock.Mock(), None, OBJECT) + def test_init_raises_value_error(self, mock_client): + with pytest.raises(ValueError, match="client must be provided"): + _AsyncWriteObjectStream(None, BUCKET, OBJECT) + with pytest.raises(ValueError, match="bucket_name must be provided"): + _AsyncWriteObjectStream(mock_client, None, OBJECT) + with pytest.raises(ValueError, match="object_name must be provided"): + _AsyncWriteObjectStream(mock_client, BUCKET, None) - with pytest.raises(ValueError, match="object_name must be provided"): - _AsyncWriteObjectStream(mock.Mock(), BUCKET, None) + # ------------------------------------------------------------------------- + # Open Stream Tests + # ------------------------------------------------------------------------- - -@pytest.mark.asyncio -@mock.patch( - "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" -) -async def test_open_for_new_object(mock_async_bidi_rpc, mock_client): - """Test opening a stream for a new object.""" - # Arrange - socket_like_rpc = mock.AsyncMock() - mock_async_bidi_rpc.return_value = socket_like_rpc - socket_like_rpc.open = mock.AsyncMock() - - mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse) - mock_response.resource = 
mock.MagicMock(spec=_storage_v2.Object) - mock_response.resource.generation = GENERATION - mock_response.resource.size = 0 - mock_response.write_handle = WRITE_HANDLE - socket_like_rpc.recv = mock.AsyncMock(return_value=mock_response) - - stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) - - # Act - await stream.open() - - # Assert - assert stream._is_stream_open - socket_like_rpc.open.assert_called_once() - socket_like_rpc.recv.assert_called_once() - assert stream.generation_number == GENERATION - assert stream.write_handle == WRITE_HANDLE - assert stream.persisted_size == 0 - - -@pytest.mark.asyncio -@mock.patch( - "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" -) -async def test_open_for_existing_object(mock_async_bidi_rpc, mock_client): - """Test opening a stream for an existing object.""" - # Arrange - socket_like_rpc = mock.AsyncMock() - mock_async_bidi_rpc.return_value = socket_like_rpc - socket_like_rpc.open = mock.AsyncMock() - - mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse) - mock_response.resource = mock.MagicMock(spec=_storage_v2.Object) - mock_response.resource.size = 1024 - mock_response.resource.generation = GENERATION - mock_response.write_handle = WRITE_HANDLE - socket_like_rpc.recv = mock.AsyncMock(return_value=mock_response) - - stream = _AsyncWriteObjectStream( - mock_client, BUCKET, OBJECT, generation_number=GENERATION + @mock.patch( + "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" ) - - # Act - await stream.open() - - # Assert - assert stream._is_stream_open - socket_like_rpc.open.assert_called_once() - socket_like_rpc.recv.assert_called_once() - assert stream.generation_number == GENERATION - assert stream.write_handle == WRITE_HANDLE - assert stream.persisted_size == 1024 - - -@pytest.mark.asyncio -@mock.patch( - "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" -) -async def test_open_when_already_open_raises_error(mock_async_bidi_rpc, mock_client): - """Test that opening an already open stream raises a ValueError.""" - # Arrange - socket_like_rpc = mock.AsyncMock() - mock_async_bidi_rpc.return_value = socket_like_rpc - socket_like_rpc.open = mock.AsyncMock() - - mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse) - mock_response.resource = mock.MagicMock(spec=_storage_v2.Object) - mock_response.resource.generation = GENERATION - mock_response.resource.size = 0 - mock_response.write_handle = WRITE_HANDLE - socket_like_rpc.recv = mock.AsyncMock(return_value=mock_response) - - stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) - await stream.open() - - # Act & Assert - with pytest.raises(ValueError, match="Stream is already open"): + @pytest.mark.asyncio + async def test_open_new_object(self, mock_rpc_cls, mock_client): + mock_rpc = mock_rpc_cls.return_value + mock_rpc.open = AsyncMock() + + # We don't use spec here to avoid descriptor issues with nested protos + mock_response = MagicMock() + mock_response.persisted_size = 0 + mock_response.resource.generation = GENERATION + mock_response.resource.size = 0 + mock_response.write_handle = WRITE_HANDLE + mock_rpc.recv = AsyncMock(return_value=mock_response) + + stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) await stream.open() + # Check if BidiRpc was initialized with WriteObjectSpec + call_args = mock_rpc_cls.call_args + initial_request = call_args.kwargs["initial_request"] + assert initial_request.write_object_spec is not None 
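+        # For a brand-new object, the initial request carries a WriteObjectSpec
+        # that names the resource and marks it appendable.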
+ assert initial_request.write_object_spec.resource.name == OBJECT + assert initial_request.write_object_spec.appendable -@pytest.mark.asyncio -@mock.patch( - "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" -) -async def test_open_raises_error_on_missing_object_resource( - mock_async_bidi_rpc, mock_client -): - """Test that open raises ValueError if object_resource is not in the response.""" - socket_like_rpc = mock.AsyncMock() - mock_async_bidi_rpc.return_value = socket_like_rpc - - mock_reponse = mock.AsyncMock() - type(mock_reponse).resource = mock.PropertyMock(return_value=None) - socket_like_rpc.recv.return_value = mock_reponse - - # Note: Don't use below code as unittest library automatically assigns an - # `AsyncMock` object to an attribute, if not set. - # socket_like_rpc.recv.return_value = mock.AsyncMock( - # return_value=_storage_v2.BidiWriteObjectResponse(resource=None) - # ) - - stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) - with pytest.raises( - ValueError, match="Failed to obtain object resource after opening the stream" - ): - await stream.open() + assert stream.is_stream_open + assert stream.write_handle == WRITE_HANDLE + assert stream.generation_number == GENERATION - -@pytest.mark.asyncio -@mock.patch( - "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" -) -async def test_open_raises_error_on_missing_generation( - mock_async_bidi_rpc, mock_client -): - """Test that open raises ValueError if generation is not in the response.""" - socket_like_rpc = mock.AsyncMock() - mock_async_bidi_rpc.return_value = socket_like_rpc - - # Configure the mock response object - mock_response = mock.AsyncMock() - type(mock_response.resource).generation = mock.PropertyMock(return_value=None) - socket_like_rpc.recv.return_value = mock_response - - stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) - with pytest.raises( - ValueError, match="Failed to obtain object generation after opening the stream" - ): - await stream.open() - - -@pytest.mark.asyncio -@mock.patch( - "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" -) -async def test_open_raises_error_on_missing_write_handle( - mock_async_bidi_rpc, mock_client -): - """Test that open raises ValueError if write_handle is not in the response.""" - socket_like_rpc = mock.AsyncMock() - mock_async_bidi_rpc.return_value = socket_like_rpc - socket_like_rpc.recv = mock.AsyncMock( - return_value=_storage_v2.BidiWriteObjectResponse( - resource=_storage_v2.Object(generation=GENERATION), write_handle=None - ) + @mock.patch( + "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" ) - stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) - with pytest.raises(ValueError, match="Failed to obtain write_handle"): + @pytest.mark.asyncio + async def test_open_existing_object_with_token(self, mock_rpc_cls, mock_client): + mock_rpc = mock_rpc_cls.return_value + mock_rpc.open = AsyncMock() + + # Ensure resource is None so persisted_size logic doesn't get overwritten by child mocks + mock_response = MagicMock() + mock_response.persisted_size = 1024 + mock_response.resource = None + mock_response.write_handle = WRITE_HANDLE + mock_rpc.recv = AsyncMock(return_value=mock_response) + + stream = _AsyncWriteObjectStream( + mock_client, + BUCKET, + OBJECT, + generation_number=GENERATION, + routing_token="token-123", + ) await stream.open() + # Verify AppendObjectSpec attributes + initial_request = 
mock_rpc_cls.call_args.kwargs["initial_request"] + assert initial_request.append_object_spec is not None + assert initial_request.append_object_spec.generation == GENERATION + assert initial_request.append_object_spec.routing_token == "token-123" + assert stream.persisted_size == 1024 -@pytest.mark.asyncio -@mock.patch( - "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" -) -async def test_close(mock_cls_async_bidi_rpc, mock_client): - """Test that close successfully closes the stream.""" - # Arrange - write_obj_stream = await instantiate_write_obj_stream( - mock_client, mock_cls_async_bidi_rpc, open=True + @mock.patch( + "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" ) - - # Act - await write_obj_stream.close() - - # Assert - write_obj_stream.socket_like_rpc.close.assert_called_once() - assert not write_obj_stream.is_stream_open - - -@pytest.mark.asyncio -@mock.patch( - "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" -) -async def test_close_without_open_should_raise_error( - mock_cls_async_bidi_rpc, mock_client -): - """Test that closing a stream that is not open raises a ValueError.""" - # Arrange - write_obj_stream = await instantiate_write_obj_stream( - mock_client, mock_cls_async_bidi_rpc, open=False - ) - - # Act & Assert - with pytest.raises(ValueError, match="Stream is not open"): - await write_obj_stream.close() - - -@pytest.mark.asyncio -@mock.patch( - "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" -) -async def test_send(mock_cls_async_bidi_rpc, mock_client): - """Test that send calls the underlying rpc's send method.""" - # Arrange - write_obj_stream = await instantiate_write_obj_stream( - mock_client, mock_cls_async_bidi_rpc, open=True - ) - - # Act - bidi_write_object_request = _storage_v2.BidiWriteObjectRequest() - await write_obj_stream.send(bidi_write_object_request) - - # Assert - write_obj_stream.socket_like_rpc.send.assert_called_once_with( - bidi_write_object_request - ) - - -@pytest.mark.asyncio -@mock.patch( - "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" -) -async def test_send_without_open_should_raise_error( - mock_cls_async_bidi_rpc, mock_client -): - """Test that sending on a stream that is not open raises a ValueError.""" - # Arrange - write_obj_stream = await instantiate_write_obj_stream( - mock_client, mock_cls_async_bidi_rpc, open=False - ) - - # Act & Assert - with pytest.raises(ValueError, match="Stream is not open"): - await write_obj_stream.send(_storage_v2.BidiWriteObjectRequest()) - - -@pytest.mark.asyncio -@mock.patch( - "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" -) -async def test_recv(mock_cls_async_bidi_rpc, mock_client): - """Test that recv calls the underlying rpc's recv method.""" - # Arrange - write_obj_stream = await instantiate_write_obj_stream( - mock_client, mock_cls_async_bidi_rpc, open=True - ) - bidi_write_object_response = _storage_v2.BidiWriteObjectResponse() - write_obj_stream.socket_like_rpc.recv = AsyncMock( - return_value=bidi_write_object_response - ) - - # Act - response = await write_obj_stream.recv() - - # Assert - write_obj_stream.socket_like_rpc.recv.assert_called_once() - assert response == bidi_write_object_response - - -@pytest.mark.asyncio -@mock.patch( - "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" -) -async def test_recv_without_open_should_raise_error( - 
mock_cls_async_bidi_rpc, mock_client -): - """Test that receiving on a stream that is not open raises a ValueError.""" - # Arrange - write_obj_stream = await instantiate_write_obj_stream( - mock_client, mock_cls_async_bidi_rpc, open=False + @pytest.mark.asyncio + async def test_open_metadata_merging(self, mock_rpc_cls, mock_client): + mock_rpc = mock_rpc_cls.return_value + mock_rpc.open = AsyncMock() + mock_rpc.recv = AsyncMock(return_value=MagicMock(resource=None)) + + stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) + extra_metadata = [("x-custom", "val"), ("x-goog-request-params", "extra=param")] + + await stream.open(metadata=extra_metadata) + + # Verify that metadata combined bucket and extra params + passed_metadata = mock_rpc_cls.call_args.kwargs["metadata"] + meta_dict = dict(passed_metadata) + assert meta_dict["x-custom"] == "val" + # Params should be comma separated + params = meta_dict["x-goog-request-params"] + assert f"bucket={FULL_BUCKET_PATH}" in params + assert "extra=param" in params + + @pytest.mark.asyncio + async def test_open_already_open_raises(self, mock_client): + stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) + stream._is_stream_open = True + with pytest.raises(ValueError, match="already open"): + await stream.open() + + # ------------------------------------------------------------------------- + # Send & Recv & Close Tests + # ------------------------------------------------------------------------- + + @mock.patch( + "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" ) + @pytest.mark.asyncio + async def test_send_and_recv_logic(self, mock_rpc_cls, mock_client): + # Setup open stream + mock_rpc = mock_rpc_cls.return_value + mock_rpc.open = AsyncMock() + mock_rpc.send = AsyncMock() # Crucial: Must be AsyncMock + mock_rpc.recv = AsyncMock(return_value=MagicMock(resource=None)) + + stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) + await stream.open() - # Act & Assert - with pytest.raises(ValueError, match="Stream is not open"): - await write_obj_stream.recv() + # Test Send + req = _storage_v2.BidiWriteObjectRequest(write_offset=0) + await stream.send(req) + mock_rpc.send.assert_awaited_with(req) + + # Test Recv with state update + mock_response = MagicMock() + mock_response.persisted_size = 5000 + mock_response.write_handle = b"new-handle" + mock_response.resource = None + mock_rpc.recv.return_value = mock_response + + res = await stream.recv() + assert res.persisted_size == 5000 + assert stream.persisted_size == 5000 + assert stream.write_handle == b"new-handle" + + @pytest.mark.asyncio + async def test_close_success(self, mock_client): + stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) + stream._is_stream_open = True + stream.socket_like_rpc = AsyncMock() + stream.socket_like_rpc.close = AsyncMock() + + await stream.close() + stream.socket_like_rpc.close.assert_awaited_once() + assert not stream.is_stream_open + + @pytest.mark.asyncio + async def test_methods_require_open_raises(self, mock_client): + stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) + with pytest.raises(ValueError, match="Stream is not open"): + await stream.send(MagicMock()) + with pytest.raises(ValueError, match="Stream is not open"): + await stream.recv() + with pytest.raises(ValueError, match="Stream is not open"): + await stream.close()
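
For orientation, a minimal usage sketch of the writer these tests exercise. The class and method names are taken from this diff (`AsyncGrpcClient`, `AsyncAppendableObjectWriter`, `open`, `append`, `close`); `my-bucket` and `my-object` are placeholders, and since the module is experimental this is illustrative rather than a documented example.

import asyncio

from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
    AsyncAppendableObjectWriter,
)


async def main() -> None:
    # AsyncGrpcClient.grpc_client returns the underlying gapic StorageAsyncClient.
    client = AsyncGrpcClient().grpc_client
    writer = AsyncAppendableObjectWriter(client, "my-bucket", "my-object")

    # Opens the bidi stream; a default retry policy handles transient errors
    # and redirect-based resumption, as exercised by the tests above.
    await writer.open()
    await writer.append(b"hello ")  # data is chunked into <= 2 MiB requests
    await writer.append(b"world")
    # finalize_on_close seals the object and returns the finalized resource.
    obj = await writer.close(finalize_on_close=True)
    print(obj.size)


if __name__ == "__main__":
    asyncio.run(main())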