Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
cf68d0e
Serialize HTTPRequest
lmazuel Sep 5, 2019
b236e90
Basic deserialize Http Response
lmazuel Sep 5, 2019
4985fa6
http.client response transport
lmazuel Sep 5, 2019
79078a6
Multipart helper - base version
lmazuel Sep 6, 2019
5f81289
Working full mock scenario
lmazuel Sep 6, 2019
61be567
Parse multipart response based on content-type
lmazuel Sep 17, 2019
9bb1c95
Adding BOM tests
lmazuel Sep 18, 2019
edf8ac2
Improve a bit multipart generation
lmazuel Sep 18, 2019
3e0cf8d
Refactor prepare_multipart_mixed
lmazuel Sep 19, 2019
f12856c
Core cleaning
lmazuel Sep 20, 2019
d765053
Fix tests
lmazuel Sep 20, 2019
8fc8ae4
Case insensitive dict fix
lmazuel Sep 20, 2019
358208d
Simplify code
lmazuel Sep 20, 2019
6eb0f12
pylint
lmazuel Sep 20, 2019
1b35a9e
mypy
lmazuel Sep 20, 2019
ce4e8c2
Merge remote-tracking branch 'origin/master' into features/http_seria…
lmazuel Sep 23, 2019
1a6fe94
Python 2.7 improvement
lmazuel Sep 23, 2019
1c024d3
Python 2.7 improvement, part2
lmazuel Sep 24, 2019
98e744d
Remove unecessary test
lmazuel Sep 24, 2019
4abc62f
Refactor parts
lmazuel Sep 30, 2019
c7e1788
Recursive multipart
lmazuel Sep 30, 2019
605d987
No multipart serialization on 2.7
lmazuel Sep 30, 2019
d82e737
pylint
lmazuel Sep 30, 2019
2b7c844
mypy
lmazuel Sep 30, 2019
9364cd5
Fix test for 3.5
lmazuel Sep 30, 2019
d0d4e5b
Adding six as dep for azure-core
lmazuel Sep 30, 2019
2a8cabd
Make analyze job happy
lmazuel Sep 30, 2019
f0d54c3
Skip test that assumes dict ordering for consistency in 3.5
lmazuel Oct 1, 2019
15af826
Merge remote-tracking branch 'origin/master' into features/http_seria…
lmazuel Oct 1, 2019
4be0c71
Accept async on_response on async scenarios
lmazuel Oct 1, 2019
50b68ef
Async on_request support
lmazuel Oct 2, 2019
b1046f3
Complete support of async sansio in multipart with pipeline tests
lmazuel Oct 2, 2019
b22f6e1
Fix mock naming
lmazuel Oct 2, 2019
0d20e4d
pylint
lmazuel Oct 2, 2019
a97ce1b
ChangeLog / Readme
lmazuel Oct 2, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/core/azure-core/azure/core/pipeline/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ def run(self, request, **kwargs):
:return: The PipelineResponse object
:rtype: ~azure.core.pipeline.PipelineResponse
"""
request.prepare_multipart_mixed()
context = PipelineContext(self._transport, **kwargs)
pipeline_request = PipelineRequest(request, context) # type: PipelineRequest
first_node = self._impl_policies[0] if self._impl_policies else _TransportRunner(self._transport)
Expand Down
3 changes: 2 additions & 1 deletion sdk/core/azure-core/azure/core/pipeline/base_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,14 +153,15 @@ async def __aenter__(self) -> 'AsyncPipeline':
async def __aexit__(self, *exc_details): # pylint: disable=arguments-differ
await self._transport.__aexit__(*exc_details)

async def run(self, request: PipelineRequest, **kwargs: Any):
async def run(self, request: HTTPRequestType, **kwargs: Any):
"""Runs the HTTP Request through the chained policies.

:param request: The HTTP request object.
:type request: ~azure.core.pipeline.transport.HttpRequest
:return: The PipelineResponse object.
:rtype: ~azure.core.pipeline.PipelineResponse
"""
request.prepare_multipart_mixed()
context = PipelineContext(self._transport, **kwargs)
pipeline_request = PipelineRequest(request, context)
first_node = self._impl_policies[0] if self._impl_policies else _AsyncTransportRunner(self._transport)
Expand Down
262 changes: 254 additions & 8 deletions sdk/core/azure-core/azure/core/pipeline/transport/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@
# --------------------------------------------------------------------------
from __future__ import absolute_import
import abc
from email.message import Message
from email.policy import HTTP
try:
from email import message_from_bytes as message_parser
except ImportError: # 2.7
from email import message_from_string as message_parser # type: ignore
from io import BytesIO
import json
import logging
import os
Expand All @@ -38,23 +45,47 @@
import xml.etree.ElementTree as ET

from typing import (TYPE_CHECKING, Generic, TypeVar, cast, IO, List, Union, Any, Mapping, Dict, # pylint: disable=unused-import
Optional, Tuple, Callable, Iterator)
Optional, Tuple, Callable, Iterator, Iterable)

from six.moves.http_client import HTTPConnection, HTTPResponse as _HTTPResponse

# This file is NOT using any "requests" HTTP implementation
# However, the CaseInsensitiveDict is handy.
# If one day we reach the point where "requests" can be skip totally,
# might provide our own implementation
from requests.structures import CaseInsensitiveDict
from azure.core.pipeline import ABC, AbstractContextManager, PipelineRequest, PipelineResponse
from azure.core.pipeline import ABC, AbstractContextManager, PipelineRequest, PipelineResponse, PipelineContext


if TYPE_CHECKING:
from ..policies import SansIOHTTPPolicy

HTTPResponseType = TypeVar("HTTPResponseType")
HTTPRequestType = TypeVar("HTTPRequestType")
PipelineType = TypeVar("PipelineType")

_LOGGER = logging.getLogger(__name__)


def _case_insensitive_dict(*args, **kwargs):
"""Return a case-insensitive dict from a structure that a dict would have accepted.

