diff --git a/gcloud/storage/batch.py b/gcloud/storage/batch.py new file mode 100644 index 000000000000..e43c16892a6e --- /dev/null +++ b/gcloud/storage/batch.py @@ -0,0 +1,185 @@ +# Copyright 2014 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Batch updates / deletes of storage buckets / blobs. + +See: https://cloud.google.com/storage/docs/json_api/v1/how-tos/batch +""" +from email.encoders import encode_noop +from email.generator import Generator +from email.mime.application import MIMEApplication +from email.mime.multipart import MIMEMultipart +from email.parser import Parser +import io +import json + +import six + +from gcloud._localstack import _LocalStack +from gcloud.storage.connection import Connection + + +_BATCHES = _LocalStack() + + +class MIMEApplicationHTTP(MIMEApplication): + """MIME type for ``application/http``. + + Constructs payload from headers and body + + :type headers: dict + :param headers: HTTP headers + + :type body: text or None + :param body: HTTP payload + """ + def __init__(self, method, uri, headers, body): + if isinstance(body, dict): + body = json.dumps(body) + headers['Content-Type'] = 'application/json' + headers['Content-Length'] = len(body) + if body is None: + body = '' + lines = ['%s %s HTTP/1.1' % (method, uri)] + lines.extend(['%s: %s' % (key, value) + for key, value in sorted(headers.items())]) + lines.append('') + lines.append(body) + payload = '\r\n'.join(lines) + if six.PY2: # pragma: NO COVER Python2 + # Sigh. email.message.Message is an old-style class, so we + # cannot use 'super()'. + MIMEApplication.__init__(self, payload, 'http', encode_noop) + else: # pragma: NO COVER Python3 + super_init = super(MIMEApplicationHTTP, self).__init__ + super_init(payload, 'http', encode_noop) + + +class NoContent(object): + """Emulate an HTTP '204 No Content' response.""" + status = 204 + + +class Batch(Connection): + """Proxy an underlying connection, batching up change operations. + + :type connection: :class:`gcloud.storage.connection.Connection` + :param connection: the connection for which the batch proxies. + """ + _MAX_BATCH_SIZE = 1000 + + def __init__(self, connection): + super(Batch, self).__init__(project=connection.project) + self._connection = connection + self._requests = [] + self._responses = [] + + def _do_request(self, method, url, headers, data): + """Override Connection: defer actual HTTP request. + + Only allow up to ``_MAX_BATCH_SIZE`` requests to be deferred. + + :type method: string + :param method: The HTTP method to use in the request. + + :type url: string + :param url: The URL to send the request to. + + :type headers: dict + :param headers: A dictionary of HTTP headers to send with the request. + + :type data: string + :param data: The data to send as the body of the request. + + :rtype: tuple of ``response`` (a dictionary of sorts) + and ``content`` (a string). + :returns: The HTTP response object and the content of the response. + """ + if method == 'GET': + _req = self._connection.http.request + return _req(method=method, uri=url, headers=headers, body=data) + + if len(self._requests) >= self._MAX_BATCH_SIZE: + raise ValueError("Too many deferred requests (max %d)" % + self._MAX_BATCH_SIZE) + self._requests.append((method, url, headers, data)) + return NoContent(), '' + + def finish(self): + """Submit a single `multipart/mixed` request w/ deferred requests. + + :rtype: list of tuples + :returns: one ``(status, reason, payload)`` tuple per deferred request. + :raises: ValueError if no requests have been deferred. + """ + if len(self._requests) == 0: + raise ValueError("No deferred requests") + + multi = MIMEMultipart() + + for method, uri, headers, body in self._requests: + subrequest = MIMEApplicationHTTP(method, uri, headers, body) + multi.attach(subrequest) + + # The `email` package expects to deal with "native" strings + if six.PY3: # pragma: NO COVER Python3 + buf = io.StringIO() + else: # pragma: NO COVER Python2 + buf = io.BytesIO() + generator = Generator(buf, False, 0) + generator.flatten(multi) + payload = buf.getvalue() + + # Strip off redundant header text + _, body = payload.split('\n\n', 1) + headers = dict(multi._headers) + + url = '%s/batch' % self.API_BASE_URL + + _req = self._connection._make_request + response, content = _req('POST', url, data=body, headers=headers) + self._responses = list(_unpack_batch_response(response, content)) + return self._responses + + def __enter__(self): + _BATCHES.push(self) + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + try: + if exc_type is None: + self.finish() + finally: + _BATCHES.pop() + + +def _unpack_batch_response(response, content): + """Convert response, content -> [(status, reason, payload)].""" + parser = Parser() + faux_message = ('Content-Type: %s\nMIME-Version: 1.0\n\n%s' % + (response['content-type'], content)) + + message = parser.parsestr(faux_message) + + if not isinstance(message._payload, list): + raise ValueError('Bad response: not multi-part') + + for subrequest in message._payload: + status_line, rest = subrequest._payload.split('\n', 1) + _, status, reason = status_line.split(' ', 2) + message = parser.parsestr(rest) + payload = message._payload + ctype = message['Content-Type'] + if ctype and ctype.startswith('application/json'): + payload = json.loads(payload) + yield status, reason, payload diff --git a/gcloud/storage/connection.py b/gcloud/storage/connection.py index fabb1edfb94d..d00b37b8d24e 100644 --- a/gcloud/storage/connection.py +++ b/gcloud/storage/connection.py @@ -149,7 +149,8 @@ def _make_request(self, method, url, data=None, content_type=None, :rtype: tuple of ``response`` (a dictionary of sorts) and ``content`` (a string). - :returns: The HTTP response object and the content of the response. + :returns: The HTTP response object and the content of the response, + returned by :meth:`_do_request`. """ headers = headers or {} headers['Accept-Encoding'] = 'gzip' @@ -166,6 +167,30 @@ def _make_request(self, method, url, data=None, content_type=None, headers['User-Agent'] = self.USER_AGENT + return self._do_request(method, url, headers, data) + + def _do_request(self, method, url, headers, data): + """Low-level helper: perform the actual API request over HTTP. + + Allows :class:`gcloud.storage.batch.Batch` to override, deferring + the request. + + :type method: string + :param method: The HTTP method to use in the request. + + :type url: string + :param url: The URL to send the request to. + + :type headers: dict + :param headers: A dictionary of HTTP headers to send with the request. + + :type data: string + :param data: The data to send as the body of the request. + + :rtype: tuple of ``response`` (a dictionary of sorts) + and ``content`` (a string). + :returns: The HTTP response object and the content of the response. + """ return self.http.request(uri=url, method=method, headers=headers, body=data) diff --git a/gcloud/storage/test_batch.py b/gcloud/storage/test_batch.py new file mode 100644 index 000000000000..d671373649b7 --- /dev/null +++ b/gcloud/storage/test_batch.py @@ -0,0 +1,415 @@ +# Copyright 2014 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest2 + + +class TestMIMEApplicationHTTP(unittest2.TestCase): + + def _getTargetClass(self): + from gcloud.storage.batch import MIMEApplicationHTTP + return MIMEApplicationHTTP + + def _makeOne(self, *args, **kw): + return self._getTargetClass()(*args, **kw) + + def test_ctor_body_None(self): + METHOD = 'DELETE' + PATH = '/path/to/api' + LINES = [ + "DELETE /path/to/api HTTP/1.1", + "", + ] + mah = self._makeOne(METHOD, PATH, {}, None) + self.assertEqual(mah.get_content_type(), 'application/http') + self.assertEqual(mah.get_payload().splitlines(), LINES) + + def test_ctor_body_str(self): + METHOD = 'GET' + PATH = '/path/to/api' + BODY = 'ABC' + HEADERS = {'Content-Length': len(BODY), 'Content-Type': 'text/plain'} + LINES = [ + "GET /path/to/api HTTP/1.1", + "Content-Length: 3", + "Content-Type: text/plain", + "", + "ABC", + ] + mah = self._makeOne(METHOD, PATH, HEADERS, BODY) + self.assertEqual(mah.get_payload().splitlines(), LINES) + + def test_ctor_body_dict(self): + METHOD = 'GET' + PATH = '/path/to/api' + BODY = {'foo': 'bar'} + HEADERS = {} + LINES = [ + 'GET /path/to/api HTTP/1.1', + 'Content-Length: 14', + 'Content-Type: application/json', + '', + '{"foo": "bar"}', + ] + mah = self._makeOne(METHOD, PATH, HEADERS, BODY) + self.assertEqual(mah.get_payload().splitlines(), LINES) + + +class TestBatch(unittest2.TestCase): + + def _getTargetClass(self): + from gcloud.storage.batch import Batch + return Batch + + def _makeOne(self, *args, **kw): + return self._getTargetClass()(*args, **kw) + + def test_ctor_w_explicit_connection(self): + http = _HTTP() + connection = _Connection(http=http) + batch = self._makeOne(connection) + self.assertTrue(batch._connection is connection) + self.assertEqual(batch.project, connection.project) + self.assertEqual(len(batch._requests), 0) + self.assertEqual(len(batch._responses), 0) + + def test__make_request_GET_forwarded_to_connection(self): + URL = 'http://example.com/api' + expected = _Response() + http = _HTTP((expected, '')) + connection = _Connection(http=http) + batch = self._makeOne(connection) + response, content = batch._make_request('GET', URL) + self.assertTrue(response is expected) + self.assertEqual(content, '') + EXPECTED_HEADERS = [ + ('Accept-Encoding', 'gzip'), + ('Content-Length', 0), + ] + self.assertEqual(len(http._requests), 1) + self.assertEqual(http._requests[0][0], 'GET') + self.assertEqual(http._requests[0][1], URL) + headers = http._requests[0][2] + for key, value in EXPECTED_HEADERS: + self.assertEqual(headers[key], value) + self.assertEqual(http._requests[0][3], None) + self.assertEqual(batch._requests, []) + + def test__make_request_POST_normal(self): + URL = 'http://example.com/api' + http = _HTTP() # no requests expected + connection = _Connection(http=http) + batch = self._makeOne(connection) + response, content = batch._make_request('POST', URL, data={'foo': 1}) + self.assertEqual(response.status, 204) + self.assertEqual(content, '') + self.assertEqual(http._requests, []) + EXPECTED_HEADERS = [ + ('Accept-Encoding', 'gzip'), + ('Content-Length', 10), + ] + self.assertEqual(len(batch._requests), 1) + self.assertEqual(batch._requests[0][0], 'POST') + self.assertEqual(batch._requests[0][1], URL) + headers = batch._requests[0][2] + for key, value in EXPECTED_HEADERS: + self.assertEqual(headers[key], value) + self.assertEqual(batch._requests[0][3], {'foo': 1}) + + def test__make_request_PATCH_normal(self): + URL = 'http://example.com/api' + http = _HTTP() # no requests expected + connection = _Connection(http=http) + batch = self._makeOne(connection) + response, content = batch._make_request('PATCH', URL, data={'foo': 1}) + self.assertEqual(response.status, 204) + self.assertEqual(content, '') + self.assertEqual(http._requests, []) + EXPECTED_HEADERS = [ + ('Accept-Encoding', 'gzip'), + ('Content-Length', 10), + ] + self.assertEqual(len(batch._requests), 1) + self.assertEqual(batch._requests[0][0], 'PATCH') + self.assertEqual(batch._requests[0][1], URL) + headers = batch._requests[0][2] + for key, value in EXPECTED_HEADERS: + self.assertEqual(headers[key], value) + self.assertEqual(batch._requests[0][3], {'foo': 1}) + + def test__make_request_DELETE_normal(self): + URL = 'http://example.com/api' + http = _HTTP() # no requests expected + connection = _Connection(http=http) + batch = self._makeOne(connection) + response, content = batch._make_request('DELETE', URL) + self.assertEqual(response.status, 204) + self.assertEqual(content, '') + self.assertEqual(http._requests, []) + EXPECTED_HEADERS = [ + ('Accept-Encoding', 'gzip'), + ('Content-Length', 0), + ] + self.assertEqual(len(batch._requests), 1) + self.assertEqual(batch._requests[0][0], 'DELETE') + self.assertEqual(batch._requests[0][1], URL) + headers = batch._requests[0][2] + for key, value in EXPECTED_HEADERS: + self.assertEqual(headers[key], value) + self.assertEqual(batch._requests[0][3], None) + + def test__make_request_POST_too_many_requests(self): + URL = 'http://example.com/api' + http = _HTTP() # no requests expected + connection = _Connection(http=http) + batch = self._makeOne(connection) + batch._MAX_BATCH_SIZE = 1 + batch._requests.append(('POST', URL, {}, {'bar': 2})) + self.assertRaises(ValueError, + batch._make_request, 'POST', URL, data={'foo': 1}) + self.assertTrue(connection.http is http) + + def test_finish_empty(self): + http = _HTTP() # no requests expected + connection = _Connection(http=http) + batch = self._makeOne(connection) + self.assertRaises(ValueError, batch.finish) + self.assertTrue(connection.http is http) + + def _check_subrequest_no_payload(self, chunk, method, url): + lines = chunk.splitlines() + # blank + 2 headers + blank + request + blank + blank + self.assertEqual(len(lines), 7) + self.assertEqual(lines[0], '') + self.assertEqual(lines[1], 'Content-Type: application/http') + self.assertEqual(lines[2], 'MIME-Version: 1.0') + self.assertEqual(lines[3], '') + self.assertEqual(lines[4], '%s %s HTTP/1.1' % (method, url)) + self.assertEqual(lines[5], '') + self.assertEqual(lines[6], '') + + def _check_subrequest_payload(self, chunk, method, url, payload): + import json + lines = chunk.splitlines() + # blank + 2 headers + blank + request + 2 headers + blank + body + payload_str = json.dumps(payload) + self.assertEqual(len(lines), 9) + self.assertEqual(lines[0], '') + self.assertEqual(lines[1], 'Content-Type: application/http') + self.assertEqual(lines[2], 'MIME-Version: 1.0') + self.assertEqual(lines[3], '') + self.assertEqual(lines[4], '%s %s HTTP/1.1' % (method, url)) + self.assertEqual(lines[5], 'Content-Length: %d' % len(payload_str)) + self.assertEqual(lines[6], 'Content-Type: application/json') + self.assertEqual(lines[7], '') + self.assertEqual(json.loads(lines[8]), payload) + + def test_finish_nonempty(self): + URL = 'http://api.example.com/other_api' + expected = _Response() + expected['content-type'] = 'multipart/mixed; boundary="DEADBEEF="' + http = _HTTP((expected, _THREE_PART_MIME_RESPONSE)) + connection = _Connection(http=http) + batch = self._makeOne(connection) + batch.API_BASE_URL = 'http://api.example.com' + batch._requests.append(('POST', URL, {}, {'foo': 1, 'bar': 2})) + batch._requests.append(('PATCH', URL, {}, {'bar': 3})) + batch._requests.append(('DELETE', URL, {}, None)) + result = batch.finish() + self.assertEqual(len(result), len(batch._requests)) + self.assertEqual(result[0][0], '200') + self.assertEqual(result[0][1], 'OK') + self.assertEqual(result[0][2], {'foo': 1, 'bar': 2}) + self.assertEqual(result[1][0], '200') + self.assertEqual(result[1][1], 'OK') + self.assertEqual(result[1][2], {'foo': 1, 'bar': 3}) + self.assertEqual(result[2][0], '204') + self.assertEqual(result[2][1], 'No Content') + self.assertEqual(result[2][2], '') + self.assertEqual(len(http._requests), 1) + method, uri, headers, body = http._requests[0] + self.assertEqual(method, 'POST') + self.assertEqual(uri, 'http://api.example.com/batch') + self.assertEqual(len(headers), 2) + ctype, boundary = [x.strip() + for x in headers['Content-Type'].split(';')] + self.assertEqual(ctype, 'multipart/mixed') + self.assertTrue(boundary.startswith('boundary="==')) + self.assertTrue(boundary.endswith('=="')) + self.assertEqual(headers['MIME-Version'], '1.0') + + divider = '--' + boundary[len('boundary="'):-1] + chunks = body.split(divider)[1:-1] # discard prolog / epilog + self.assertEqual(len(chunks), 3) + + self._check_subrequest_payload(chunks[0], 'POST', URL, + {'foo': 1, 'bar': 2}) + + self._check_subrequest_payload(chunks[1], 'PATCH', URL, {'bar': 3}) + + self._check_subrequest_no_payload(chunks[2], 'DELETE', URL) + + def test_finish_nonempty_non_multipart_response(self): + URL = 'http://api.example.com/other_api' + expected = _Response() + expected['content-type'] = 'text/plain' + http = _HTTP((expected, 'NOT A MIME_RESPONSE')) + connection = _Connection(http=http) + batch = self._makeOne(connection) + batch._requests.append(('POST', URL, {}, {'foo': 1, 'bar': 2})) + batch._requests.append(('PATCH', URL, {}, {'bar': 3})) + batch._requests.append(('DELETE', URL, {}, None)) + self.assertRaises(ValueError, batch.finish) + + def test_as_context_mgr_wo_error(self): + from gcloud.storage.batch import _BATCHES + URL = 'http://example.com/api' + expected = _Response() + expected['content-type'] = 'multipart/mixed; boundary="DEADBEEF="' + http = _HTTP((expected, _THREE_PART_MIME_RESPONSE)) + connection = _Connection(http=http) + + self.assertEqual(list(_BATCHES), []) + + with self._makeOne(connection) as batch: + self.assertEqual(list(_BATCHES), [batch]) + batch._make_request('POST', URL, {'foo': 1, 'bar': 2}) + batch._make_request('PATCH', URL, {'bar': 3}) + batch._make_request('DELETE', URL) + + self.assertEqual(list(_BATCHES), []) + self.assertEqual(len(batch._requests), 3) + self.assertEqual(batch._requests[0][0], 'POST') + self.assertEqual(batch._requests[1][0], 'PATCH') + self.assertEqual(batch._requests[2][0], 'DELETE') + self.assertEqual(len(batch._responses), 3) + self.assertEqual( + batch._responses[0], + ('200', 'OK', {'foo': 1, 'bar': 2})) + self.assertEqual( + batch._responses[1], + ('200', 'OK', {'foo': 1, 'bar': 3})) + self.assertEqual( + batch._responses[2], + ('204', 'No Content', '')) + + def test_as_context_mgr_w_error(self): + from gcloud.storage.batch import _BATCHES + URL = 'http://example.com/api' + http = _HTTP() + connection = _Connection(http=http) + + self.assertEqual(list(_BATCHES), []) + + try: + with self._makeOne(connection) as batch: + self.assertEqual(list(_BATCHES), [batch]) + batch._make_request('POST', URL, {'foo': 1, 'bar': 2}) + batch._make_request('PATCH', URL, {'bar': 3}) + batch._make_request('DELETE', URL) + raise ValueError() + except ValueError: + pass + + self.assertEqual(list(_BATCHES), []) + self.assertEqual(len(http._requests), 0) + self.assertEqual(len(batch._requests), 3) + self.assertEqual(len(batch._responses), 0) + + +_THREE_PART_MIME_RESPONSE = """\ +--DEADBEEF= +Content-Type: application/http +Content-ID: + +HTTP/1.1 200 OK +Content-Type: application/json; charset=UTF-8 +Content-Length: 20 + +{"foo": 1, "bar": 2} + +--DEADBEEF= +Content-Type: application/http +Content-ID: + +HTTP/1.1 200 OK +Content-Type: application/json; charset=UTF-8 +Content-Length: 20 + +{"foo": 1, "bar": 3} + +--DEADBEEF= +Content-Type: application/http +Content-ID: + +HTTP/1.1 204 No Content +Content-Length: 0 + +--DEADBEEF=-- +""" + + +class _Connection(object): + + project = 'TESTING' + + def __init__(self, **kw): + self.__dict__.update(kw) + + def build_api_url(self, path, **_): # pragma: NO COVER + return 'http://api.example.com%s' % path + + def _make_request(self, method, url, data=None, content_type=None, + headers=None): + if content_type is not None: # pragma: NO COVER + headers['Content-Type'] = content_type + + return self.http.request(method, uri=url, headers=headers, body=data) + + def api_request(self, method, path, query_params=None, + data=None, content_type=None, + api_base_url=None, api_version=None, + expect_json=True): # pragma: NO COVER + pass + + def get_all_buckets(self): # pragma: NO COVER + pass + + def get_bucket(self, name): # pragma: NO COVER + pass + + def create_bucket(self, name): # pragma: NO COVER + pass + + def delete_bucket(self, name): # pragma: NO COVER + pass + + +class _Response(dict): + + def __init__(self, status=200, **kw): + self.status = status + super(_Response, self).__init__(**kw) + + +class _HTTP(object): + + def __init__(self, *responses): + self._requests = [] + self._responses = list(responses) + + def request(self, method, uri, headers, body): + self._requests.append((method, uri, headers, body)) + response, self._responses = self._responses[0], self._responses[1:] + return response diff --git a/regression/storage.py b/regression/storage.py index 342f240761cb..397f11a6da08 100644 --- a/regression/storage.py +++ b/regression/storage.py @@ -21,6 +21,7 @@ from gcloud import storage from gcloud.storage._helpers import _base64_md5hash from gcloud.storage import _implicit_environ +from gcloud.storage.batch import Batch HTTP = httplib2.Http() @@ -52,15 +53,16 @@ def setUp(self): self.case_buckets_to_delete = [] def tearDown(self): - for bucket in self.case_buckets_to_delete: - bucket.delete() + with Batch(CONNECTION) as batch: + for bucket_name in self.case_buckets_to_delete: + batch.get_bucket(bucket_name).delete() def test_create_bucket(self): new_bucket_name = 'a-new-bucket' self.assertRaises(exceptions.NotFound, CONNECTION.get_bucket, new_bucket_name) created = CONNECTION.create_bucket(new_bucket_name) - self.case_buckets_to_delete.append(created) + self.case_buckets_to_delete.append(new_bucket_name) self.assertEqual(created.name, new_bucket_name) def test_get_buckets(self): @@ -72,7 +74,7 @@ def test_get_buckets(self): created_buckets = [] for bucket_name in buckets_to_create: bucket = CONNECTION.create_bucket(bucket_name) - self.case_buckets_to_delete.append(bucket) + self.case_buckets_to_delete.append(bucket_name) # Retrieve the buckets. all_buckets = CONNECTION.get_all_buckets()