@@ -21,7 +21,7 @@
if you want to use these Rapid Storage APIs.

"""
from typing import List, Optional, Tuple
from google.cloud import _storage_v2
from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
from google.cloud.storage._experimental.asyncio.async_abstract_object_stream import (
@@ -62,6 +62,7 @@ def __init__(
        object_name: str,
        generation_number: Optional[int] = None,  # None means a new object
        write_handle: Optional[bytes] = None,
        routing_token: Optional[str] = None,
    ) -> None:
        if client is None:
            raise ValueError("client must be provided")
@@ -77,6 +78,7 @@ def __init__(
        )
        self.client: AsyncGrpcClient.grpc_client = client
        self.write_handle: Optional[bytes] = write_handle
        self.routing_token: Optional[str] = routing_token

        self._full_bucket_name = f"projects/_/buckets/{self.bucket_name}"

@@ -91,13 +93,15 @@ def __init__(
        self.persisted_size = 0
        self.object_resource: Optional[_storage_v2.Object] = None

    async def open(self, metadata: Optional[List[Tuple[str, str]]] = None) -> None:
        """Opens the object for writing; performs a state lookup to learn
        the current persisted size.
        """
        if self._is_stream_open:
            raise ValueError("Stream is already open")

        write_handle = self.write_handle

        # Create a new object, or overwrite an existing one, if generation_number
        # is None. This makes it consistent with GCS JSON API behavior.
        # The created object will be an Appendable Object.
@@ -116,37 +120,47 @@ async def open(self) -> None:
                bucket=self._full_bucket_name,
                object=self.object_name,
                generation=self.generation_number,
                write_handle=write_handle,
                routing_token=self.routing_token,
            ),
        )

request_params = [f"bucket={self._full_bucket_name}"]
other_metadata = []
if metadata:
for key, value in metadata:
if key == "x-goog-request-params":
request_params.append(value)
else:
other_metadata.append((key, value))

current_metadata = other_metadata
current_metadata.append(("x-goog-request-params", ",".join(request_params)))
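        # Illustrative merge (hypothetical values): caller metadata of
        # [("x-goog-request-params", "routing_token=tok"), ("user-agent", "ua")]
        # becomes [("user-agent", "ua"),
        #     ("x-goog-request-params", "bucket=projects/_/buckets/<bucket>,routing_token=tok")].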

        self.socket_like_rpc = AsyncBidiRpc(
            self.rpc,
            initial_request=self.first_bidi_write_req,
            metadata=current_metadata,
        )

        await self.socket_like_rpc.open()  # this is effectively one send
        response = await self.socket_like_rpc.recv()
        self._is_stream_open = True

        if response.persisted_size:
            self.persisted_size = response.persisted_size

        if response.resource:
            if not response.resource.size:
                # Appending to a 0-byte appendable object.
                self.persisted_size = 0
            else:
                self.persisted_size = response.resource.size
            self.generation_number = response.resource.generation

        if response.write_handle:
            self.write_handle = response.write_handle

    async def close(self) -> None:
        """Closes the bidi-gRPC connection."""
@@ -181,7 +195,16 @@ async def recv(self) -> _storage_v2.BidiWriteObjectResponse:
"""
if not self._is_stream_open:
raise ValueError("Stream is not open")
return await self.socket_like_rpc.recv()
response = await self.socket_like_rpc.recv()
# Update write_handle if present in response
if response:
if response.write_handle:
self.write_handle = response.write_handle
if response.persisted_size is not None:
self.persisted_size = response.persisted_size
if response.resource and response.resource.size:
self.persisted_size = response.resource.size
return response

    @property
    def is_stream_open(self) -> bool:
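A rough usage sketch of the stream above (hedged: `_AsyncAppendableObjectStream`
is a hypothetical name, since the class name is not visible in these hunks; the
constructor and `open()` arguments follow the `__init__` and `open` shown above):

    stream = _AsyncAppendableObjectStream(
        client=grpc_client,
        bucket_name="my-bucket",
        object_name="my-object",
        routing_token=None,  # populated from a BidiWriteObjectRedirectedError on retry
    )
    # A routing token can also arrive via x-goog-request-params; open() merges
    # it with the bucket parameter it always sends.
    await stream.open(metadata=[("x-goog-request-params", "routing_token=tok")])
    await stream.close()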
46 changes: 44 additions & 2 deletions google/cloud/storage/_experimental/asyncio/retry/_helpers.py
@@ -18,12 +18,19 @@
from typing import Tuple, Optional

from google.api_core import exceptions
from google.cloud._storage_v2.types import (
    BidiReadObjectRedirectedError,
    BidiWriteObjectRedirectedError,
)
from google.rpc import status_pb2

_BIDI_READ_REDIRECTED_TYPE_URL = (
"type.googleapis.com/google.storage.v2.BidiReadObjectRedirectedError"
)
_BIDI_WRITE_REDIRECTED_TYPE_URL = (
"type.googleapis.com/google.storage.v2.BidiWriteObjectRedirectedError"
)
logger = logging.getLogger(__name__)


def _handle_redirect(
@@ -78,6 +85,41 @@ def _handle_redirect(
                read_handle = redirect_proto.read_handle
                break
        except Exception as e:
            logger.error(f"Error unpacking redirect: {e}")

    return routing_token, read_handle


def _extract_bidi_writes_redirect_proto(
    exc: Exception,
) -> Optional[BidiWriteObjectRedirectedError]:
    """Extracts a ``BidiWriteObjectRedirectedError`` proto from an exception, if present."""
    grpc_error = None
    if isinstance(exc, exceptions.Aborted) and exc.errors:
        grpc_error = exc.errors[0]

    if grpc_error:
        if isinstance(grpc_error, BidiWriteObjectRedirectedError):
            return grpc_error

        if hasattr(grpc_error, "trailing_metadata"):
            trailers = grpc_error.trailing_metadata()
            if not trailers:
                return None

            status_details_bin = None
            for key, value in trailers:
                if key == "grpc-status-details-bin":
                    status_details_bin = value
                    break

            if status_details_bin:
                status_proto = status_pb2.Status()
                try:
                    status_proto.ParseFromString(status_details_bin)
                    for detail in status_proto.details:
                        if detail.type_url == _BIDI_WRITE_REDIRECTED_TYPE_URL:
                            return BidiWriteObjectRedirectedError.deserialize(
                                detail.value
                            )
                except Exception:
                    logger.error("Error unpacking redirect details from gRPC error.")
    return None
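A test-style sketch of how this helper can be exercised (hedged: the fake gRPC
error below is illustrative, not part of the library; it only needs to expose
`trailing_metadata()` the way a real aborted call does):

    from google.api_core import exceptions
    from google.rpc import status_pb2

    redirect = BidiWriteObjectRedirectedError(routing_token="tok")
    status = status_pb2.Status(code=10)  # ABORTED
    detail = status.details.add()
    detail.type_url = _BIDI_WRITE_REDIRECTED_TYPE_URL
    detail.value = BidiWriteObjectRedirectedError.serialize(redirect)

    class _FakeGrpcError:
        def trailing_metadata(self):
            return [("grpc-status-details-bin", status.SerializeToString())]

    exc = exceptions.Aborted("redirected", errors=[_FakeGrpcError()])
    assert _extract_bidi_writes_redirect_proto(exc).routing_token == "tok"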
@@ -12,40 +12,50 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Any, Dict, IO, List, Optional, Union

import google_crc32c
from google.api_core import exceptions
from google.cloud._storage_v2.types import storage as storage_type
from google.cloud._storage_v2.types.storage import BidiWriteObjectRedirectedError
from google.cloud.storage._experimental.asyncio.retry.base_strategy import (
    _BaseResumptionStrategy,
)
from google.cloud.storage._experimental.asyncio.retry._helpers import (
    _extract_bidi_writes_redirect_proto,
)


_BIDI_WRITE_REDIRECTED_TYPE_URL = (
    "type.googleapis.com/google.storage.v2.BidiWriteObjectRedirectedError"
)


class _WriteState:
    """A helper class to track the state of a single upload operation.

    :type spec: Union[:class:`google.cloud.storage_v2.types.AppendObjectSpec`,
        :class:`google.cloud.storage_v2.types.WriteObjectSpec`]
    :param spec: The specification for the object to write.

    :type chunk_size: int
    :param chunk_size: The size of chunks to write to the server.

    :type user_buffer: IO[bytes]
    :param user_buffer: The data source.

    :type flush_interval: Optional[int]
    :param flush_interval: Number of bytes to send between flushes; once this
        many bytes have been written since the last flush, the next request is
        marked with ``flush=True``.
    """

    def __init__(
        self,
        spec: Union[storage_type.AppendObjectSpec, storage_type.WriteObjectSpec],
        chunk_size: int,
        user_buffer: IO[bytes],
        flush_interval: Optional[int] = None,
    ):
        self.spec = spec
        self.chunk_size = chunk_size
        self.user_buffer = user_buffer
        self.persisted_size: int = 0
        self.bytes_sent: int = 0
        self.bytes_since_last_flush: int = 0
        self.flush_interval: Optional[int] = flush_interval
        self.write_handle: Union[bytes, storage_type.BidiWriteHandle, None] = None
        self.routing_token: Optional[str] = None
        self.is_finalized: bool = False
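A minimal construction sketch for this state object (illustrative values; the
`AppendObjectSpec` fields mirror the stream code earlier in this diff):

    import io

    state = _WriteState(
        spec=storage_type.AppendObjectSpec(
            bucket="projects/_/buckets/my-bucket", object="my-object"
        ),
        chunk_size=2 * 1024 * 1024,
        user_buffer=io.BytesIO(b"payload"),
        flush_interval=8 * 1024 * 1024,  # ask for a flush roughly every 8 MiB
    )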
@@ -56,39 +66,22 @@ class _WriteResumptionStrategy(_BaseResumptionStrategy):

    def generate_requests(
        self, state: Dict[str, Any]
    ) -> List[storage_type.BidiWriteObjectRequest]:
        """Generates BidiWriteObjectRequests to resume or continue the upload.

        This method is not applicable for `open` calls; the initial request
        (carrying the WriteObjectSpec or AppendObjectSpec) is sent when the
        stream is opened.
        """
        write_state: _WriteState = state["write_state"]

        requests = []
        # The buffer should already be seeked to the correct position
        # (persisted_size) by the `recover_state_on_failure` method before
        # this is called.
        while not write_state.is_finalized:
            chunk = write_state.user_buffer.read(write_state.chunk_size)

            # End-of-file detection.
            if not chunk:
                break

            checksummed_data = storage_type.ChecksummedData(content=chunk)
            checksum = google_crc32c.Checksum(chunk)
@@ -98,16 +91,28 @@ def generate_requests(
                write_offset=write_state.bytes_sent,
                checksummed_data=checksummed_data,
            )
            chunk_len = len(chunk)
            write_state.bytes_sent += chunk_len
            write_state.bytes_since_last_flush += chunk_len

            if (
                write_state.flush_interval
                and write_state.bytes_since_last_flush >= write_state.flush_interval
            ):
                request.flush = True
                # Reset the counter after marking this request for flush.
                write_state.bytes_since_last_flush = 0

            requests.append(request)
        return requests
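    # Illustrative call (the ``state`` dict shape is assumed from this file):
    #
    #     strategy = _WriteResumptionStrategy()
    #     for req in strategy.generate_requests({"write_state": state}):
    #         ...  # one chunk per request; some requests carry flush=True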

    def update_state_from_response(
        self, response: storage_type.BidiWriteObjectResponse, state: Dict[str, Any]
    ) -> None:
        """Processes a server response and updates the write state."""
        write_state: _WriteState = state["write_state"]

        if response is None:
            return
        if response.persisted_size:
            write_state.persisted_size = response.persisted_size

@@ -129,18 +134,23 @@ async def recover_state_on_failure(
        last confirmed 'persisted_size' from the server.
        """
        write_state: _WriteState = state["write_state"]
        redirect_proto = None

        if isinstance(error, BidiWriteObjectRedirectedError):
            redirect_proto = error
        else:
            redirect_proto = _extract_bidi_writes_redirect_proto(error)

        # Extract the routing token and, potentially, a new write handle for
        # the redirection.
        if redirect_proto:
            if redirect_proto.routing_token:
                write_state.routing_token = redirect_proto.routing_token
            if redirect_proto.write_handle:
                write_state.write_handle = redirect_proto.write_handle

        # We must assume any data sent beyond 'persisted_size' was lost.
        # Reset the user buffer to the last known good byte confirmed by the
        # server.
        write_state.user_buffer.seek(write_state.persisted_size)
        write_state.bytes_sent = write_state.persisted_size
        write_state.bytes_since_last_flush = 0
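An end-to-end recovery sketch (hedged: the retry driver that invokes this
strategy is not shown in this diff, so the flow below is an assumption based on
the names above):

    try:
        ...  # drive the bidi write stream
    except exceptions.Aborted as err:
        await strategy.recover_state_on_failure(err, state)
        # write_state now carries any routing_token / write_handle from the
        # redirect, and the buffer is rewound to persisted_size, so
        # generate_requests(state) resumes from the last confirmed byte.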