Rational is I don't want to re-implement this, but I don't want
to assume "requests" or "aiohttp" are installed either.
So I use the one from "requests" or the one from "aiohttp" ("multidict")
If one day this library is used in an HTTP context without "requests" nor "aiohttp" installed,
we can add "multidict" as a dependency or re-implement our own.
"""
try:
from requests.structures import CaseInsensitiveDict
return CaseInsensitiveDict(*args, **kwargs)
except ImportError:
pass
try:
# multidict is installed by aiohttp
from multidict import CIMultiDict
return CIMultiDict(*args, **kwargs)
except ImportError:
raise ValueError(
"Neither 'requests' or 'multidict' are installed and no case-insensitive dict impl have been found"
)


def _format_url_section(template, **kwargs):
components = template.split("/")
while components:
Expand All @@ -81,6 +112,33 @@ def _urljoin(base_url, stub_url):
return parsed.geturl()


class _HTTPSerializer(HTTPConnection):
"""Hacking the stdlib HTTPConnection to serialize HTTP request as strings.
"""
def __init__(self, *args, **kwargs):
self.buffer = b''
kwargs.setdefault("host", "fakehost")
super(_HTTPSerializer, self).__init__(*args, **kwargs)

def putheader(self, header, *values):
if header in ["Host", "Accept-Encoding"]:
return
super(_HTTPSerializer, self).putheader(header, *values)

def send(self, data):
self.buffer += data

def _serialize_request(http_request):
serializer = _HTTPSerializer()
serializer.request(
method=http_request.method,
url=http_request.url,
body=http_request.body,
headers=http_request.headers
)
return serializer.buffer


class HttpTransport(AbstractContextManager, ABC, Generic[HTTPRequestType, HTTPResponseType]): # type: ignore
"""An http sender ABC.
"""
Expand Down Expand Up @@ -119,9 +177,10 @@ def __init__(self, method, url, headers=None, files=None, data=None):
# type: (str, str, Mapping[str, str], Any, Any) -> None
self.method = method
self.url = url
self.headers = CaseInsensitiveDict(headers)
self.headers = _case_insensitive_dict(headers)
self.files = files
self.data = data
self.multipart_mixed_info = None # type: Optional[Tuple]

def __repr__(self):
return '<HttpRequest [%s]>' % (self.method)
Expand Down Expand Up @@ -245,6 +304,38 @@ def set_bytes_body(self, data):
self.data = data
self.files = None

def set_multipart_mixed(self, *requests, **kwargs):
# type: (HttpRequest, Any) -> None
"""Set the part of a multipart/mixed.

Only support args for now are HttpRequest objects.

kwargs:
- policies: SansIOPolicy to apply at preparation time

:param requests: HttpRequests object
"""
self.multipart_mixed_info = (
requests,
kwargs.pop("policies", [])
)

def prepare_multipart_mixed(self):
# type: () -> None
"""Will prepare the body of this request according to the multipart information.

Does nothing if "set_multipart_mixed" was never called.
"""
if self.multipart_mixed_info:
multipart_helper = MultiPartHelper(self)
multipart_helper.prepare_request()

def serialize(self):
# type: () -> bytes
"""Serialize this request using application/http spec.
"""
return _serialize_request(self)


class _HttpResponseBase(object):
"""Represent a HTTP response.
Expand Down Expand Up @@ -287,6 +378,19 @@ def text(self, encoding=None):
"""
return self.body().decode(encoding or "utf-8")

def parts(self):
# type: () -> Iterable
"""Assuming the content-type is multipart/mixed, will return the parts as an iterable.

:rtype: list
:raises ValueError: If the content is not multipart/mixed
"""
if not self.content_type or not self.content_type.startswith("multipart/mixed"):
raise ValueError("You can't get parts if the response is nit multipart/mixed")

multipart_helper = MultiPartHelper(self.request)
return multipart_helper.parse_response(self)


class HttpResponse(_HttpResponseBase):
def stream_download(self, pipeline):
Expand All @@ -298,6 +402,51 @@ def stream_download(self, pipeline):
"""


