diff --git a/gcloud/storage/blob.py b/gcloud/storage/blob.py
index 6b1280a31289..c852c5e7c816 100644
--- a/gcloud/storage/blob.py
+++ b/gcloud/storage/blob.py
@@ -18,6 +18,7 @@
 import copy
 import hashlib
 from io import BytesIO
+from io import UnsupportedOperation
 import json
 import mimetypes
 import os
@@ -491,10 +492,11 @@ def upload_from_file(self, file_obj, rewind=False, size=None,
         total_bytes = size
         if total_bytes is None:
             if hasattr(file_obj, 'fileno'):
-                total_bytes = os.fstat(file_obj.fileno()).st_size
-            else:
-                raise ValueError('total bytes could not be determined. Please '
-                                 'pass an explicit size.')
+                try:
+                    total_bytes = os.fstat(file_obj.fileno()).st_size
+                except (OSError, UnsupportedOperation):
+                    pass  # Assuming fd is not an actual file (maybe socket).
+
         headers = {
             'Accept': 'application/json',
             'Accept-Encoding': 'gzip, deflate',
@@ -510,6 +512,13 @@ def upload_from_file(self, file_obj, rewind=False, size=None,
         if self.chunk_size is not None:
             upload.chunksize = self.chunk_size
 
+            if total_bytes is None:
+                upload.strategy = RESUMABLE_UPLOAD
+        elif total_bytes is None:
+            raise ValueError('total bytes could not be determined. Please '
+                             'pass an explicit size, or supply a chunk size '
+                             'for a streaming transfer.')
+
         url_builder = _UrlBuilder(bucket_name=self.bucket.name,
                                   object_name=self.name)
         upload_config = _UploadConfig()
diff --git a/gcloud/storage/test_blob.py b/gcloud/storage/test_blob.py
index 3ae92ed76a3d..0259a1613f8a 100644
--- a/gcloud/storage/test_blob.py
+++ b/gcloud/storage/test_blob.py
@@ -507,6 +507,102 @@ def _upload_from_file_simple_test_helper(self, properties=None,
         self.assertEqual(headers['Content-Length'], '6')
         self.assertEqual(headers['Content-Type'], expected_content_type)
 
+    def test_upload_from_file_stream(self):
+        from six.moves.http_client import OK
+        from six.moves.urllib.parse import parse_qsl
+        from six.moves.urllib.parse import urlsplit
+        from gcloud.streaming import http_wrapper
+
+        BLOB_NAME = 'blob-name'
+        UPLOAD_URL = 'http://example.com/upload/name/key'
+        DATA = b'ABCDE'
+        loc_response = {'status': OK, 'location': UPLOAD_URL}
+        chunk1_response = {'status': http_wrapper.RESUME_INCOMPLETE,
+                           'range': 'bytes 0-4'}
+        chunk2_response = {'status': OK}
+        # Need valid JSON on last response, since resumable.
+        connection = _Connection(
+            (loc_response, b''),
+            (chunk1_response, b''),
+            (chunk2_response, b'{}'),
+        )
+        client = _Client(connection)
+        bucket = _Bucket(client)
+        blob = self._makeOne(BLOB_NAME, bucket=bucket)
+        blob._CHUNK_SIZE_MULTIPLE = 1
+        blob.chunk_size = 5
+
+        from gcloud.streaming.test_transfer import _Stream
+        file_obj = _Stream(DATA)
+
+        # Mock stream closes at end of data, like a socket might
+        def is_stream_closed(stream):
+            if stream.tell() < len(DATA):
+                return stream._closed
+            else:
+                return stream.close() or True
+
+        _Stream.closed = property(is_stream_closed)
+
+        def fileno_mock():
+            from io import UnsupportedOperation
+            raise UnsupportedOperation()
+
+        file_obj.fileno = fileno_mock
+
+        blob.upload_from_file(file_obj)
+
+        # Remove the temp property
+        delattr(_Stream, "closed")
+
+        rq = connection.http._requested
+        self.assertEqual(len(rq), 3)
+
+        # Requested[0]
+        headers = dict(
+            [(x.title(), str(y)) for x, y in rq[0].pop('headers').items()])
+        self.assertEqual(headers['Content-Length'], '0')
+        self.assertEqual(headers['X-Upload-Content-Type'],
+                         'application/octet-stream')
+
+        uri = rq[0].pop('uri')
+        scheme, netloc, path, qs, _ = urlsplit(uri)
+        self.assertEqual(scheme, 'http')
+        self.assertEqual(netloc, 'example.com')
+        self.assertEqual(path, '/b/name/o')
+        self.assertEqual(dict(parse_qsl(qs)),
+                         {'uploadType': 'resumable', 'name': BLOB_NAME})
+        self.assertEqual(rq[0], {
+            'method': 'POST',
+            'body': '',
+            'connection_type': None,
+            'redirections': 5,
+        })
+
+        # Requested[1]
+        headers = dict(
+            [(x.title(), str(y)) for x, y in rq[1].pop('headers').items()])
+        self.assertEqual(headers['Content-Range'], 'bytes 0-4/*')
+        self.assertEqual(rq[1], {
+            'method': 'PUT',
+            'uri': UPLOAD_URL,
+            'body': DATA[:5],
+            'connection_type': None,
+            'redirections': 5,
+        })
+
+        # Requested[2]
+        headers = dict(
+            [(x.title(), str(y)) for x, y in rq[2].pop('headers').items()])
+        self.assertEqual(headers['Content-Range'], 'bytes */5')
+        self.assertEqual(rq[2], {
+            'method': 'PUT',
+            'uri': UPLOAD_URL,
+            'body': DATA[5:],
+            'connection_type': None,
+            'redirections': 5,
+        })
+
     def test_upload_from_file_simple(self):
         self._upload_from_file_simple_test_helper(
             expected_content_type='application/octet-stream')
diff --git a/gcloud/streaming/buffered_stream.py b/gcloud/streaming/buffered_stream.py
index 6ce9299e701d..bf5dc66550d4 100644
--- a/gcloud/streaming/buffered_stream.py
+++ b/gcloud/streaming/buffered_stream.py
@@ -20,7 +20,12 @@ def __init__(self, stream, start, size):
         self._stream = stream
         self._start_pos = start
         self._buffer_pos = 0
-        self._buffered_data = self._stream.read(size)
+
+        if not hasattr(self._stream, 'closed') or not self._stream.closed:
+            self._buffered_data = self._stream.read(size)
+        else:
+            self._buffered_data = b''
+
         self._stream_at_end = len(self._buffered_data) < size
         self._end_pos = self._start_pos + len(self._buffered_data)
 
diff --git a/gcloud/streaming/transfer.py b/gcloud/streaming/transfer.py
index 4c5b2a7a23f8..7ef439d67f36 100644
--- a/gcloud/streaming/transfer.py
+++ b/gcloud/streaming/transfer.py
@@ -1038,14 +1038,16 @@ def stream_file(self, use_chunks=True):
                     'Failed to transfer all bytes in chunk, upload paused at '
                     'byte %d' % self.progress)
         if self.complete and hasattr(self.stream, 'seek'):
-            current_pos = self.stream.tell()
-            self.stream.seek(0, os.SEEK_END)
-            end_pos = self.stream.tell()
-            self.stream.seek(current_pos)
-            if current_pos != end_pos:
-                raise TransferInvalidError(
-                    'Upload complete with %s additional bytes left in stream' %
-                    (int(end_pos) - int(current_pos)))
+            if not hasattr(self.stream, 'seekable') or self.stream.seekable():
+                current_pos = self.stream.tell()
+                self.stream.seek(0, os.SEEK_END)
+                end_pos = self.stream.tell()
+                self.stream.seek(current_pos)
+                if current_pos != end_pos:
+                    raise TransferInvalidError(
+                        'Upload complete with %s '
+                        'additional bytes left in stream' %
+                        (int(end_pos) - int(current_pos)))
         return response
 
     def _send_media_request(self, request, end):