diff --git a/gcloud/storage/blob.py b/gcloud/storage/blob.py index fb7357ccd1c2..9835df6a7f60 100644 --- a/gcloud/storage/blob.py +++ b/gcloud/storage/blob.py @@ -260,12 +260,12 @@ def download_to_file(self, file_obj, client=None): download_url = self.media_link # Use apitools 'Download' facility. - download = Download.from_stream(file_obj, auto_transfer=False) - headers = {} + download = Download.from_stream(file_obj) + if self.chunk_size is not None: download.chunksize = self.chunk_size - headers['Range'] = 'bytes=0-%d' % (self.chunk_size - 1,) - request = Request(download_url, 'GET', headers) + + request = Request(download_url, 'GET') # Use the private ``_connection`` rather than the public # ``.connection``, since the public connection may be a batch. A @@ -275,8 +275,6 @@ def download_to_file(self, file_obj, client=None): # it has all three (http, API_BASE_URL and build_api_url). download.initialize_download(request, client._connection.http) - download.stream_file(use_chunks=True) - def download_to_filename(self, filename, client=None): """Download the contents of this blob into a named file. 
@@ -386,7 +384,10 @@ def upload_from_file(self, file_obj, rewind=False, size=None, } upload = Upload(file_obj, content_type, total_bytes, - auto_transfer=False, chunksize=self.chunk_size) + auto_transfer=False) + + if self.chunk_size is not None: + upload.chunksize = self.chunk_size url_builder = _UrlBuilder(bucket_name=self.bucket.name, object_name=self.name) diff --git a/gcloud/storage/test_blob.py b/gcloud/storage/test_blob.py index e0bfa8b54a04..e4760607a767 100644 --- a/gcloud/storage/test_blob.py +++ b/gcloud/storage/test_blob.py @@ -345,7 +345,8 @@ def test_upload_from_file_size_failure(self): def _upload_from_file_simple_test_helper(self, properties=None, content_type_arg=None, - expected_content_type=None): + expected_content_type=None, + chunk_size=5): from six.moves.http_client import OK from six.moves.urllib.parse import parse_qsl from six.moves.urllib.parse import urlsplit @@ -361,7 +362,7 @@ def _upload_from_file_simple_test_helper(self, properties=None, bucket = _Bucket(client) blob = self._makeOne(BLOB_NAME, bucket=bucket, properties=properties) blob._CHUNK_SIZE_MULTIPLE = 1 - blob.chunk_size = 5 + blob.chunk_size = chunk_size with _NamedTemporaryFile() as temp: with open(temp.name, 'wb') as file_obj: @@ -390,6 +391,11 @@ def test_upload_from_file_simple(self): self._upload_from_file_simple_test_helper( expected_content_type='application/octet-stream') + def test_upload_from_file_simple_w_chunk_size_None(self): + self._upload_from_file_simple_test_helper( + expected_content_type='application/octet-stream', + chunk_size=None) + def test_upload_from_file_simple_with_content_type(self): EXPECTED_CONTENT_TYPE = 'foo/bar' self._upload_from_file_simple_test_helper( diff --git a/gcloud/streaming/buffered_stream.py b/gcloud/streaming/buffered_stream.py index 0920ea0499e2..be3eb19c3980 100644 --- a/gcloud/streaming/buffered_stream.py +++ b/gcloud/streaming/buffered_stream.py @@ -1,17 +1,23 @@ -# pylint: skip-file """Small helper class to provide a small 
slice of a stream. This class reads ahead to detect if we are at the end of the stream. """ -from gcloud.streaming import exceptions +from gcloud.streaming.exceptions import NotYetImplementedError -# TODO(user): Consider replacing this with a StringIO. class BufferedStream(object): + """Buffers a stream, reading ahead to determine if we're at the end. - """Buffers a stream, reading ahead to determine if we're at the end.""" + :type stream: readable file-like object + :param stream: the stream to be buffered + :type start: integer + :param start: the starting point in the stream + + :type size: integer + :param size: the size of the buffer + """ def __init__(self, stream, start, size): self._stream = stream self._start_pos = start @@ -30,30 +36,46 @@ def __len__(self): @property def stream_exhausted(self): + """Does the stream have bytes remaining beyond the buffer + + :rtype: boolean + """ return self._stream_at_end @property def stream_end_position(self): + """Point to which stream was read into the buffer + + :rtype: integer + """ return self._end_pos @property def _bytes_remaining(self): + """Bytes remaining to be read from the buffer + + :rtype: integer + """ return len(self._buffered_data) - self._buffer_pos - def read(self, size=None): # pylint: disable=invalid-name - """Reads from the buffer.""" + def read(self, size=None): + """Read bytes from the buffer. + + :type size: integer or None + :param size: How many bytes to read (defaults to all remaining bytes). + """ if size is None or size < 0: - raise exceptions.NotYetImplementedError( + raise NotYetImplementedError( 'Illegal read of size %s requested on BufferedStream. ' 'Wrapped stream %s is at position %s-%s, ' '%s bytes remaining.' 
% (size, self._stream, self._start_pos, self._end_pos, self._bytes_remaining)) - data = b'' - if self._bytes_remaining: - size = min(size, self._bytes_remaining) - data = self._buffered_data[ - self._buffer_pos:self._buffer_pos + size] - self._buffer_pos += size + if not self._bytes_remaining: + return b'' + + size = min(size, self._bytes_remaining) + data = self._buffered_data[self._buffer_pos:self._buffer_pos + size] + self._buffer_pos += size return data diff --git a/gcloud/streaming/exceptions.py b/gcloud/streaming/exceptions.py index 7c3e46d7ecf5..42a480e654b7 100644 --- a/gcloud/streaming/exceptions.py +++ b/gcloud/streaming/exceptions.py @@ -1,41 +1,38 @@ -# pylint: skip-file """Exceptions for generated client libraries.""" class Error(Exception): - """Base class for all exceptions.""" -class TypecheckError(Error, TypeError): - - """An object of an incorrect type is provided.""" - - class NotFoundError(Error): - """A specified resource could not be found.""" class UserError(Error): - """Base class for errors related to user input.""" class InvalidDataError(Error): - """Base class for any invalid data error.""" class CommunicationError(Error): - """Any communication error talking to an API server.""" class HttpError(CommunicationError): + """Error making a request. Soon to be HttpError. + + :type response: dict + :param response: headers from the response which returned the error - """Error making a request. Soon to be HttpError.""" + :type content: bytes + :param content: payload of the response which returned the error + :type url: string + :param url: URL of the response which returned the error + """ def __init__(self, response, content, url): super(HttpError, self).__init__() self.response = response @@ -49,70 +46,89 @@ def __str__(self): @property def status_code(self): - # TODO(craigcitro): Turn this into something better than a - # KeyError if there is no status. + """Status code for the response. 
+ + :rtype: integer + :returns: the code + """ return int(self.response['status']) @classmethod - def FromResponse(cls, http_response): + def from_response(cls, http_response): + """Factory: construct an exception from a response. + + :type http_response: :class:`gcloud.streaming.http_wrapper.Response` + :param http_response: the response which returned the error + + :rtype: :class:`HttpError` + """ return cls(http_response.info, http_response.content, http_response.request_url) class InvalidUserInputError(InvalidDataError): - """User-provided input is invalid.""" class ConfigurationValueError(UserError): - """Some part of the user-specified client configuration is invalid.""" class TransferError(CommunicationError): - """Errors related to transfers.""" class TransferRetryError(TransferError): - """Retryable errors related to transfers.""" class TransferInvalidError(TransferError): - """The given transfer is invalid.""" class RequestError(CommunicationError): - """The request was not successful.""" class RetryAfterError(HttpError): + """The response contained a retry-after header. + + :type response: dict + :param response: headers from the response which returned the error - """The response contained a retry-after header.""" + :type content: bytes + :param content: payload of the response which returned the error + :type url: string + :param url: URL of the response which returned the error + + :type retry_after: integer + :param retry_after: seconds to wait before retrying + """ def __init__(self, response, content, url, retry_after): super(RetryAfterError, self).__init__(response, content, url) self.retry_after = int(retry_after) @classmethod - def FromResponse(cls, http_response): + def from_response(cls, http_response): + """Factory: construct an exception from a response. 
+ + :type http_response: :class:`gcloud.streaming.http_wrapper.Response` + :param http_response: the response which returned the error + + :rtype: :class:`RetryAfterError` + """ return cls(http_response.info, http_response.content, http_response.request_url, http_response.retry_after) class BadStatusCodeError(HttpError): - """The request completed but returned a bad status code.""" class NotYetImplementedError(Error): - """This functionality is not yet implemented.""" class StreamExhausted(Error): - """Attempted to read more bytes from a stream than were available.""" diff --git a/gcloud/streaming/http_wrapper.py b/gcloud/streaming/http_wrapper.py index cb75babb1d70..7318b9152d63 100644 --- a/gcloud/streaming/http_wrapper.py +++ b/gcloud/streaming/http_wrapper.py @@ -1,8 +1,7 @@ -# pylint: skip-file """HTTP wrapper for apitools. This library wraps the underlying http library we use, which is -currently httplib2. +currently :mod:`httplib2`. """ import collections @@ -13,8 +12,8 @@ import httplib2 import six -from six.moves import http_client -from six.moves.urllib import parse +from six.moves import http_client # pylint: disable=F0401 +from six.moves.urllib import parse # pylint: disable=F0401 from gcloud.streaming.exceptions import BadStatusCodeError from gcloud.streaming.exceptions import RequestError @@ -33,9 +32,11 @@ RESUME_INCOMPLETE, ) -class _ExceptionRetryArgs(collections.namedtuple( - '_ExceptionRetryArgs', ['http', 'http_request', 'exc', 'num_retries', - 'max_retry_wait'])): + +class _ExceptionRetryArgs( + collections.namedtuple( + '_ExceptionRetryArgs', + ['http', 'http_request', 'exc', 'num_retries', 'max_retry_wait'])): """Bundle of information for retriable exceptions. 
:type http: :class:`httplib2.Http` (or conforming alternative) @@ -54,7 +55,7 @@ class _ExceptionRetryArgs(collections.namedtuple( @contextlib.contextmanager -def _Httplib2Debuglevel(http_request, level, http=None): +def _httplib2_debug_level(http_request, level, http=None): """Temporarily change the value of httplib2.debuglevel, if necessary. If http_request has a `loggable_body` distinct from `body`, then we @@ -64,14 +65,14 @@ def _Httplib2Debuglevel(http_request, level, http=None): an httplib2.Http object is provided, we'll also change the level on any cached connections attached to it. - Args: - http_request: a Request we're logging. - level: (int) the debuglevel for logging. - http: (optional) an httplib2.Http whose connections we should - set the debuglevel on. + :type http_request: :class:`Request` + :param http_request: the request to be logged. + + :type level: integer + :param level: the debuglevel for logging. - Yields: - None. + :type http: :class:`httplib2.Http`, or ``None`` + :param http: the instance on whose connections to set the debuglevel. """ if http_request.loggable_body is None: yield @@ -97,9 +98,20 @@ def _Httplib2Debuglevel(http_request, level, http=None): class Request(object): + """Encapsulates the data for an HTTP request. 
- """Class encapsulating the data for an HTTP request.""" + :type url: string + :param url: the URL for the request + :type http_method: string + :param http_method: the HTTP method to use for the request + + :type headers: mapping or None + :param headers: headers to be sent with the request + + :type body: string + :param body: body to be sent with the request + """ def __init__(self, url='', http_method='GET', headers=None, body=''): self.url = url self.http_method = http_method @@ -110,10 +122,21 @@ def __init__(self, url='', http_method='GET', headers=None, body=''): @property def loggable_body(self): + """Request body for logging purposes + + :rtype: string + """ return self.__loggable_body @loggable_body.setter def loggable_body(self, value): + """Update request body for logging purposes + + :type value: string + :param value: updated body + + :raises: :exc:`RequestError` if the request does not have a body. + """ if self.body is None: raise RequestError( 'Cannot set loggable body on request with no body') @@ -121,11 +144,21 @@ def loggable_body(self, value): @property def body(self): + """Request body + + :rtype: string + """ return self.__body @body.setter def body(self, value): - """Sets the request body; handles logging and length measurement.""" + """Update the request body + + Handles logging and length measurement. + + :type value: string + :param value: updated body + """ self.__body = value if value is not None: # Avoid calling len() which cannot exceed 4GiB in 32-bit python. @@ -158,10 +191,13 @@ def _process_content_range(content_range): # Note: currently the order of fields here is important, since we want # to be able to pass in the result from httplib2.request. 
-class Response(collections.namedtuple( - 'HttpResponse', ['info', 'content', 'request_url'])): +_ResponseTuple = collections.namedtuple( + 'HttpResponse', ['info', 'content', 'request_url']) + - """Class encapsulating data for an HTTP response.""" +class Response(_ResponseTuple): + """Encapsulates data for an HTTP response. + """ __slots__ = () def __len__(self): @@ -169,13 +205,12 @@ def __len__(self): @property def length(self): - """Return the length of this response. + """Length of this response. - We expose this as an attribute since using len() directly can fail - for responses larger than sys.maxint. + Exposed as an attribute since using ``len()`` directly can fail + for responses larger than ``sys.maxint``. - Returns: - Response length (as int or long) + :rtype: integer or long """ if 'content-encoding' in self.info and 'content-range' in self.info: # httplib2 rewrites content-length in the case of a compressed @@ -190,33 +225,59 @@ def length(self): @property def status_code(self): + """HTTP status code + + :rtype: integer + """ return int(self.info['status']) @property def retry_after(self): + """Retry interval (if set). + + :rtype: integer + :returns: interval in seconds + """ if 'retry-after' in self.info: return int(self.info['retry-after']) @property def is_redirect(self): + """Does this response contain a redirect + + :rtype: boolean + :returns: True if the status code indicates a redirect and the + 'location' header is present. + """ return (self.status_code in _REDIRECT_STATUS_CODES and 'location' in self.info) def _check_response(response): + """Validate a response + + :type response: :class:`Response` + :param response: the response to validate + + :raises: :exc:`gcloud.streaming.exceptions.RequestError` if response is + None, :exc:`gcloud.streaming.exceptions.BadStatusCodeError` if + response status code indicates an error, or + :exc:`gcloud.streaming.exceptions.RetryAfterError` if response + indicates a retry interval. 
+ """ if response is None: # Caller shouldn't call us if the response is None, but handle anyway. raise RequestError( 'Request did not return a response.') elif (response.status_code >= 500 or response.status_code == TOO_MANY_REQUESTS): - raise BadStatusCodeError.FromResponse(response) + raise BadStatusCodeError.from_response(response) elif response.retry_after: - raise RetryAfterError.FromResponse(response) + raise RetryAfterError.from_response(response) def _rebuild_http_connections(http): - """Rebuilds all http connections in the httplib2.Http instance. + """Rebuild all http connections in the httplib2.Http instance. httplib2 overloads the map in http.connections to contain two different types of values: @@ -225,8 +286,8 @@ def _rebuild_http_connections(http): Here we remove all of the entries for actual connections so that on the next request httplib2 will rebuild them from the connection types. - Args: - http: An httplib2.Http instance. + :type http: :class:`httplib2.Http` + :param http: the instance whose connections are to be rebuilt """ if getattr(http, 'connections', None): for conn_key in list(http.connections.keys()): @@ -237,10 +298,10 @@ def _rebuild_http_connections(http): def handle_http_exceptions(retry_args): """Exception handler for http failures. - This catches known failures and rebuilds the underlying HTTP connections. + Catches known failures and rebuild the underlying HTTP connections. - Args: - retry_args: An _ExceptionRetryArgs tuple. + :type retry_args: :class:`_ExceptionRetryArgs` + :param retry_args: the exception information to be evaluated. """ # If the server indicates how long to wait, use that value. Otherwise, # calculate the wait time on our own. @@ -290,25 +351,28 @@ def handle_http_exceptions(retry_args): def _make_api_request_no_retry(http, http_request, redirections=5, check_response_func=_check_response): - """Send http_request via the given http. + """Send an HTTP request via the given http instance. 
This wrapper exists to handle translation between the plain httplib2 request/response types and the Request and Response types above. - Args: - http: An httplib2.Http instance, or a http multiplexer that delegates to - an underlying http, for example, HTTPMultiplexer. - http_request: A Request to send. - redirections: (int, default 5) Number of redirects to follow. - check_response_func: Function to validate the HTTP response. - Arguments are (Response, response content, url). + :type http: :class:`httplib2.Http` + :param http: an instance which implements the `Http` API. + + :type http_request: :class:`Request` + :param http_request: the request to send. - Returns: - A Response object. + :type redirections: integer + :param redirections: Number of redirects to follow. - Raises: - RequestError if no response could be parsed. + :type check_response_func: function taking (response, content, url). + :param check_response_func: Function to validate the HTTP response. + :rtype: :class:`Response` + :returns: an object representing the server's response + + :raises: :exc:`gcloud.streaming.exceptions.RequestError` if no response + could be parsed. + """ connection_type = None # Handle overrides for connection types.
This is used if the caller @@ -321,7 +385,7 @@ def _make_api_request_no_retry(http, http_request, redirections=5, # Custom printing only at debuglevel 4 new_debuglevel = 4 if httplib2.debuglevel == 4 else 0 - with _Httplib2Debuglevel(http_request, new_debuglevel, http=http): + with _httplib2_debug_level(http_request, new_debuglevel, http=http): info, content = http.request( str(http_request.url), method=str(http_request.http_method), body=http_request.body, headers=http_request.headers, @@ -335,32 +399,46 @@ def _make_api_request_no_retry(http, http_request, redirections=5, return response -def make_api_request(http, http_request, retries=7, max_retry_wait=60, - redirections=5, - retry_func=handle_http_exceptions, - check_response_func=_check_response, - wo_retry_func=_make_api_request_no_retry): - """Send http_request via the given http, performing error/retry handling. - - Args: - http: An httplib2.Http instance, or a http multiplexer that delegates to - an underlying http, for example, HTTPMultiplexer. - http_request: A Request to send. - retries: (int, default 7) Number of retries to attempt on retryable - replies (such as 429 or 5XX). - max_retry_wait: (int, default 60) Maximum number of seconds to wait - when retrying. - redirections: (int, default 5) Number of redirects to follow. - retry_func: Function to handle retries on exceptions. Arguments are - (Httplib2.Http, Request, Exception, int num_retries). - check_response_func: Function to validate the HTTP response. - Arguments are (Response, response content, url). - wo_retry_func: Function to make HTTP request without retries. Arguments - are: (http, http_request, redirections, check_response_func) - - Returns: - A Response object. 
+def make_api_request(http, http_request, + retries=7, + max_retry_wait=60, + redirections=5, + retry_func=handle_http_exceptions, + check_response_func=_check_response, + wo_retry_func=_make_api_request_no_retry): + """Send an HTTP request via the given http, performing error/retry handling. + + :type http: :class:`httplib2.Http` + :param http: an instance which implements the `Http` API. + + :type http_request: :class:`Request` + :param http_request: the request to send. + + :type retries: integer + :param retries: Number of retries to attempt on retryable + responses (such as 429 or 5XX). + + :type max_retry_wait: integer + :param max_retry_wait: Maximum number of seconds to wait when retrying. + + :type redirections: integer + :param redirections: Number of redirects to follow. + + :type retry_func: function taking (http, request, exception, num_retries). + :param retry_func: Function to handle retries on exceptions. + + :type check_response_func: function taking (response, content, url). + :param check_response_func: Function to validate the HTTP response. + + :type wo_retry_func: function taking + (http, request, redirections, check_response_func) + :param wo_retry_func: Function to make HTTP request without retries. + + :rtype: :class:`Response` + :returns: an object representing the server's response + :raises: :exc:`gcloud.streaming.exceptions.RequestError` if no response + could be parsed. """ retry = 0 while True: @@ -369,24 +447,36 @@ def make_api_request(http, http_request, retries=7, max_retry_wait=60, http, http_request, redirections=redirections, check_response_func=check_response_func) # retry_func will consume the exception types it handles and raise.
- # pylint: disable=broad-except - except Exception as e: + except Exception as exc: # pylint: disable=broad-except retry += 1 if retry >= retries: raise else: retry_func(_ExceptionRetryArgs( - http, http_request, e, retry, max_retry_wait)) + http, http_request, exc, retry, max_retry_wait)) _HTTP_FACTORIES = [] def _register_http_factory(factory): + """Register a custom HTTP factory. + + :type factory: callable taking keyword arguments, returning an Http + instance (or an instance implementing the same API); + :param factory: the new factory (it may return ``None`` to defer to + a later factory or the default). + """ _HTTP_FACTORIES.append(factory) def get_http(**kwds): + """Construct an Http instance. + + :param kwds: keyword arguments to pass to factories. + + :rtype: :class:`httplib2.Http` (or a workalike) + """ for factory in _HTTP_FACTORIES: http = factory(**kwds) if http is not None: diff --git a/gcloud/streaming/stream_slice.py b/gcloud/streaming/stream_slice.py index df3023d2b4ce..ae798a32b244 100644 --- a/gcloud/streaming/stream_slice.py +++ b/gcloud/streaming/stream_slice.py @@ -1,13 +1,17 @@ -# pylint: skip-file """Small helper class to provide a small slice of a stream.""" -from gcloud.streaming import exceptions +from gcloud.streaming.exceptions import StreamExhausted class StreamSlice(object): + """Provides a slice-like object for streams. - """Provides a slice-like object for streams.""" + :type stream: readable file-like object + :param stream: the stream to be buffered + :type max_bytes: integer + :param max_bytes: maximum number of bytes to return in the slice + """ def __init__(self, stream, max_bytes): self._stream = stream self._remaining_bytes = max_bytes @@ -27,27 +31,32 @@ def __nonzero__(self): @property def length(self): - # For 32-bit python2.x, len() cannot exceed a 32-bit number. + """Maximum number of bytes to return in the slice. + + .. note:: + + For 32-bit python2.x, len() cannot exceed a 32-bit number. 
+ + :rtype: integer + """ return self._max_bytes - def read(self, size=None): # pylint: disable=missing-docstring - """Read at most size bytes from this slice. + def read(self, size=None): + """Read bytes from the slice. Compared to other streams, there is one case where we may unexpectedly raise an exception on read: if the underlying stream is exhausted (i.e. returns no bytes on read), and the size of this slice indicates we should still be able to read more bytes, we - raise exceptions.StreamExhausted. - - Args: - size: If provided, read no more than size bytes from the stream. + raise :exc:`StreamExhausted`. - Returns: - The bytes read from this slice. + :type size: integer or None + :param size: If provided, read no more than size bytes from the stream. - Raises: - exceptions.StreamExhausted + :rtype: bytes + :returns: bytes read from this slice. + :raises: :exc:`gcloud.streaming.exceptions.StreamExhausted` """ if size is not None: read_size = min(size, self._remaining_bytes) @@ -55,7 +64,7 @@ def read(self, size=None): # pylint: disable=missing-docstring read_size = self._remaining_bytes data = self._stream.read(read_size) if read_size > 0 and not data: - raise exceptions.StreamExhausted( + raise StreamExhausted( 'Not enough bytes in stream; expected %d, exhausted ' 'after %d' % ( self._max_bytes, diff --git a/gcloud/streaming/test_buffered_stream.py b/gcloud/streaming/test_buffered_stream.py index 01764bbee5a6..9347ca5349ef 100644 --- a/gcloud/streaming/test_buffered_stream.py +++ b/gcloud/streaming/test_buffered_stream.py @@ -1,4 +1,3 @@ -# pylint: skip-file import unittest2 diff --git a/gcloud/streaming/test_exceptions.py b/gcloud/streaming/test_exceptions.py index 02af5339cb75..4f7ffc35a61e 100644 --- a/gcloud/streaming/test_exceptions.py +++ b/gcloud/streaming/test_exceptions.py @@ -1,4 +1,3 @@ -# pylint: skip-file import unittest2 @@ -25,7 +24,7 @@ def test_ctor(self): "HttpError accessing : " "response: <{'status': '404'}>, content ") - def 
test_FromResponse(self): + def test_from_response(self): RESPONSE = {'status': '404'} CONTENT = b'CONTENT' URL = 'http://www.example.com' @@ -36,7 +35,7 @@ class _Response(object): request_url = URL klass = self._getTargetClass() - exception = klass.FromResponse(_Response()) + exception = klass.from_response(_Response()) self.assertTrue(isinstance(exception, klass)) self.assertEqual(exception.response, RESPONSE) self.assertEqual(exception.content, CONTENT) @@ -67,7 +66,7 @@ def test_ctor(self): "HttpError accessing : " "response: <{'status': '404'}>, content ") - def test_FromResponse(self): + def test_from_response(self): RESPONSE = {'status': '404'} CONTENT = b'CONTENT' URL = 'http://www.example.com' @@ -80,7 +79,7 @@ class _Response(object): retry_after = RETRY_AFTER klass = self._getTargetClass() - exception = klass.FromResponse(_Response()) + exception = klass.from_response(_Response()) self.assertTrue(isinstance(exception, klass)) self.assertEqual(exception.response, RESPONSE) self.assertEqual(exception.content, CONTENT) diff --git a/gcloud/streaming/test_http_wrapper.py b/gcloud/streaming/test_http_wrapper.py index 182dc48d06c4..301f19eacf08 100644 --- a/gcloud/streaming/test_http_wrapper.py +++ b/gcloud/streaming/test_http_wrapper.py @@ -1,12 +1,11 @@ -# pylint: skip-file import unittest2 -class Test__Httplib2Debuglevel(unittest2.TestCase): +class Test__httplib2_debug_level(unittest2.TestCase): def _getTargetClass(self): - from gcloud.streaming.http_wrapper import _Httplib2Debuglevel - return _Httplib2Debuglevel + from gcloud.streaming.http_wrapper import _httplib2_debug_level + return _httplib2_debug_level def _makeOne(self, *args, **kw): return self._getTargetClass()(*args, **kw) @@ -170,7 +169,6 @@ def test_length_wo_content_length_w_content_range(self): def test_retry_after_w_header(self): CONTENT = 'CONTENT' URL = 'http://example.com/api' - RANGE = 'bytes 0-122/5678' info = { 'status': '200', 'retry-after': '123', @@ -181,7 +179,6 @@ def 
test_retry_after_w_header(self): def test_is_redirect_w_code_wo_location(self): CONTENT = 'CONTENT' URL = 'http://example.com/api' - RANGE = 'bytes 0-122/5678' info = { 'status': '301', } @@ -191,7 +188,6 @@ def test_is_redirect_w_code_wo_location(self): def test_is_redirect_w_code_w_location(self): CONTENT = 'CONTENT' URL = 'http://example.com/api' - RANGE = 'bytes 0-122/5678' info = { 'status': '301', 'location': 'http://example.com/other', @@ -450,7 +446,7 @@ def test_w_BadStatusCodeError(self): from gcloud._testing import _Monkey from gcloud.streaming.exceptions import BadStatusCodeError response = _Response(500) - exc = BadStatusCodeError.FromResponse(response) + exc = BadStatusCodeError.from_response(response) retry_args = self._build_retry_args(exc) monkey, logged, slept = self._monkeyMUT() @@ -466,7 +462,7 @@ def test_w_RetryAfterError(self): from gcloud.streaming.http_wrapper import TOO_MANY_REQUESTS RETRY_AFTER = 25 response = _Response(TOO_MANY_REQUESTS, RETRY_AFTER) - exc = RetryAfterError.FromResponse(response) + exc = RetryAfterError.from_response(response) retry_args = self._build_retry_args(exc) monkey, logged, slept = self._monkeyMUT() @@ -486,13 +482,13 @@ class _Nonesuch(Exception): def _raises(): raise _Nonesuch - monkey, logged, slept = self._monkeyMUT() + monkey, _, _ = self._monkeyMUT() with monkey: with self.assertRaises(_Nonesuch): try: _raises() - except Exception as exc: + except _Nonesuch as exc: retry_args = _Dummy(exc=exc) self._callFUT(retry_args) @@ -631,11 +627,11 @@ def _wo_exception(*args, **kw): check_response_func=_checked.append) self.assertTrue(response is RESPONSE) - self.assertEqual(_created, - [((HTTP, REQUEST), { - 'redirections': 5, - 'check_response_func': _checked.append, - })]) + expected_kw = { + 'redirections': 5, + 'check_response_func': _checked.append, + } + self.assertEqual(_created, [((HTTP, REQUEST), expected_kw)]) self.assertEqual(_checked, []) # not called by '_wo_exception' def 
test_w_exceptions_lt_max_retries(self): @@ -666,12 +662,12 @@ def _retry(args): self.assertTrue(response is RESPONSE) self.assertEqual(len(_created), 5) + expected_kw = { + 'redirections': 5, + 'check_response_func': _checked.append, + } for attempt in _created: - self.assertEqual(attempt, - ((HTTP, REQUEST), { - 'redirections': 5, - 'check_response_func': _checked.append, - })) + self.assertEqual(attempt, ((HTTP, REQUEST), expected_kw)) self.assertEqual(_checked, []) # not called by '_wo_exception' self.assertEqual(len(_retried), 4) for index, retry in enumerate(_retried): @@ -682,7 +678,7 @@ def _retry(args): self.assertEqual(retry.max_retry_wait, WAIT) def test_w_exceptions_gt_max_retries(self): - HTTP, REQUEST, RESPONSE = object(), object(), object() + HTTP, REQUEST = object(), object() WAIT = 10, _created, _checked, _retried = [], [], [] @@ -705,12 +701,12 @@ def _retry(args): check_response_func=_checked.append) self.assertEqual(len(_created), 3) + expected_kw = { + 'redirections': 5, + 'check_response_func': _checked.append, + } for attempt in _created: - self.assertEqual(attempt, - ((HTTP, REQUEST), { - 'redirections': 5, - 'check_response_func': _checked.append, - })) + self.assertEqual(attempt, ((HTTP, REQUEST), expected_kw)) self.assertEqual(_checked, []) # not called by '_wo_exception' self.assertEqual(len(_retried), 2) for index, retry in enumerate(_retried): diff --git a/gcloud/streaming/test_stream_slice.py b/gcloud/streaming/test_stream_slice.py index fabb68a49eea..24776c41fb6f 100644 --- a/gcloud/streaming/test_stream_slice.py +++ b/gcloud/streaming/test_stream_slice.py @@ -1,4 +1,3 @@ -# pylint: skip-file import unittest2 diff --git a/gcloud/streaming/test_transfer.py b/gcloud/streaming/test_transfer.py index de09c8dc3e08..c1e2578598bd 100644 --- a/gcloud/streaming/test_transfer.py +++ b/gcloud/streaming/test_transfer.py @@ -1,4 +1,4 @@ -# pylint: skip-file +# pylint: disable=C0302 import unittest2 @@ -13,11 +13,12 @@ def _makeOne(self, *args, 
**kw): return self._getTargetClass()(*args, **kw) def test_ctor_defaults(self): + from gcloud.streaming.transfer import _DEFAULT_CHUNKSIZE stream = _Stream() xfer = self._makeOne(stream) self.assertTrue(xfer.stream is stream) self.assertFalse(xfer.close_stream) - self.assertEqual(xfer.chunksize, 1 << 20) + self.assertEqual(xfer.chunksize, _DEFAULT_CHUNKSIZE) self.assertTrue(xfer.auto_transfer) self.assertTrue(xfer.bytes_http is None) self.assertTrue(xfer.http is None) @@ -59,17 +60,15 @@ def test_bytes_http_setter(self): self.assertTrue(xfer.bytes_http is BYTES_HTTP) def test_num_retries_setter_invalid(self): - from gcloud.streaming.exceptions import TypecheckError stream = _Stream() xfer = self._makeOne(stream) - with self.assertRaises(TypecheckError): + with self.assertRaises(ValueError): xfer.num_retries = object() def test_num_retries_setter_negative(self): - from gcloud.streaming.exceptions import InvalidDataError stream = _Stream() xfer = self._makeOne(stream) - with self.assertRaises(InvalidDataError): + with self.assertRaises(ValueError): xfer.num_retries = -1 def test__initialize_not_already_initialized_w_http(self): @@ -199,7 +198,7 @@ def test_from_file_w_existing_file_w_override_wo_auto_transfer(self): with open(filename, 'w') as fileobj: fileobj.write('EXISTING FILE') download = klass.from_file(filename, overwrite=True, - auto_transfer=False) + auto_transfer=False) self.assertFalse(download.auto_transfer) del download # closes stream with open(filename, 'rb') as fileobj: @@ -219,7 +218,7 @@ def test_from_stream_explicit(self): stream = _Stream() klass = self._getTargetClass() download = klass.from_stream(stream, auto_transfer=False, - total_size=SIZE, chunksize=CHUNK_SIZE) + total_size=SIZE, chunksize=CHUNK_SIZE) self.assertTrue(download.stream is stream) self.assertFalse(download.auto_transfer) self.assertEqual(download.total_size, SIZE) @@ -268,14 +267,7 @@ def test_initialize_download_already_initialized(self): with 
self.assertRaises(TransferInvalidError): download.initialize_download(request, http=object()) - def test_initialize_download_wo_http_or_client(self): - from gcloud.streaming.exceptions import UserError - request = _Request() - download = self._makeOne(_Stream()) - with self.assertRaises(UserError): - download.initialize_download(request) - - def test_initialize_download_wo_client_wo_autotransfer(self): + def test_initialize_download_wo_autotransfer(self): request = _Request() http = object() download = self._makeOne(_Stream(), auto_transfer=False) @@ -283,16 +275,6 @@ def test_initialize_download_wo_client_wo_autotransfer(self): self.assertTrue(download.http is http) self.assertEqual(download.url, request.url) - def test_initialize_download_w_client_wo_autotransfer(self): - FINALIZED_URL = 'http://example.com/other' - request = _Request() - http = object() - client = _Client(http, FINALIZED_URL) - download = self._makeOne(_Stream(), auto_transfer=False) - download.initialize_download(request, client=client) - self.assertTrue(download.http is http) - self.assertEqual(download.url, FINALIZED_URL) - def test_initialize_download_w_autotransfer_failing(self): from six.moves import http_client from gcloud._testing import _Monkey @@ -305,8 +287,7 @@ def test_initialize_download_w_autotransfer_failing(self): response = _makeResponse(http_client.BAD_REQUEST) requester = _MakeRequest(response) - with _Monkey(MUT, - make_api_request=requester): + with _Monkey(MUT, make_api_request=requester): with self.assertRaises(HttpError): download.initialize_download(request, http) @@ -338,7 +319,6 @@ def test_initialize_download_w_autotransfer_w_content_location(self): def test__normalize_start_end_w_end_w_start_lt_0(self): from gcloud.streaming.exceptions import TransferInvalidError - request = _Request() download = self._makeOne(_Stream()) with self.assertRaises(TransferInvalidError): @@ -346,7 +326,6 @@ def test__normalize_start_end_w_end_w_start_lt_0(self): def 
test__normalize_start_end_w_end_w_start_gt_total(self): from gcloud.streaming.exceptions import TransferInvalidError - request = _Request() download = self._makeOne(_Stream()) download._set_total({'content-range': 'bytes 0-1/2'}) @@ -355,7 +334,6 @@ def test__normalize_start_end_w_end_w_start_gt_total(self): def test__normalize_start_end_w_end_lt_start(self): from gcloud.streaming.exceptions import TransferInvalidError - request = _Request() download = self._makeOne(_Stream()) download._set_total({'content-range': 'bytes 0-1/2'}) @@ -363,20 +341,17 @@ def test__normalize_start_end_w_end_lt_start(self): download._normalize_start_end(1, 0) def test__normalize_start_end_w_end_gt_start(self): - request = _Request() download = self._makeOne(_Stream()) download._set_total({'content-range': 'bytes 0-1/2'}) self.assertEqual(download._normalize_start_end(1, 2), (1, 1)) def test__normalize_start_end_wo_end_w_start_lt_0(self): - request = _Request() download = self._makeOne(_Stream()) download._set_total({'content-range': 'bytes 0-1/2'}) self.assertEqual(download._normalize_start_end(-2), (0, 1)) self.assertEqual(download._normalize_start_end(-1), (1, 1)) def test__normalize_start_end_wo_end_w_start_ge_0(self): - request = _Request() download = self._makeOne(_Stream()) download._set_total({'content-range': 'bytes 0-1/100'}) self.assertEqual(download._normalize_start_end(0), (0, 99)) @@ -419,7 +394,8 @@ def test__compute_end_byte_w_start_ge_0_w_end_w_total_size(self): CHUNK_SIZE = 50 download = self._makeOne(_Stream(), chunksize=CHUNK_SIZE) download._set_total({'content-range': 'bytes 0-1/10'}) - self.assertEqual(download._compute_end_byte(0, 100, use_chunks=False), 9) + self.assertEqual(download._compute_end_byte(0, 100, use_chunks=False), + 9) self.assertEqual(download._compute_end_byte(0, 8, use_chunks=False), 8) def test__compute_end_byte_w_start_ge_0_wo_end_w_total_size(self): @@ -430,32 +406,29 @@ def test__compute_end_byte_w_start_ge_0_wo_end_w_total_size(self): def 
test__get_chunk_not_initialized(self): from gcloud.streaming.exceptions import TransferInvalidError - request = _Request() - http = object() download = self._makeOne(_Stream()) with self.assertRaises(TransferInvalidError): - found = download._get_chunk(0, 10) + download._get_chunk(0, 10) def test__get_chunk(self): from six.moves import http_client from gcloud._testing import _Monkey from gcloud.streaming import transfer as MUT - request = _Request() http = object() download = self._makeOne(_Stream()) - download._initialize(http, request.URL) + download._initialize(http, self.URL) response = _makeResponse(http_client.OK) requester = _MakeRequest(response) with _Monkey(MUT, - Request=lambda url: request, + Request=_Request, make_api_request=requester): found = download._get_chunk(0, 10) self.assertTrue(found is response) self.assertTrue(len(requester._requested), 1) - self.assertTrue(requester._requested[0][0] is request) + request = requester._requested[0][0] self.assertEqual(request.headers['range'], 'bytes=0-10') def test__process_response_w_FORBIDDEN(self): @@ -530,11 +503,9 @@ def test__process_response_w_NO_CONTENT(self): def test_get_range_not_initialized(self): from gcloud.streaming.exceptions import TransferInvalidError - request = _Request() - http = object() download = self._makeOne(_Stream()) with self.assertRaises(TransferInvalidError): - found = download.get_range(0, 10) + download.get_range(0, 10) def test_get_range_wo_total_size_complete(self): from six.moves import http_client @@ -544,22 +515,21 @@ def test_get_range_wo_total_size_complete(self): LEN = len(CONTENT) REQ_RANGE = 'bytes=0-%d' % (LEN,) RESP_RANGE = 'bytes 0-%d/%d' % (LEN - 1, LEN) - request = _Request() http = object() stream = _Stream() download = self._makeOne(stream) - download._initialize(http, request.URL) + download._initialize(http, self.URL) info = {'content-range': RESP_RANGE} response = _makeResponse(http_client.OK, info, CONTENT) requester = _MakeRequest(response) with 
_Monkey(MUT, - Request=lambda url: request, + Request=_Request, make_api_request=requester): download.get_range(0, LEN) self.assertTrue(len(requester._requested), 1) - self.assertTrue(requester._requested[0][0] is request) + request = requester._requested[0][0] self.assertEqual(request.headers, {'range': REQ_RANGE}) self.assertEqual(stream._written, [CONTENT]) self.assertEqual(download.total_size, LEN) @@ -574,22 +544,21 @@ def test_get_range_wo_total_size_wo_end(self): CHUNK_SIZE = 123 REQ_RANGE = 'bytes=%d-%d' % (START, START + CHUNK_SIZE - 1,) RESP_RANGE = 'bytes %d-%d/%d' % (START, LEN - 1, LEN) - request = _Request() http = object() stream = _Stream() download = self._makeOne(stream, chunksize=CHUNK_SIZE) - download._initialize(http, request.URL) + download._initialize(http, self.URL) info = {'content-range': RESP_RANGE} response = _makeResponse(http_client.OK, info, CONTENT[START:]) requester = _MakeRequest(response) with _Monkey(MUT, - Request=lambda url: request, + Request=_Request, make_api_request=requester): download.get_range(START) self.assertTrue(len(requester._requested), 1) - self.assertTrue(requester._requested[0][0] is request) + request = requester._requested[0][0] self.assertEqual(request.headers, {'range': REQ_RANGE}) self.assertEqual(stream._written, [CONTENT[START:]]) self.assertEqual(download.total_size, LEN) @@ -603,24 +572,22 @@ def test_get_range_w_total_size_partial(self): PARTIAL_LEN = 5 REQ_RANGE = 'bytes=0-%d' % (PARTIAL_LEN,) RESP_RANGE = 'bytes 0-%d/%d' % (PARTIAL_LEN, LEN,) - request = _Request() http = object() stream = _Stream() download = self._makeOne(stream, total_size=LEN) - download._initialize(http, request.URL) + download._initialize(http, self.URL) info = {'content-range': RESP_RANGE} - response = _makeResponse(http_client.OK, info, - CONTENT[:PARTIAL_LEN]) + response = _makeResponse(http_client.OK, info, CONTENT[:PARTIAL_LEN]) response.length = LEN requester = _MakeRequest(response) with _Monkey(MUT, - Request=lambda 
url: request, + Request=_Request, make_api_request=requester): download.get_range(0, PARTIAL_LEN) self.assertTrue(len(requester._requested), 1) - self.assertTrue(requester._requested[0][0] is request) + request = requester._requested[0][0] self.assertEqual(request.headers, {'range': REQ_RANGE}) self.assertEqual(stream._written, [CONTENT[:PARTIAL_LEN]]) self.assertEqual(download.total_size, LEN) @@ -636,23 +603,22 @@ def test_get_range_w_empty_chunk(self): CHUNK_SIZE = 123 REQ_RANGE = 'bytes=%d-%d' % (START, START + CHUNK_SIZE - 1,) RESP_RANGE = 'bytes %d-%d/%d' % (START, LEN - 1, LEN) - request = _Request() http = object() stream = _Stream() download = self._makeOne(stream, chunksize=CHUNK_SIZE) - download._initialize(http, request.URL) + download._initialize(http, self.URL) info = {'content-range': RESP_RANGE} response = _makeResponse(http_client.OK, info) requester = _MakeRequest(response) with _Monkey(MUT, - Request=lambda url: request, + Request=_Request, make_api_request=requester): with self.assertRaises(TransferRetryError): download.get_range(START) self.assertTrue(len(requester._requested), 1) - self.assertTrue(requester._requested[0][0] is request) + request = requester._requested[0][0] self.assertEqual(request.headers, {'range': REQ_RANGE}) self.assertEqual(stream._written, ['']) self.assertEqual(download.total_size, LEN) @@ -666,22 +632,21 @@ def test_get_range_w_total_size_wo_use_chunks(self): CHUNK_SIZE = 3 REQ_RANGE = 'bytes=0-%d' % (LEN - 1,) RESP_RANGE = 'bytes 0-%d/%d' % (LEN - 1, LEN,) - request = _Request() http = object() stream = _Stream() download = self._makeOne(stream, total_size=LEN, chunksize=CHUNK_SIZE) - download._initialize(http, request.URL) + download._initialize(http, self.URL) info = {'content-range': RESP_RANGE} response = _makeResponse(http_client.OK, info, CONTENT) requester = _MakeRequest(response) with _Monkey(MUT, - Request=lambda url: request, + Request=_Request, make_api_request=requester): download.get_range(0, 
use_chunks=False) self.assertTrue(len(requester._requested), 1) - self.assertTrue(requester._requested[0][0] is request) + request = requester._requested[0][0] self.assertEqual(request.headers, {'range': REQ_RANGE}) self.assertEqual(stream._written, [CONTENT]) self.assertEqual(download.total_size, LEN) @@ -697,29 +662,27 @@ def test_get_range_w_multiple_chunks(self): RESP_RANGE_1 = 'bytes 0-%d/%d' % (CHUNK_SIZE - 1, LEN) REQ_RANGE_2 = 'bytes=%d-%d' % (CHUNK_SIZE, LEN - 1) RESP_RANGE_2 = 'bytes %d-%d/%d' % (CHUNK_SIZE, LEN - 1, LEN) - request_1, request_2 = _Request(), _Request() - _requests = [request_1, request_2] http = object() stream = _Stream() download = self._makeOne(stream, chunksize=CHUNK_SIZE) - download._initialize(http, request_1.URL) + download._initialize(http, self.URL) info_1 = {'content-range': RESP_RANGE_1} response_1 = _makeResponse(http_client.PARTIAL_CONTENT, info_1, - CONTENT[:CHUNK_SIZE]) + CONTENT[:CHUNK_SIZE]) info_2 = {'content-range': RESP_RANGE_2} response_2 = _makeResponse(http_client.OK, info_2, - CONTENT[CHUNK_SIZE:]) + CONTENT[CHUNK_SIZE:]) requester = _MakeRequest(response_1, response_2) with _Monkey(MUT, - Request=lambda url: _requests.pop(0), + Request=_Request, make_api_request=requester): download.get_range(0) self.assertTrue(len(requester._requested), 2) - self.assertTrue(requester._requested[0][0] is request_1) + request_1 = requester._requested[0][0] self.assertEqual(request_1.headers, {'range': REQ_RANGE_1}) - self.assertTrue(requester._requested[1][0] is request_2) + request_2 = requester._requested[1][0] self.assertEqual(request_2.headers, {'range': REQ_RANGE_2}) self.assertEqual(stream._written, [b'ABC', b'DE']) self.assertEqual(download.total_size, LEN) @@ -729,7 +692,7 @@ def test_stream_file_not_initialized(self): download = self._makeOne(_Stream()) with self.assertRaises(TransferInvalidError): - found = download.stream_file() + download.stream_file() def test_stream_file_w_initial_response_complete(self): from 
six.moves import http_client @@ -775,12 +738,12 @@ def test_stream_file_w_initial_response_incomplete(self): request = _Request() with _Monkey(MUT, - Request=lambda url: request, + Request=_Request, make_api_request=requester): download.stream_file() self.assertTrue(len(requester._requested), 1) - self.assertTrue(requester._requested[0][0] is request) + request = requester._requested[0][0] self.assertEqual(request.headers, {'range': REQ_RANGE_2}) self.assertEqual(stream._written, [CONTENT[:CHUNK_SIZE], CONTENT[CHUNK_SIZE:]]) @@ -806,12 +769,12 @@ def test_stream_file_wo_initial_response_wo_total_size(self): request = _Request() with _Monkey(MUT, - Request=lambda url: request, + Request=_Request, make_api_request=requester): download.stream_file() self.assertTrue(len(requester._requested), 1) - self.assertTrue(requester._requested[0][0] is request) + request = requester._requested[0][0] self.assertEqual(request.headers, {'range': REQ_RANGE}) self.assertEqual(stream._written, [CONTENT]) self.assertEqual(download.total_size, LEN) @@ -830,6 +793,7 @@ def _makeOne(self, stream, mime_type=MIME_TYPE, *args, **kw): return self._getTargetClass()(stream, mime_type, *args, **kw) def test_ctor_defaults(self): + from gcloud.streaming.transfer import _DEFAULT_CHUNKSIZE stream = _Stream() upload = self._makeOne(stream) self.assertTrue(upload.stream is stream) @@ -840,6 +804,7 @@ def test_ctor_defaults(self): self.assertEqual(upload.progress, 0) self.assertTrue(upload.strategy is None) self.assertTrue(upload.total_size is None) + self.assertEqual(upload.chunksize, _DEFAULT_CHUNKSIZE) def test_ctor_w_kwds(self): stream = _Stream() @@ -963,7 +928,6 @@ def test_total_size_setter_initialized(self): def test_total_size_setter_not_initialized(self): SIZE = 123 upload = self._makeOne(_Stream) - http = object() upload.total_size = SIZE self.assertEqual(upload.total_size, SIZE) @@ -1063,7 +1027,7 @@ def test_configure_request_w_simple_wo_body(self): upload.configure_request(config, 
request, url_builder) - self.assertEqual(url_builder.query_params, {'uploadType': 'media'}) + self.assertEqual(url_builder.query_params, {'uploadType': 'media'}) self.assertEqual(url_builder.relative_path, config.simple_path) self.assertEqual(request.headers, {'content-type': self.MIME_TYPE}) @@ -1084,7 +1048,7 @@ def test_configure_request_w_simple_w_body(self): upload.configure_request(config, request, url_builder) - self.assertEqual(url_builder.query_params, {'uploadType': 'multipart'}) + self.assertEqual(url_builder.query_params, {'uploadType': 'multipart'}) self.assertEqual(url_builder.relative_path, config.simple_path) parser = Parser() @@ -1124,7 +1088,7 @@ def test_configure_request_w_resumable_wo_total_size(self): upload.configure_request(config, request, url_builder) - self.assertEqual(url_builder.query_params, {'uploadType': 'resumable'}) + self.assertEqual(url_builder.query_params, {'uploadType': 'resumable'}) self.assertEqual(url_builder.relative_path, config.resumable_path) self.assertEqual(request.headers, @@ -1143,7 +1107,7 @@ def test_configure_request_w_resumable_w_total_size(self): upload.configure_request(config, request, url_builder) - self.assertEqual(url_builder.query_params, {'uploadType': 'resumable'}) + self.assertEqual(url_builder.query_params, {'uploadType': 'resumable'}) self.assertEqual(url_builder.relative_path, config.resumable_path) self.assertEqual(request.headers, @@ -1252,7 +1216,6 @@ def test_refresh_upload_state_w_RESUME_INCOMPLETE_wo_range(self): from gcloud.streaming.transfer import RESUMABLE_UPLOAD CONTENT = b'ABCDEFGHIJ' LEN = len(CONTENT) - LAST = 5 http = object() stream = _Stream() upload = self._makeOne(stream, total_size=LEN) @@ -1315,15 +1278,6 @@ def test_initialize_upload_no_strategy(self): with self.assertRaises(UserError): upload.initialize_upload(request, http=object()) - def test_initialize_upload_wo_client_wo_http(self): - from gcloud.streaming.exceptions import UserError - from gcloud.streaming.transfer import 
SIMPLE_UPLOAD - request = _Request() - upload = self._makeOne(_Stream()) - upload.strategy = SIMPLE_UPLOAD - with self.assertRaises(UserError): - upload.initialize_upload(request) - def test_initialize_upload_simple_w_http(self): from gcloud.streaming.transfer import SIMPLE_UPLOAD request = _Request() @@ -1347,8 +1301,6 @@ def test_initialize_upload_w_http_resumable_not_initialized_w_error(self): from gcloud.streaming import transfer as MUT from gcloud.streaming.exceptions import HttpError from gcloud.streaming.transfer import RESUMABLE_UPLOAD - CONTENT = b'ABCDEFGHIJ' - LEN = len(CONTENT) request = _Request() upload = self._makeOne(_Stream()) upload.strategy = RESUMABLE_UPLOAD @@ -1364,8 +1316,6 @@ def test_initialize_upload_w_http_wo_auto_transfer_w_OK(self): from gcloud._testing import _Monkey from gcloud.streaming import transfer as MUT from gcloud.streaming.transfer import RESUMABLE_UPLOAD - CONTENT = b'ABCDEFGHIJ' - LEN = len(CONTENT) request = _Request() upload = self._makeOne(_Stream(), auto_transfer=False) upload.strategy = RESUMABLE_UPLOAD @@ -1382,16 +1332,13 @@ def test_initialize_upload_w_http_wo_auto_transfer_w_OK(self): self.assertEqual(len(requester._requested), 1) self.assertTrue(requester._requested[0][0] is request) - def test_initialize_upload_w_client_w_auto_transfer_w_OK(self): + def test_initialize_upload_w_granularity_w_auto_transfer_w_OK(self): from six.moves import http_client from gcloud._testing import _Monkey from gcloud.streaming import transfer as MUT from gcloud.streaming.transfer import RESUMABLE_UPLOAD CONTENT = b'ABCDEFGHIJ' - LEN = len(CONTENT) - FINALIZED_URL = 'http://example.com/upload?id=foobar&final' http = object() - client = _Client(http, FINALIZED_URL) request = _Request() upload = self._makeOne(_Stream(CONTENT), chunksize=1000) upload.strategy = RESUMABLE_UPLOAD @@ -1404,16 +1351,16 @@ def test_initialize_upload_w_client_w_auto_transfer_w_OK(self): with _Monkey(MUT, Request=_Request, make_api_request=requester): - 
upload.initialize_upload(request, client=client) + upload.initialize_upload(request, http) self.assertEqual(upload._server_chunk_granularity, 100) - self.assertEqual(upload.url, FINALIZED_URL) + self.assertEqual(upload.url, self.UPLOAD_URL) self.assertEqual(requester._responses, []) self.assertEqual(len(requester._requested), 2) self.assertTrue(requester._requested[0][0] is request) chunk_request = requester._requested[1][0] self.assertTrue(isinstance(chunk_request, _Request)) - self.assertEqual(chunk_request.url, FINALIZED_URL) + self.assertEqual(chunk_request.url, self.UPLOAD_URL) self.assertEqual(chunk_request.http_method, 'PUT') self.assertEqual(chunk_request.body, CONTENT) @@ -1533,6 +1480,7 @@ def test_stream_file_incomplete(self): make_api_request=requester): response = upload.stream_file() + self.assertTrue(response is response_2) self.assertEqual(len(requester._responses), 0) self.assertEqual(len(requester._requested), 2) @@ -1566,9 +1514,10 @@ def test_stream_file_incomplete_w_transfer_error(self): upload._server_chunk_granularity = 6 upload._initialize(http, self.UPLOAD_URL) - info = {'content-length': '0', - 'range': 'bytes=0-4', # simulate error, s.b. '0-5' - } + info = { + 'content-length': '0', + 'range': 'bytes=0-4', # simulate error, s.b. 
'0-5' + } response = _makeResponse(RESUME_INCOMPLETE, info) requester = _MakeRequest(response) @@ -1641,9 +1590,7 @@ def test__send_media_request_w_error(self): response_2 = _makeResponse(RESUME_INCOMPLETE, info_2) requester = _MakeRequest(response_1, response_2) - with _Monkey(MUT, - Request=_Request, - make_api_request=requester): + with _Monkey(MUT, Request=_Request, make_api_request=requester): with self.assertRaises(HttpError): upload._send_media_request(request, 9) @@ -1778,10 +1725,12 @@ def test__send_chunk_wo_total_size_stream_not_exhausted(self): self.assertEqual(request.url, self.UPLOAD_URL) self.assertEqual(request.http_method, 'PUT') self.assertEqual(request.body, CONTENT[:CHUNK_SIZE]) - self.assertEqual(request.headers, - {'content-length': '%d' % CHUNK_SIZE, # speling! - 'Content-Type': self.MIME_TYPE, - 'Content-Range': 'bytes 0-%d/*' % (CHUNK_SIZE- 1,)}) + expected_headers = { + 'content-length': '%d' % CHUNK_SIZE, # speling! + 'Content-Type': self.MIME_TYPE, + 'Content-Range': 'bytes 0-%d/*' % (CHUNK_SIZE - 1,), + } + self.assertEqual(request.headers, expected_headers) self.assertEqual(end, CHUNK_SIZE) def test__send_chunk_w_total_size_stream_not_exhausted(self): @@ -1807,11 +1756,12 @@ def test__send_chunk_w_total_size_stream_not_exhausted(self): self.assertTrue(isinstance(body_stream, StreamSlice)) self.assertTrue(body_stream._stream is stream) self.assertEqual(len(body_stream), CHUNK_SIZE) - self.assertEqual(request.headers, - {'content-length': '%d' % CHUNK_SIZE, # speling! - 'Content-Type': self.MIME_TYPE, - 'Content-Range': 'bytes 0-%d/%d' - % (CHUNK_SIZE- 1, SIZE)}) + expected_headers = { + 'content-length': '%d' % CHUNK_SIZE, # speling! 
+ 'Content-Type': self.MIME_TYPE, + 'Content-Range': 'bytes 0-%d/%d' % (CHUNK_SIZE - 1, SIZE), + } + self.assertEqual(request.headers, expected_headers) self.assertEqual(end, CHUNK_SIZE) def test__send_chunk_w_total_size_stream_exhausted(self): @@ -1855,7 +1805,7 @@ class _UploadConfig(object): resumable_path = '/resumable/endpoint' simple_multipart = True simple_path = '/upload/endpoint' - + class _Stream(object): _closed = False @@ -1896,16 +1846,6 @@ def __init__(self, url=URL, http_method='GET', body='', headers=None): self.headers = headers -class _Client(object): - - def __init__(self, http, finalized_url): - self.http = http - self._finalized_url = finalized_url - - def FinalizeTransferUrl(self, existing_url): - return self._finalized_url - - class _MakeRequest(object): def __init__(self, *responses): @@ -1941,17 +1881,18 @@ def __call__(self, request, end): return self._response -def _tempdir(): +def _tempdir_maker(): import contextlib import shutil import tempfile @contextlib.contextmanager - def _tempdir(): + def _tempdir_mgr(): temp_dir = tempfile.mkdtemp() yield temp_dir shutil.rmtree(temp_dir) - return _tempdir + return _tempdir_mgr -_tempdir = _tempdir() +_tempdir = _tempdir_maker() +del _tempdir_maker diff --git a/gcloud/streaming/test_util.py b/gcloud/streaming/test_util.py index 0922334d29ed..a2cfef48ed99 100644 --- a/gcloud/streaming/test_util.py +++ b/gcloud/streaming/test_util.py @@ -1,33 +1,6 @@ -# pylint: skip-file import unittest2 -class Test_type_check(unittest2.TestCase): - - def _callFUT(self, *args, **kw): - from gcloud.streaming.util import type_check - return type_check(*args, **kw) - - def test_pass(self): - self.assertEqual(self._callFUT(123, int), 123) - - def test_fail_w_explicit_msg(self): - from gcloud.streaming.exceptions import TypecheckError - with self.assertRaises(TypecheckError) as err: - self._callFUT(123, str, 'foo') - self.assertEqual(err.exception.args, ('foo',)) - - def test_fail_w_single_type_no_msg(self): - from 
gcloud.streaming.exceptions import TypecheckError - with self.assertRaises(TypecheckError) as err: - self._callFUT(123, str) - - def test_fail_w_tuple_no_msg(self): - from gcloud.streaming.exceptions import TypecheckError - with self.assertRaises(TypecheckError) as err: - self._callFUT(123, (list, tuple)) - - class Test_calculate_wait_for_retry(unittest2.TestCase): def _callFUT(self, *args, **kw): diff --git a/gcloud/streaming/transfer.py b/gcloud/streaming/transfer.py index 2063d0fc2ace..7069d82e458b 100644 --- a/gcloud/streaming/transfer.py +++ b/gcloud/streaming/transfer.py @@ -1,4 +1,3 @@ -# pylint: skip-file """Upload and download support for apitools.""" import email.generator as email_generator @@ -14,7 +13,6 @@ from gcloud.streaming.exceptions import CommunicationError from gcloud.streaming.exceptions import ConfigurationValueError from gcloud.streaming.exceptions import HttpError -from gcloud.streaming.exceptions import InvalidDataError from gcloud.streaming.exceptions import InvalidUserInputError from gcloud.streaming.exceptions import NotFoundError from gcloud.streaming.exceptions import TransferInvalidError @@ -27,7 +25,6 @@ from gcloud.streaming.http_wrapper import RESUME_INCOMPLETE from gcloud.streaming.stream_slice import StreamSlice from gcloud.streaming.util import acceptable_mime_type -from gcloud.streaming.util import type_check RESUMABLE_UPLOAD_THRESHOLD = 5 << 20 @@ -35,109 +32,183 @@ RESUMABLE_UPLOAD = 'resumable' +_DEFAULT_CHUNKSIZE = 1 << 20 + + class _Transfer(object): + """Generic bits common to Uploads and Downloads. + + :type stream: file-like object + :param stream: stream to/from which data is downloaded/uploaded. + + :type close_stream: boolean + :param close_stream: should this instance close the stream when deleted + + :type chunksize: integer + :param chunksize: the size of chunks used to download/upload a file. 
- """Generic bits common to Uploads and Downloads.""" + :type auto_transfer: boolean + :param auto_transfer: should this instance automatically begin transfering + data when initialized - def __init__(self, stream, close_stream=False, chunksize=None, - auto_transfer=True, http=None, num_retries=5): - self.__bytes_http = None - self.__close_stream = close_stream - self.__http = http - self.__stream = stream - self.__url = None + :type http: :class:`httplib2.Http` (or workalike) + :param http: Http instance used to perform requests. + + :type num_retries: integer + :param num_retries: how many retries should the transfer attempt + """ + def __init__(self, stream, close_stream=False, + chunksize=_DEFAULT_CHUNKSIZE, auto_transfer=True, + http=None, num_retries=5): + self._bytes_http = None + self._close_stream = close_stream + self._http = http + self._stream = stream + self._url = None - self.__num_retries = 5 # Let the @property do validation self.num_retries = num_retries self.retry_func = handle_http_exceptions self.auto_transfer = auto_transfer - self.chunksize = chunksize or 1048576 + self.chunksize = chunksize def __repr__(self): return str(self) @property def close_stream(self): - return self.__close_stream + """Should this instance close the stream when deleted + + :rtype: boolean + """ + return self._close_stream @property def http(self): - return self.__http + """Http instance used to perform requests. + + :rtype: :class:`httplib2.Http` (or workalike) + """ + return self._http @property def bytes_http(self): - return self.__bytes_http or self.http + """Http instance used to perform binary requests. + + Defaults to :attr:`http`. + + :rtype: :class:`httplib2.Http` (or workalike) + """ + return self._bytes_http or self.http @bytes_http.setter def bytes_http(self, value): - self.__bytes_http = value + """Update Http instance used to perform binary requests. 
+ + :type value: :class:`httplib2.Http` (or workalike) + :param value: new instance + """ + self._bytes_http = value @property def num_retries(self): - return self.__num_retries + """How many retries should the transfer attempt + + :rtype: integer + """ + return self._num_retries @num_retries.setter def num_retries(self, value): - type_check(value, six.integer_types) + """Update how many retries should the transfer attempt + + :type value: integer + """ + if not isinstance(value, six.integer_types): + raise ValueError("num_retries: pass an integer") + if value < 0: - raise InvalidDataError( + raise ValueError( 'Cannot have negative value for num_retries') - self.__num_retries = value + self._num_retries = value @property def stream(self): - return self.__stream + """Stream to/from which data is downloaded/uploaded. + + :rtype: file-like object + """ + return self._stream @property def url(self): - return self.__url + """URL to / from which data is downloaded/uploaded. + + :rtype: string + """ + return self._url def _initialize(self, http, url): - """Initialize this download by setting self.http and self.url. + """Initialize this download by setting :attr:`http` and :attr`url`. - We want the user to be able to override self.http by having set + Allow the user to be able to pre-initialize :attr:`http` by setting the value in the constructor; in that case, we ignore the provided http. - Args: - http: An httplib2.Http instance or None. - url: The url for this transfer. + :type http: :class:`httplib2.Http` (or a worklike) or None. + :param http: the Http instance to use to make requests. - Returns: - None. Initializes self. + :type url: string + :param url: The url for this transfer. 
""" self._ensure_uninitialized() if self.http is None: - self.__http = http or get_http() - self.__url = url + self._http = http or get_http() + self._url = url @property def initialized(self): + """Has the instance been initialized + + :rtype: boolean + """ return self.url is not None and self.http is not None def _ensure_initialized(self): + """Helper: assert that the instance is initialized. + + :raises: :exc:`gcloud.streaming.exceptions.TransferInvalidError` + if the instance is not initialized. + """ if not self.initialized: raise TransferInvalidError( 'Cannot use uninitialized %s', type(self).__name__) def _ensure_uninitialized(self): + """Helper: assert that the instance is not initialized. + + :raises: :exc:`gcloud.streaming.exceptions.TransferInvalidError` + if the instance is already initialized. + """ if self.initialized: raise TransferInvalidError( 'Cannot re-initialize %s', type(self).__name__) def __del__(self): - if self.__close_stream: - self.__stream.close() + if self._close_stream: + self._stream.close() class Download(_Transfer): + """Represent a single download. - """Data for a single download. + :type stream: file-like object + :param stream: stream to/from which data is downloaded/uploaded. - Public attributes: - chunksize: default chunksize to use for transfers. + :type kwds: dict + :param kwds: keyword arguments: all except ``total_size`` are passed + through to :meth:`_Transfer.__init__()`. 
""" _ACCEPTABLE_STATUSES = set(( http_client.OK, @@ -150,13 +221,27 @@ def __init__(self, stream, **kwds): total_size = kwds.pop('total_size', None) super(Download, self).__init__(stream, **kwds) self._initial_response = None - self.__progress = 0 - self.__total_size = total_size - self.__encoding = None + self._progress = 0 + self._total_size = total_size + self._encoding = None @classmethod def from_file(cls, filename, overwrite=False, auto_transfer=True, **kwds): - """Create a new download object from a filename.""" + """Create a new download object from a filename. + + :type filename: string + :param filename: path/filename for the target file + + :type overwrite: boolean + :param overwrite: should an existing file be overwritten + + :type auto_transfer: boolean + :param auto_transfer: should the transfer be started immediately + + :type kwds: dict + :param kwds: keyword arguments: passed + through to :meth:`_Transfer.__init__()`. + """ path = os.path.expanduser(filename) if os.path.exists(path) and not overwrite: raise InvalidUserInputError( @@ -166,21 +251,47 @@ def from_file(cls, filename, overwrite=False, auto_transfer=True, **kwds): @classmethod def from_stream(cls, stream, auto_transfer=True, total_size=None, **kwds): - """Create a new Download object from a stream.""" + """Create a new Download object from a stream. + + :type stream: writable file-like object + :param stream: the target file + + :type total_size: integer or None + :param total_size: total size of the file to be downloaded + + :type auto_transfer: boolean + :param auto_transfer: should the transfer be started immediately + + :type kwds: dict + :param kwds: keyword arguments: passed + through to :meth:`_Transfer.__init__()`. + """ return cls(stream, auto_transfer=auto_transfer, total_size=total_size, **kwds) @property def progress(self): - return self.__progress + """Number of bytes have been downloaded. 
+ + :rtype: integer >= 0 + """ + return self._progress @property def total_size(self): - return self.__total_size + """Total number of bytes to be downloaded. + + :rtype: integer or None + """ + return self._total_size @property def encoding(self): - return self.__encoding + """'Content-Encoding' used to transfer the file + + :rtype: string or None + """ + return self._encoding def __repr__(self): if not self.initialized: @@ -190,41 +301,47 @@ def __repr__(self): self.progress, self.total_size, self.url) def configure_request(self, http_request, url_builder): + """Update http_request/url_builder with download-appropriate values. + + :type http_request: :class:`gcloud.streaming.http_wrapper.Request` + :param http_request: the request to be updated + + :type url_builder: instance with settable 'query_params' attribute. + :param url_builder: transfer policy object to be updated + """ url_builder.query_params['alt'] = 'media' - # TODO(craigcitro): We need to send range requests because by - # default httplib2 stores entire reponses in memory. Override - # httplib2's download method (as gsutil does) so that this is not - # necessary. http_request.headers['Range'] = 'bytes=0-%d' % (self.chunksize - 1,) def _set_total(self, info): + """Update 'total_size' based on data from a response. + + :type info: mapping + :param info: response headers + """ if 'content-range' in info: _, _, total = info['content-range'].rpartition('/') if total != '*': - self.__total_size = int(total) + self._total_size = int(total) # Note "total_size is None" means we don't know it; if no size # info was returned on our initial range request, that means we # have a 0-byte file. (That last statement has been verified # empirically, but is not clearly documented anywhere.) if self.total_size is None: - self.__total_size = 0 + self._total_size = 0 + + def initialize_download(self, http_request, http): + """Initialize this download. 
+ + If the instance has :attr:`auto_transfer` enabled, begins the + download immediately. - def initialize_download(self, http_request, http=None, client=None): - """Initialize this download by making a request. + :type http_request: :class:`gcloud.streaming.http_wrapper.Request` + :param http_request: the request to use to initialize this download. - Args: - http_request: The HttpRequest to use to initialize this download. - http: The httplib2.Http instance for this request. - client: If provided, let this client process the final URL before - sending any additional requests. If client is provided and - http is not, client.http will be used instead. + :type http: :class:`httplib2.Http` (or workalike) + :param http: Http instance for this request. """ self._ensure_uninitialized() - if http is None and client is None: - raise UserError('Must provide client or http.') - http = http or client.http - if client is not None: - http_request.url = client.FinalizeTransferUrl(http_request.url) url = http_request.url if self.auto_transfer: end_byte = self._compute_end_byte(0) @@ -232,12 +349,10 @@ def initialize_download(self, http_request, http=None, client=None): response = make_api_request( self.bytes_http or http, http_request) if response.status_code not in self._ACCEPTABLE_STATUSES: - raise HttpError.FromResponse(response) + raise HttpError.from_response(response) self._initial_response = response self._set_total(response.info) url = response.info.get('content-location', response.request_url) - if client is not None: - url = client.FinalizeTransferUrl(url) self._initialize(http, url) # Unless the user has requested otherwise, we want to just # go ahead and pump the bytes now. @@ -245,6 +360,20 @@ def initialize_download(self, http_request, http=None, client=None): self.stream_file(use_chunks=True) def _normalize_start_end(self, start, end=None): + """Validate / fix up byte range. 
+ + :type start: integer + :param start: start byte of the range: if negative, used as an + offset from the end. + + :type end: integer + :param end: end byte of the range. + + :rtype: tuple, (start, end) + :returns: the normalized start, end pair. + :raises: :exc:`gcloud.streaming.exceptions.TransferInvalidError` + for invalid combinations of start, end. + """ if end is not None: if start < 0: raise TransferInvalidError( @@ -263,6 +392,18 @@ def _normalize_start_end(self, start, end=None): return start, self.total_size - 1 def _set_range_header(self, request, start, end=None): + """Update the 'Range' header in a request to match a byte range. + + :type request: :class:`gcloud.streaming.http_wrapper.Request` + :param request: the request to update + + :type start: integer + :param start: start byte of the range: if negative, used as an + offset from the end. + + :type end: integer + :param end: end byte of the range. + """ if start < 0: request.headers['range'] = 'bytes=%d' % start elif end is None: @@ -273,24 +414,25 @@ def _set_range_header(self, request, start, end=None): def _compute_end_byte(self, start, end=None, use_chunks=True): """Compute the last byte to fetch for this request. - This is all based on the HTTP spec for Range and - Content-Range. + Based on the HTTP spec for Range and Content-Range. - Note that this is potentially confusing in several ways: - * the value for the last byte is 0-based, eg "fetch 10 bytes - from the beginning" would return 9 here. - * if we have no information about size, and don't want to - use the chunksize, we'll return None. - See the tests for more examples. + .. note:: + This is potentially confusing in several ways: + - the value for the last byte is 0-based, eg "fetch 10 bytes + from the beginning" would return 9 here. + - if we have no information about size, and don't want to + use the chunksize, we'll return None. - Args: - start: byte to start at. - end: (int or None, default: None) Suggested last byte. 
- use_chunks: (bool, default: True) If False, ignore self.chunksize. + :type start: integer + :param start: start byte of the range. - Returns: - Last byte to use in a Range header, or None. + :type end: integer or None + :param end: suggested last byte of the range. + :type use_chunks: boolean + :param use_chunks: If False, ignore :attr:`chunksize`. + + :returns: Last byte to use in a 'Range' header, or None. """ end_byte = end @@ -314,7 +456,17 @@ def _compute_end_byte(self, start, end=None, use_chunks=True): return end_byte def _get_chunk(self, start, end): - """Retrieve a chunk, and return the full response.""" + """Retrieve a chunk of the file. + + :type start: integer + :param start: start byte of the range. + + :type end: integer or None + :param end: end byte of the range. + + :rtype: :class:`gcloud.streaming.http_wrapper.Response` + :returns: response from the chunk request. + """ self._ensure_initialized() request = Request(url=self.url) self._set_range_header(request, start, end=end) @@ -323,23 +475,32 @@ def _get_chunk(self, start, end): retries=self.num_retries) def _process_response(self, response): - """Process response (by updating self and writing to self.stream).""" + """Update attributes and write to the stream, based on the response. + + :type response: :class:`gcloud.streaming.http_wrapper.Response` + :param response: response from a download request. + + :rtype: :class:`gcloud.streaming.http_wrapper.Response` + :returns: the response + :raises: :exc:`gcloud.streaming.exceptions.HttpError` for + missing / unauthorized responses; + :exc:`gcloud.streaming.exceptions.TransferRetryError` + for other error responses. + """ if response.status_code not in self._ACCEPTABLE_STATUSES: # We distinguish errors that mean we made a mistake in setting # up the transfer versus something we should attempt again. 
if response.status_code in (http_client.FORBIDDEN, http_client.NOT_FOUND): - raise HttpError.FromResponse(response) + raise HttpError.from_response(response) else: raise TransferRetryError(response.content) if response.status_code in (http_client.OK, http_client.PARTIAL_CONTENT): self.stream.write(response.content) - self.__progress += response.length + self._progress += response.length if response.info and 'content-encoding' in response.info: - # TODO(craigcitro): Handle the case where this changes over a - # download. - self.__encoding = response.info['content-encoding'] + self._encoding = response.info['content-encoding'] elif response.status_code == http_client.NO_CONTENT: # It's important to write something to the stream for the case # of a 0-byte download to a file, as otherwise python won't @@ -350,6 +511,8 @@ def _process_response(self, response): def get_range(self, start, end=None, use_chunks=True): """Retrieve a given byte range from this download, inclusive. + Writes retrieved bytes into :attr:`stream`. + Range must be of one of these three forms: * 0 <= start, end = None: Fetch from start to the end of the file. * 0 <= start <= end: Fetch the bytes from start to end. @@ -358,14 +521,19 @@ def get_range(self, start, end=None, use_chunks=True): (These variations correspond to those described in the HTTP 1.1 protocol for range headers in RFC 2616, sec. 14.35.1.) - Args: - start: (int) Where to start fetching bytes. (See above.) - end: (int, optional) Where to stop fetching bytes. (See above.) - use_chunks: (bool, default: True) If False, ignore self.chunksize - and fetch this range in a single request. + :type start: integer + :param start: Where to start fetching bytes. (See above.) + + :type end: integer or ``None`` + :param end: Where to stop fetching bytes. (See above.) + + :type use_chunks: boolean + :param use_chunks: If False, ignore :attr:`chunksize` + and fetch this range in a single request. + If True, streams via chunks. - Returns: - None. 
Streams bytes into self.stream. + :raises: :exc:`gcloud.streaming.exceptions.TransferRetryError` + if a request returns an empty response. """ self._ensure_initialized() progress_end_normalized = False @@ -378,7 +546,7 @@ def get_range(self, start, end=None, use_chunks=True): while (not progress_end_normalized or end_byte is None or progress <= end_byte): end_byte = self._compute_end_byte(progress, end=end_byte, - use_chunks=use_chunks) + use_chunks=use_chunks) response = self._get_chunk(progress, end_byte) if not progress_end_normalized: self._set_total(response.info) @@ -393,12 +561,12 @@ def get_range(self, start, end=None, use_chunks=True): def stream_file(self, use_chunks=True): """Stream the entire download. - Args: - use_chunks: (bool, default: True) If False, ignore self.chunksize - and stream this download in a single request. + Writes retrieved bytes into :attr:`stream`. - Returns: - None. Streams bytes into self.stream. + :type use_chunks: boolean + :param use_chunks: If False, ignore :attr:`chunksize` + and stream this download in a single request. + If True, streams via chunks. """ self._ensure_initialized() while True: @@ -407,7 +575,7 @@ def stream_file(self, use_chunks=True): self._initial_response = None else: end_byte = self._compute_end_byte(self.progress, - use_chunks=use_chunks) + use_chunks=use_chunks) response = self._get_chunk(self.progress, end_byte) if self.total_size is None: self._set_total(response.info) @@ -418,38 +586,65 @@ def stream_file(self, use_chunks=True): class Upload(_Transfer): + """Represent a single Upload. + + :type stream: file-like object + :param stream: stream to/from which data is downloaded/uploaded. + + :type mime_type: string: + :param mime_type: MIME type of the upload. + + :type total_size: integer or None + :param total_size: Total upload size for the stream. + + :type http: :class:`httplib2.Http` (or workalike) + :param http: Http instance used to perform requests. - """Data for a single Upload. 
+ :type close_stream: boolean + :param close_stream: should this instance close the stream when deleted - Fields: - stream: The stream to upload. - mime_type: MIME type of the upload. - total_size: (optional) Total upload size for the stream. - close_stream: (default: False) Whether or not we should close the - stream when finished with the upload. - auto_transfer: (default: True) If True, stream all bytes as soon as - the upload is created. + :type auto_transfer: boolean + :param auto_transfer: should this instance automatically begin transferring + data when initialized + + :type kwds: dict + :param kwds: keyword arguments: all except ``total_size`` are passed + through to :meth:`_Transfer.__init__()`. """ _REQUIRED_SERIALIZATION_KEYS = set(( 'auto_transfer', 'mime_type', 'total_size', 'url')) def __init__(self, stream, mime_type, total_size=None, http=None, - close_stream=False, chunksize=None, auto_transfer=True, + close_stream=False, auto_transfer=True, **kwds): super(Upload, self).__init__( - stream, close_stream=close_stream, chunksize=chunksize, - auto_transfer=auto_transfer, http=http, **kwds) + stream, close_stream=close_stream, auto_transfer=auto_transfer, + http=http, **kwds) self._final_response = None self._server_chunk_granularity = None self._complete = False - self.__mime_type = mime_type - self.__progress = 0 - self.__strategy = None - self.__total_size = total_size + self._mime_type = mime_type + self._progress = 0 + self._strategy = None + self._total_size = total_size @classmethod def from_file(cls, filename, mime_type=None, auto_transfer=True, **kwds): - """Create a new Upload object from a filename.""" + """Create a new Upload object from a filename. 
+ + :type filename: string + :param filename: path/filename to the file being uploaded + + :type mime_type: string + :param mime_type: MIMEtype of the file being uploaded + + :type auto_transfer: boolean or None + :param auto_transfer: should the transfer be started immediately + + :type kwds: dict + :param kwds: keyword arguments: passed + through to :meth:`_Transfer.__init__()`. + """ path = os.path.expanduser(filename) if not os.path.exists(path): raise NotFoundError('Could not find file %s' % path) @@ -463,9 +658,26 @@ def from_file(cls, filename, mime_type=None, auto_transfer=True, **kwds): close_stream=True, auto_transfer=auto_transfer, **kwds) @classmethod - def from_stream(cls, stream, mime_type, total_size=None, auto_transfer=True, - **kwds): - """Create a new Upload object from a stream.""" + def from_stream(cls, stream, mime_type, + total_size=None, auto_transfer=True, **kwds): + """Create a new Upload object from a stream. + + :type stream: readable file-like object + :param stream: the source file + + :type mime_type: string + :param mime_type: MIMEtype of the file being uploaded + + :type total_size: integer or None + :param total_size: Size of the file being uploaded + + :type auto_transfer: boolean or None + :param auto_transfer: should the transfer be started immediately + + :type kwds: dict + :param kwds: keyword arguments: passed + through to :meth:`_Transfer.__init__()`. + """ if mime_type is None: raise InvalidUserInputError( 'No mime_type specified for stream') @@ -474,36 +686,69 @@ def from_stream(cls, stream, mime_type, total_size=None, auto_transfer=True, @property def complete(self): + """Has the entire stream been uploaded. + + :rtype: boolean + """ return self._complete @property def mime_type(self): - return self.__mime_type + """MIMEtype of the file being uploaded. 
+ + :rtype: string + """ + return self._mime_type @property def progress(self): - return self.__progress + """Bytes uploaded so far + + :rtype: integer + """ + return self._progress @property def strategy(self): - return self.__strategy + """Upload strategy to use + + :rtype: string or None + """ + return self._strategy @strategy.setter def strategy(self, value): + """Update upload strategy to use + + :type value: string (one of :data:`SIMPLE_UPLOAD` or + :data:`RESUMABLE_UPLOAD`) + + :raises: :exc:`gcloud.streaming.exceptions.UserError` + if value is not one of the two allowed strings. + """ if value not in (SIMPLE_UPLOAD, RESUMABLE_UPLOAD): raise UserError(( 'Invalid value "%s" for upload strategy, must be one of ' '"simple" or "resumable".') % value) - self.__strategy = value + self._strategy = value @property def total_size(self): - return self.__total_size + """Total size of the stream to be uploaded. + + :rtype: integer or None + """ + return self._total_size @total_size.setter def total_size(self, value): + """Update total size of the stream to be uploaded. + + :type value: integer or None + :param value: the size + """ self._ensure_uninitialized() - self.__total_size = value + self._total_size = value def __repr__(self): if not self.initialized: @@ -520,12 +765,12 @@ def _set_default_strategy(self, upload_config, http_request): large, (2) the simple endpoint doesn't support multipart requests and we have metadata, or (3) there is no simple upload endpoint. - Args: - upload_config: Configuration for the upload endpoint. - http_request: The associated http request. + :type upload_config: instance w/ ``max_size`` and ``accept`` + attributes + :param upload_config: Configuration for the upload endpoint. - Returns: - None. + :type http_request: :class:`gcloud.streaming.http_wrapper.Request` + :param http_request: The associated http request. 
""" if upload_config.resumable_path is None: self.strategy = SIMPLE_UPLOAD @@ -542,7 +787,23 @@ def _set_default_strategy(self, upload_config, http_request): self.strategy = strategy def configure_request(self, upload_config, http_request, url_builder): - """Configure the request and url for this upload.""" + """Configure the request and url for this upload. + + :type upload_config: instance w/ ``max_size`` and ``accept`` + attributes + :param upload_config: transfer policy object to be queried + + :type http_request: :class:`gcloud.streaming.http_wrapper.Request` + :param http_request: the request to be updated + + :type url_builder: instance with settable 'relative_path' and + 'query_params' attributes. + :param url_builder: transfer policy object to be updated + + :raises: :exc:`gcloud.streaming.exceptions.InvalidUserInputError` + if the requested upload is too big, or does not have an + acceptable MIME type. + """ # Validate total_size vs. max_size if (self.total_size and upload_config.max_size and self.total_size > upload_config.max_size): @@ -570,13 +831,13 @@ def configure_request(self, upload_config, http_request, url_builder): self._configure_resumable_request(http_request) def _configure_media_request(self, http_request): - """Configure http_request as a simple request for this upload.""" + """Helper for 'configure_request': set up simple request.""" http_request.headers['content-type'] = self.mime_type http_request.body = self.stream.read() http_request.loggable_body = '' def _configure_multipart_request(self, http_request): - """Configure http_request as a multipart request for this upload.""" + """Helper for 'configure_request': set up multipart request.""" # This is a multipart/related upload. msg_root = mime_multipart.MIMEMultipart('related') # msg_root should not write out its own headers @@ -599,10 +860,10 @@ def _configure_multipart_request(self, http_request): # `> ` to `From ` lines. 
# NOTE: We must use six.StringIO() instead of io.StringIO() since the # `email` library uses cStringIO in Py2 and io.StringIO in Py3. - fp = six.StringIO() - g = email_generator.Generator(fp, mangle_from_=False) - g.flatten(msg_root, unixfrom=False) - http_request.body = fp.getvalue() + stream = six.StringIO() + generator = email_generator.Generator(stream, mangle_from_=False) + generator.flatten(msg_root, unixfrom=False) + http_request.body = stream.getvalue() multipart_boundary = msg_root.get_boundary() http_request.headers['content-type'] = ( @@ -614,28 +875,30 @@ def _configure_multipart_request(self, http_request): http_request.loggable_body = multipart_boundary.join(body_components) def _configure_resumable_request(self, http_request): + """Helper for 'configure_request': set up resumable request.""" http_request.headers['X-Upload-Content-Type'] = self.mime_type if self.total_size is not None: http_request.headers[ 'X-Upload-Content-Length'] = str(self.total_size) def refresh_upload_state(self): - """Talk to the server and refresh the state of this resumable upload. - - Returns: - Response if the upload is complete. + """Refresh the state of a resumable upload via query to the back-end. """ if self.strategy != RESUMABLE_UPLOAD: return self._ensure_initialized() - # XXX Per RFC 2616/7231, a 'PUT' request is absolutely inappropriate - # here: # it is intended to be used to replace the entire resource, - # not to # query for a status. - # If the back-end doesn't provide a way to query for this state - # via a 'GET' request, somebody should be spanked. - # http://www.w3.org/Protocols/rfc2616/rfc2616-sec9.html#sec9.6 - # http://tools.ietf.org/html/rfc7231#section-4.3.4 - # The violation is documented: + # NOTE: Per RFC 2616[1]/7231[2], a 'PUT' request is inappropriate + # here: it is intended to be used to replace the entire + # resource, not to query for a status. 
+ # + # If the back-end doesn't provide a way to query for this state + # via a 'GET' request, somebody should be spanked. + # + # The violation is documented[3]. + # + # [1] http://www.w3.org/Protocols/rfc2616/rfc2616-sec9.html#sec9.6 + # [2] http://tools.ietf.org/html/rfc7231#section-4.3.4 + # [3] # https://cloud.google.com/storage/docs/json_api/v1/how-tos/upload#resume-upload refresh_request = Request( url=self.url, http_method='PUT', @@ -647,7 +910,7 @@ def refresh_upload_state(self): if refresh_response.status_code in (http_client.OK, http_client.CREATED): self._complete = True - self.__progress = self.total_size + self._progress = self.total_size self.stream.seek(self.progress) # If we're finished, the refresh response will contain the metadata # originally requested. Cache it so it can be returned in @@ -655,51 +918,64 @@ def refresh_upload_state(self): self._final_response = refresh_response elif refresh_response.status_code == RESUME_INCOMPLETE: if range_header is None: - self.__progress = 0 + self._progress = 0 else: - self.__progress = self._last_byte(range_header) + 1 + self._progress = self._last_byte(range_header) + 1 self.stream.seek(self.progress) else: - raise HttpError.FromResponse(refresh_response) + raise HttpError.from_response(refresh_response) def _get_range_header(self, response): - # XXX Per RFC 2616/7233, 'Range' is a request header, not a response - # header: # If the back-end is actually setting 'Range' on responses, - # somebody should be spanked: it should be sending 'Content-Range' - # (including the # '/' trailer). - # http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html - # http://tools.ietf.org/html/rfc7233#section-3.1 - # http://tools.ietf.org/html/rfc7233#section-4.2 - # The violation is documented: + """Return a 'Range' header from a response. 
+ + :type response: :class:`gcloud.streaming.http_wrapper.Response` + :param response: response to be queried + + :rtype: string + """ + # NOTE: Per RFC 2616[1]/7233[2][3], 'Range' is a request header, + # not a response header. If the back-end is actually setting + # 'Range' on responses, somebody should be spanked: it should + # be sending 'Content-Range' (including the # '/' + # trailer). + # + # The violation is documented[4]. + # + # [1] http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html + # [2] http://tools.ietf.org/html/rfc7233#section-3.1 + # [3] http://tools.ietf.org/html/rfc7233#section-4.2 + # [4] # https://cloud.google.com/storage/docs/json_api/v1/how-tos/upload#chunking return response.info.get('Range', response.info.get('range')) - def initialize_upload(self, http_request, http=None, client=None): - """Initialize this upload from the given http_request.""" + def initialize_upload(self, http_request, http): + """Initialize this upload from the given http_request. + + :type http_request: :class:`gcloud.streaming.http_wrapper.Request` + :param http_request: the request to be used + + :type http: :class:`httplib2.Http` (or workalike) + :param http: Http instance for this request. + + :raises: :exc:`gcloud.streaming.exceptions.UserError` if the instance + has not been configured with a strategy. 
+ """ if self.strategy is None: raise UserError( 'No upload strategy set; did you call configure_request?') - if http is None and client is None: - raise UserError('Must provide client or http.') if self.strategy != RESUMABLE_UPLOAD: return - http = http or client.http - if client is not None: - http_request.url = client.FinalizeTransferUrl(http_request.url) self._ensure_uninitialized() http_response = make_api_request(http, http_request, - retries=self.num_retries) + retries=self.num_retries) if http_response.status_code != http_client.OK: - raise HttpError.FromResponse(http_response) + raise HttpError.from_response(http_response) - # XXX when is this getting converted to an integer? granularity = http_response.info.get('X-Goog-Upload-Chunk-Granularity') if granularity is not None: granularity = int(granularity) self._server_chunk_granularity = granularity url = http_response.info['location'] - if client is not None: - url = client.FinalizeTransferUrl(url) self._initialize(http, url) # Unless the user has requested otherwise, we want to just @@ -710,11 +986,26 @@ def initialize_upload(self, http_request, http=None, client=None): return http_response def _last_byte(self, range_header): + """Parse the last byte from a 'Range' header. + + :type range_header: string + :param range_header: 'Range' header value per RFC 2616/7233 + """ _, _, end = range_header.partition('-') - # TODO(craigcitro): Validate start == 0? return int(end) def _validate_chunksize(self, chunksize=None): + """Validate chunksize against server-specified granularity. + + Helper for :meth:`stream_file`. + + :type chunksize: integer or None + :param chunksize: the chunk size to be tested. + + :raises: :exc:`gcloud.streaming.exceptions.ConfigurationValueError` + if ``chunksize`` is not a multiple of the server-specified + granulariy. 
+ """ if self._server_chunk_granularity is None: return chunksize = chunksize or self.chunksize @@ -724,10 +1015,11 @@ def _validate_chunksize(self, chunksize=None): self._server_chunk_granularity) def stream_file(self, use_chunks=True): - """Send this resumable upload - - If 'use_chunks' is False, send it in a single request. Otherwise, - send it in chunks. + """Upload the stream. + + :type use_chunks: boolean + :param use_chunks: If False, send the stream in a single request. + Otherwise, send it in chunks. """ if self.strategy != RESUMABLE_UPLOAD: raise InvalidUserInputError( @@ -743,9 +1035,8 @@ def stream_file(self, use_chunks=True): if response.status_code in (http_client.OK, http_client.CREATED): self._complete = True break - self.__progress = self._last_byte(response.info['range']) + self._progress = self._last_byte(response.info['range']) if self.progress + 1 != self.stream.tell(): - # TODO(craigcitro): Add a better way to recover here. raise CommunicationError( 'Failed to transfer all bytes in chunk, upload paused at ' 'byte %d' % self.progress) @@ -761,7 +1052,21 @@ def stream_file(self, use_chunks=True): return response def _send_media_request(self, request, end): - """Request helper function for SendMediaBody & SendChunk.""" + """Peform API upload request. + + Helper for _send_media_body & _send_chunk: + + :type request: :class:`gcloud.streaming.http_wrapper.Request` + :param request: the request to upload + + :type end: integer + :param end: end byte of the to be uploaded + + :rtype: :class:`gcloud.streaming.http_wrapper.Response` + :returns: the response + :raises: :exc:`gcloud.streaming.exceptions.HttpError` if the status + code from the response indicates an error. + """ response = make_api_request( self.bytes_http, request, retry_func=self.retry_func, retries=self.num_retries) @@ -770,7 +1075,7 @@ def _send_media_request(self, request, end): # We want to reset our state to wherever the server left us # before this failed request, and then raise. 
self.refresh_upload_state() - raise HttpError.FromResponse(response) + raise HttpError.from_response(response) if response.status_code == RESUME_INCOMPLETE: last_byte = self._last_byte( self._get_range_header(response)) @@ -779,15 +1084,20 @@ def _send_media_request(self, request, end): return response def _send_media_body(self, start): - """Send the entire media stream in a single request.""" + """ Send the entire stream in a single request. + + Helper for :meth:`stream_file`: + + :type start: integer + :param start: start byte of the range. + """ self._ensure_initialized() if self.total_size is None: raise TransferInvalidError( 'Total size must be known for SendMediaBody') body_stream = StreamSlice(self.stream, self.total_size - start) - request = Request(url=self.url, http_method='PUT', - body=body_stream) + request = Request(url=self.url, http_method='PUT', body=body_stream) request.headers['Content-Type'] = self.mime_type if start == self.total_size: # End of an upload with 0 bytes left to send; just finalize. @@ -801,7 +1111,13 @@ def _send_media_body(self, start): return self._send_media_request(request, self.total_size) def _send_chunk(self, start): - """Send the specified chunk.""" + """Send a chunk of the stream. + + Helper for :meth:`stream_file`: + + :type start: integer + :param start: start byte of the range. + """ self._ensure_initialized() no_log_body = self.total_size is None if self.total_size is None: @@ -811,25 +1127,19 @@ def _send_chunk(self, start): self.stream, start, self.chunksize) end = body_stream.stream_end_position if body_stream.stream_exhausted: - self.__total_size = end - # TODO: Here, change body_stream from a stream to a string object, + self._total_size = end + # Here, change body_stream from a stream to a string object, # which means reading a chunk into memory. This works around # https://code.google.com/p/httplib2/issues/detail?id=176 which can # cause httplib2 to skip bytes on 401's for file objects. 
- # Rework this solution to be more general. body_stream = body_stream.read(self.chunksize) else: end = min(start + self.chunksize, self.total_size) body_stream = StreamSlice(self.stream, end - start) - # TODO(craigcitro): Think about clearer errors on "no data in - # stream". - request = Request(url=self.url, http_method='PUT', - body=body_stream) + request = Request(url=self.url, http_method='PUT', body=body_stream) request.headers['Content-Type'] = self.mime_type if no_log_body: # Disable logging of streaming body. - # TODO: Remove no_log_body and rework as part of a larger logs - # refactor. request.loggable_body = '' if self.total_size is None: # Streaming resumable upload case, unknown total size. diff --git a/gcloud/streaming/util.py b/gcloud/streaming/util.py index 5cdaeaebd9eb..b3d5b979dc4f 100644 --- a/gcloud/streaming/util.py +++ b/gcloud/streaming/util.py @@ -1,39 +1,26 @@ -# pylint: skip-file """Assorted utilities shared between parts of apitools.""" import random from gcloud.streaming.exceptions import ConfigurationValueError from gcloud.streaming.exceptions import InvalidUserInputError -from gcloud.streaming.exceptions import TypecheckError - - -def type_check(arg, arg_type, msg=None): - if not isinstance(arg, arg_type): - if msg is None: - if isinstance(arg_type, tuple): - msg = 'Type of arg is "%s", not one of %r' % ( - type(arg), arg_type) - else: - msg = 'Type of arg is "%s", not "%s"' % (type(arg), arg_type) - raise TypecheckError(msg) - return arg def calculate_wait_for_retry(retry_attempt, max_wait=60): - """Calculates amount of time to wait before a retry attempt. + """Calculate the amount of time to wait before a retry attempt. Wait time grows exponentially with the number of attempts. A random amount of jitter is added to spread out retry attempts from different clients. - Args: - retry_attempt: Retry attempt counter. - max_wait: Upper bound for wait time [seconds]. 
+ :type retry_attempt: integer + :param retry_attempt: Retry attempt counter. - Returns: - Number of seconds to wait before retrying request. + :type max_wait: integer + :param max_wait: Upper bound for wait time [seconds]. + :rtype: integer + :returns: Number of seconds to wait before retrying request. """ wait_time = 2 ** retry_attempt @@ -43,19 +30,22 @@ def calculate_wait_for_retry(retry_attempt, max_wait=60): def acceptable_mime_type(accept_patterns, mime_type): - """Return True iff mime_type is acceptable for one of accept_patterns. + """Check that ``mime_type`` matches one of ``accept_patterns``. Note that this function assumes that all patterns in accept_patterns will be simple types of the form "type/subtype", where one or both of these can be "*". We do not support parameters (i.e. "; q=") in patterns. - Args: - accept_patterns: list of acceptable MIME types. - mime_type: the mime type we would like to match. + :type accept_patterns: list of string + :param accept_patterns: acceptable MIME types. + + :type mime_type: string + :param mime_type: the MIME being checked - Returns: - Whether or not mime_type matches (at least) one of these patterns. + :rtype: boolean + :returns: True if the supplied MIME type matches at least one of the + patterns, else False. 
""" if '/' not in mime_type: raise InvalidUserInputError( @@ -66,10 +56,9 @@ def acceptable_mime_type(accept_patterns, mime_type): 'MIME patterns with parameter unsupported: "%s"' % ', '.join( unsupported_patterns)) - def MimeTypeMatches(pattern, mime_type): + def _match(pattern, mime_type): """Return True iff mime_type is acceptable for pattern.""" return all(accept in ('*', provided) for accept, provided in zip(pattern.split('/'), mime_type.split('/'))) - return any(MimeTypeMatches(pattern, mime_type) - for pattern in accept_patterns) + return any(_match(pattern, mime_type) for pattern in accept_patterns) diff --git a/tox.ini b/tox.ini index 12423f1a79db..4cb76f997fac 100644 --- a/tox.ini +++ b/tox.ini @@ -55,7 +55,7 @@ deps = {[testenv:docs]deps} passenv = {[testenv:docs]passenv} [pep8] -exclude = gcloud/datastore/_datastore_v1_pb2.py,gcloud/bigtable/_generated/*,docs/conf.py,gcloud/streaming/* +exclude = gcloud/datastore/_datastore_v1_pb2.py,gcloud/bigtable/_generated/*,docs/conf.py verbose = 1 [testenv:lint]