class HttpClientTransportResponse(HttpResponse):
"""Create a HTTPResponse from an http.client response.

Body will NOT be read by the constructor. Call "body()" to load the body in memory if necessary.

:param HttpRequest request: The request.
:param httpclient_response: The object returned from an HTTP(S)Connection from http.client
"""
def __init__(self, request, httpclient_response):
super(HttpClientTransportResponse, self).__init__(request, httpclient_response)
self.status_code = httpclient_response.status
self.headers = _case_insensitive_dict(httpclient_response.getheaders())
self.reason = httpclient_response.reason
self.content_type = self.headers.get('Content-Type')
self.data = None

def body(self):
if self.data is None:
self.data = self.internal_response.read()
return self.data


class BytesIOSocket(object):
"""Mocking the "makefile" of socket for HTTPResponse.

This can be used to create a http.client.HTTPResponse object
based on bytes and not a real socket.
"""
def __init__(self, bytes_data):
self.bytes_data = bytes_data

def makefile(self, _):
return BytesIO(self.bytes_data)


def _deserialize_response(http_response_as_bytes, http_request):
local_socket = BytesIOSocket(http_response_as_bytes)
response = _HTTPResponse(
local_socket,
method=http_request.method
)
response.begin()
return HttpClientTransportResponse(http_request, response)


class PipelineClientBase(object):
"""Base class for pipeline clients.

Expand Down Expand Up @@ -501,3 +650,100 @@ def merge(self, url, params=None, headers=None, content=None, form_content=None)
"""
request = self._request('MERGE', url, params, headers, content, form_content, None)
return request


class MultiPartHelper(object):
def __init__(
self,
main_request, # type: HttpRequest
boundary=None, # type: str
):
"""Create a multipart helper to serialize and parse multipart/mixed payload.

boundary is optional, and one will be generate if you don't provide one.
Note that no verification are made on the boundary, this is considered advanced
enough so you know how to respect RFC1341 7.2.1 and provide a correct boundary.

:param HttpRequest main_request: The request.
:param str boundary: Optional boundary
"""
self.main_request = main_request
if self.main_request.multipart_mixed_info is None:
raise ValueError("This request doesn't have multipart information available")
self.requests = self.main_request.multipart_mixed_info[0] # type: List[HttpRequest]
self.policies = self.main_request.multipart_mixed_info[1] # type: List[SansIOHTTPPolicy]
self._boundary = boundary

def prepare_request(self):
# Apply on_requests concurrently to all requests
import concurrent.futures

def prepare_requests(request):
context = PipelineContext(None)
pipeline_request = PipelineRequest(request, context)
for policy in self.policies:
policy.on_request(pipeline_request)

with concurrent.futures.ThreadPoolExecutor() as executor:
# List comprehension to raise exceptions if happened
[_ for _ in executor.map(prepare_requests, self.requests)] # pylint: disable=expression-not-assigned

# Update the main request with the body
main_message = Message()
main_message.add_header("Content-Type", "multipart/mixed")
if self._boundary:
main_message.set_boundary(self._boundary)
for i, req in enumerate(self.requests):
part_message = Message()
part_message.add_header('Content-Type', 'application/http')
part_message.add_header('Content-Transfer-Encoding', 'binary')
part_message.add_header('Content-ID', str(i))
part_message.set_payload(req.serialize())
main_message.attach(part_message)

full_message = main_message.as_bytes(policy=HTTP)
_, _, body = full_message.split(b'\r\n', maxsplit=2)
self.main_request.set_bytes_body(body)
self.main_request.headers['Content-Type'] = 'multipart/mixed; boundary='+main_message.get_boundary()

def parse_response(self, response):
body_as_bytes = response.body()
# In order to use email.message parser, I need full HTTP bytes. Faking something to make the parser happy
http_body = (
b'Content-Type: ' +
response.content_type.encode('ascii') +
b'\r\n\r\n' +
body_as_bytes
)

message = message_parser(http_body) # type: Message

# Rebuild an HTTP response from pure string
responses = []
for request, raw_reponse in zip(self.requests, message.get_payload()):
if raw_reponse.get_content_type() == "application/http":
responses.append(_deserialize_response(raw_reponse.get_payload(decode=True), request))
else:
raise ValueError("Multipart doesn't support part other than application/http for now")

# Apply on_response concurrently to all requests
import concurrent.futures

def parse_responses(response):
http_request = response.request
context = PipelineContext(None)
pipeline_request = PipelineRequest(http_request, context)
pipeline_response = PipelineResponse(
http_request,
response,
context=context
)

for policy in self.policies:
policy.on_response(pipeline_request, pipeline_response)

with concurrent.futures.ThreadPoolExecutor() as executor:
# List comprehension to raise exceptions if happened
[_ for _ in executor.map(parse_responses, responses)] # pylint: disable=expression-not-assigned

return responses
Loading