From 682f6b249720a18e3a6a7b213f45df4a2508f875 Mon Sep 17 00:00:00 2001 From: Henning Rogge Date: Wed, 25 Mar 2020 12:59:57 +0100 Subject: [PATCH 1/3] Add 1 second timeout for plumbing of no-response requests --- aiocoap/protocol.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/aiocoap/protocol.py b/aiocoap/protocol.py index 938122ef..c90c2a9f 100644 --- a/aiocoap/protocol.py +++ b/aiocoap/protocol.py @@ -34,7 +34,8 @@ from . import error from .numbers import (INTERNAL_SERVER_ERROR, NOT_FOUND, SERVICE_UNAVAILABLE, CONTINUE, REQUEST_ENTITY_INCOMPLETE, - OBSERVATION_RESET_TIME, MAX_TRANSMIT_WAIT) + OBSERVATION_RESET_TIME, MAX_TRANSMIT_WAIT, + NON, EMPTY) from .numbers.optionnumbers import OptionNumber import warnings @@ -609,10 +610,21 @@ def _response_cancellation_handler(self, response): def _add_response_properties(response, request): response.request = request + def request_timeout(self, msg): + if msg.mtype == NON and msg.opt.no_response == 26: + return 1 + return None + async def _run(self): # FIXME: check that responses come from the same remmote as long as we're assuming unicast - first_event = await self._plumbing_request._events.get() + timeout = self.request_timeout(self._plumbing_request.request) + + try: + first_event = await asyncio.wait_for(self._plumbing_request._events.get(), timeout=timeout) + except asyncio.TimeoutError: + self.response.set_result(Message(code=EMPTY)) + return if first_event.message is not None: self._add_response_properties(first_event.message, self._plumbing_request.request) From c9f1d69272bed0705ad865be28b53bdb45b7f5e3 Mon Sep 17 00:00:00 2001 From: Henning Rogge Date: Mon, 6 Apr 2020 13:12:35 +0200 Subject: [PATCH 2/3] Fix udp6 transport for multi-interface multicast situations --- aiocoap/protocol.py | 4 +- aiocoap/transports/udp6.py | 103 ++++++++++++++++++++++++++---------- aiocoap/util/socknumbers.py | 2 + 3 files changed, 78 insertions(+), 31 deletions(-) diff --git a/aiocoap/protocol.py b/aiocoap/protocol.py index 938122ef..49737996 100644 --- a/aiocoap/protocol.py +++ b/aiocoap/protocol.py @@ -205,7 +205,7 @@ async def create_client_context(cls, *, loggername="coap", loop=None): return self @classmethod - async def create_server_context(cls, site, bind=None, *, loggername="coap-server", loop=None, _ssl_context=None): + async def create_server_context(cls, site, bind=None, *, loggername="coap-server", loop=None, _ssl_context=None, multicastif=None): """Create a context, bound to all addresses on the CoAP port (unless otherwise specified in the ``bind`` argument). @@ -222,7 +222,7 @@ async def create_server_context(cls, site, bind=None, *, loggername="coap-server from .transports.udp6 import MessageInterfaceUDP6 await self._append_tokenmanaged_messagemanaged_transport( - lambda mman: MessageInterfaceUDP6.create_server_transport_endpoint(mman, log=self.log, loop=loop, bind=bind)) + lambda mman: MessageInterfaceUDP6.create_server_transport_endpoint(mman, log=self.log, loop=loop, bind=bind, multicastif=multicastif)) # FIXME this is duplicated from the client version, as those are client-only anyway elif transportname == 'simple6': from .transports.simple6 import MessageInterfaceSimple6 diff --git a/aiocoap/transports/udp6.py b/aiocoap/transports/udp6.py index 4dc0ea66..87abd10e 100644 --- a/aiocoap/transports/udp6.py +++ b/aiocoap/transports/udp6.py @@ -45,6 +45,8 @@ import weakref from collections import namedtuple +import netifaces + from ..message import Message from ..numbers import constants from .. import error @@ -175,10 +177,11 @@ def load(cls, data): return cls(*cls._struct.unpack_from(data)) class MessageInterfaceUDP6(RecvmsgDatagramProtocol, interfaces.MessageInterface): - def __init__(self, ctx: interfaces.MessageManager, log, loop): + def __init__(self, ctx: interfaces.MessageManager, log, loop, multicastif): self._ctx = ctx self.log = log self.loop = loop + self.multicastif = multicastif self._shutting_down = None #: Future created and used in the .shutdown() method. @@ -191,7 +194,39 @@ def _local_port(self): return self.transport.get_extra_info('socket').getsockname()[1] @classmethod - async def _create_transport_endpoint(cls, sock, ctx: interfaces.MessageManager, log, loop, multicast=False): + def _join_ipv4_multicast(cls, log, sock, multicast: str, interface: str): + if not interface: + ifaddr = '0.0.0.0' + else: + result = netifaces.ifaddresses(interface) + ifaddr = result.get(netifaces.AF_INET, [{}])[0].get('addr', '0.0.0.0') + + s = struct.pack('4s4si', + socket.inet_aton(multicast), + socket.inet_aton(ifaddr), 0) + try: + sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, s) + except OSError: + log.warning("Could not join IPv4 multicast group {} on interface {} with ip {}; possibly, there is no network connection available.".format(multicast, interface, ifaddr)) + + @classmethod + def _join_ipv6_multicast(cls, log, sock, multicast: str, interface: str): + if not interface: + ifindex = 0 + else: + ifindex = socket.if_nametoindex(interface) + + s = struct.pack('16si', + socket.inet_pton(socket.AF_INET6, multicast), + ifindex) + try: + sock.setsockopt(socket.IPPROTO_IPV6, + socket.IPV6_JOIN_GROUP, s) + except OSError: + log.warning("Could not join IPv6 multicast group {} on interface {}; possibly, there is no network connection available.".format(multicast, interface)) + + @classmethod + async def _create_transport_endpoint(cls, sock, ctx: interfaces.MessageManager, log, loop, multicastif): try: sock.setsockopt(socket.IPPROTO_IPV6, socknumbers.IPV6_RECVPKTINFO, 1) except NameError: @@ -204,28 +239,17 @@ async def _create_transport_endpoint(cls, sock, ctx: interfaces.MessageManager, else: log.warning("Transport udp6 set up on platform without RECVERR capability. ICMP errors will be ignored.") - if multicast: - # FIXME this all registers only for one interface, doesn't it? - s = struct.pack('4s4si', - socket.inet_aton(constants.MCAST_IPV4_ALLCOAPNODES), - socket.inet_aton("0.0.0.0"), 0) - try: - sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, s) - except OSError: - log.warning("Could not join IPv4 multicast group; possibly, there is no network connection available.") + # FIXME this all registers only for one interface, doesn't it? + for ifname in multicastif: + print('multicast-if', ifname) + cls._join_ipv4_multicast(log, sock, constants.MCAST_IPV4_ALLCOAPNODES, ifname) + for a in constants.MCAST_IPV6_ALL: - s = struct.pack('16si', - socket.inet_pton(socket.AF_INET6, a), - 0) - try: - sock.setsockopt(socket.IPPROTO_IPV6, - socket.IPV6_JOIN_GROUP, s) - except OSError: - log.warning("Could not join IPv6 multicast group; possibly, there is no network connection available.") - - transport, protocol = await create_recvmsg_datagram_endpoint(loop, - lambda: cls(ctx, log=log, loop=loop), - sock=sock) + cls._join_ipv6_multicast(log, sock, a, ifname) + + transport, protocol = await create_recvmsg_datagram_endpoint( + loop, lambda: cls(ctx, log=log, loop=loop, multicastif=multicastif), + sock=sock) await protocol.ready @@ -236,13 +260,14 @@ async def create_client_transport_endpoint(cls, ctx: interfaces.MessageManager, sock = socket.socket(family=socket.AF_INET6, type=socket.SOCK_DGRAM) sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0) - return await cls._create_transport_endpoint(sock, ctx, log, loop, multicast=False) + return await cls._create_transport_endpoint(sock, ctx, log, loop, multicastif=None) @classmethod - async def create_server_transport_endpoint(cls, ctx: interfaces.MessageManager, log, loop, bind): + async def create_server_transport_endpoint(cls, ctx: interfaces.MessageManager, log, loop, bind, multicastif): bind = bind or ('::', None) - bind = (bind[0], bind[1] or COAP_PORT) + bind = (bind[0] or '::', bind[1] or COAP_PORT) + multicastif = multicastif or [None] # The later bind() does most of what getaddr info usually does # (including resolving names), but is missing out subtly: It does not # populate the zone identifier of an IPv6 address, making it impossible @@ -269,7 +294,7 @@ async def create_server_transport_endpoint(cls, ctx: interfaces.MessageManager, sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0) sock.bind(bind) - return (await cls._create_transport_endpoint(sock, ctx, log, loop, multicast=True)) + return (await cls._create_transport_endpoint(sock, ctx, log, loop, multicastif=multicastif)) async def shutdown(self): self._shutting_down = asyncio.Future() @@ -284,7 +309,7 @@ def send(self, message): ancdata = [] if message.remote.pktinfo is not None: ancdata.append((socket.IPPROTO_IPV6, socknumbers.IPV6_PKTINFO, - message.remote.pktinfo)) + message.remote.pktinfo)) self.transport.sendmsg(message.encode(), ancdata, 0, message.remote.sockaddr) async def recognize_remote(self, remote): @@ -307,6 +332,13 @@ async def determine_remote(self, request): else: raise ValueError("No location found to send message to (neither in .opt.uri_host nor in .remote)") + if '%' in host: + if host[:5] == 'ff02:': + hostif = host.split('%')[1] + else: + host, hostif = host.split('%') + else: + hostif = None try: own_sock = self.transport.get_extra_info('socket') addrinfo = await self.loop.getaddrinfo( @@ -322,7 +354,20 @@ async def determine_remote(self, request): ) except socket.gaierror: raise error.ResolutionError("No address information found for requests to %r" % host) - return UDP6EndpointAddress(addrinfo[0][-1], self) + + pktinfo = None + if hostif: + if addrinfo[0][-1][0][:7] == '::ffff:': + pktinfo = struct.pack( + '16sI', + socket.inet_pton(socket.AF_INET6, '::ffff:0.0.0.0'), + socket.if_nametoindex(hostif)) + else: + pktinfo = struct.pack( + '16sI', + socket.inet_pton(socket.AF_INET6, '::'), + socket.if_nametoindex(hostif)) + return UDP6EndpointAddress(addrinfo[0][-1], self, pktinfo=pktinfo) # # implementing the typical DatagramProtocol interfaces. diff --git a/aiocoap/util/socknumbers.py b/aiocoap/util/socknumbers.py index 00d19ecc..87a87bde 100644 --- a/aiocoap/util/socknumbers.py +++ b/aiocoap/util/socknumbers.py @@ -42,6 +42,8 @@ # Not attempting to make any guesses for other platforms; the udp6 module # will fail to import where it needs the specifics +IP_PKTINFO = 8 + try: from IN import IPV6_RECVERR, IP_RECVERR except (ImportError, NameError): From 3a6af58421cb67fddc5bcf854e5faab28b3497e1 Mon Sep 17 00:00:00 2001 From: Henning Rogge Date: Mon, 6 Apr 2020 14:50:06 +0200 Subject: [PATCH 3/3] Add control for local source IP address --- aiocoap/message.py | 3 +- aiocoap/protocol.py | 8 +++--- aiocoap/transports/udp6.py | 59 ++++++++++++++++++++++++++------------ 3 files changed, 47 insertions(+), 23 deletions(-) diff --git a/aiocoap/message.py b/aiocoap/message.py index 9fb9f5a6..d5bf4ff3 100644 --- a/aiocoap/message.py +++ b/aiocoap/message.py @@ -127,7 +127,7 @@ class Message(object): * Some options or even the payload may differ if a proxy was involved. """ - def __init__(self, *, mtype=None, mid=None, code=None, payload=b'', token=b'', uri=None, **kwargs): + def __init__(self, *, mtype=None, mid=None, code=None, payload=b'', token=b'', uri=None, local=None, **kwargs): self.version = 1 if mtype is None: # leave it unspecified for convenience, sending functions will know what to do @@ -145,6 +145,7 @@ def __init__(self, *, mtype=None, mid=None, code=None, payload=b'', token=b'', u self.opt = Options() self.remote = None + self.local = local # deprecation error, should go away roughly after 0.2 release if self.payload is None: diff --git a/aiocoap/protocol.py b/aiocoap/protocol.py index 49737996..adc26884 100644 --- a/aiocoap/protocol.py +++ b/aiocoap/protocol.py @@ -160,7 +160,7 @@ async def _append_tokenmanaged_transport(self, token_interface_constructor): self.request_interfaces.append(tman) @classmethod - async def create_client_context(cls, *, loggername="coap", loop=None): + async def create_client_context(cls, *, loggername="coap", loop=None, local=None): """Create a context bound to all addresses on a random listening port. This is the easiest way to get a context suitable for sending client @@ -177,7 +177,7 @@ async def create_client_context(cls, *, loggername="coap", loop=None): if transportname == 'udp6': from .transports.udp6 import MessageInterfaceUDP6 await self._append_tokenmanaged_messagemanaged_transport( - lambda mman: MessageInterfaceUDP6.create_client_transport_endpoint(mman, log=self.log, loop=loop)) + lambda mman: MessageInterfaceUDP6.create_client_transport_endpoint(mman, log=self.log, loop=loop, local=local)) elif transportname == 'simple6': from .transports.simple6 import MessageInterfaceSimple6 await self._append_tokenmanaged_messagemanaged_transport( @@ -205,7 +205,7 @@ async def create_client_context(cls, *, loggername="coap", loop=None): return self @classmethod - async def create_server_context(cls, site, bind=None, *, loggername="coap-server", loop=None, _ssl_context=None, multicastif=None): + async def create_server_context(cls, site, bind=None, *, loggername="coap-server", loop=None, _ssl_context=None, multicastif=None, local=None): """Create a context, bound to all addresses on the CoAP port (unless otherwise specified in the ``bind`` argument). @@ -222,7 +222,7 @@ async def create_server_context(cls, site, bind=None, *, loggername="coap-server from .transports.udp6 import MessageInterfaceUDP6 await self._append_tokenmanaged_messagemanaged_transport( - lambda mman: MessageInterfaceUDP6.create_server_transport_endpoint(mman, log=self.log, loop=loop, bind=bind, multicastif=multicastif)) + lambda mman: MessageInterfaceUDP6.create_server_transport_endpoint(mman, log=self.log, loop=loop, bind=bind, multicastif=multicastif, local=local)) # FIXME this is duplicated from the client version, as those are client-only anyway elif transportname == 'simple6': from .transports.simple6 import MessageInterfaceSimple6 diff --git a/aiocoap/transports/udp6.py b/aiocoap/transports/udp6.py index 87abd10e..4251378e 100644 --- a/aiocoap/transports/udp6.py +++ b/aiocoap/transports/udp6.py @@ -177,12 +177,20 @@ def load(cls, data): return cls(*cls._struct.unpack_from(data)) class MessageInterfaceUDP6(RecvmsgDatagramProtocol, interfaces.MessageInterface): - def __init__(self, ctx: interfaces.MessageManager, log, loop, multicastif): + def __init__(self, ctx: interfaces.MessageManager, log, loop, multicastif, local): self._ctx = ctx self.log = log self.loop = loop self.multicastif = multicastif + self.local = {} + local = local or [] + if isinstance(local, str): + local = [local] + for addr in local: + category, ip = self._normalize_local(addr) + self.local[category] = ip + self._shutting_down = None #: Future created and used in the .shutdown() method. self.ready = asyncio.Future() #: Future that gets fullfilled by connection_made (ie. don't send before this is done; handled by ``create_..._context`` @@ -193,6 +201,19 @@ def _local_port(self): # results while the first message has not been sent yet. return self.transport.get_extra_info('socket').getsockname()[1] + @classmethod + def _normalize_local(cls, addr: str): + ip = ipaddress.ip_address(addr) + if ip.version == 4: + return '4', '::ffff:' + addr + elif ip.version == 6 and ip.ipv4_mapped: + return '4', addr + elif ip.version == 6 and ip.is_link_local: + return '6ll', addr + elif ip.version == 6 and ip.is_global: + return '6', addr + return None, None + @classmethod def _join_ipv4_multicast(cls, log, sock, multicast: str, interface: str): if not interface: @@ -226,7 +247,7 @@ def _join_ipv6_multicast(cls, log, sock, multicast: str, interface: str): log.warning("Could not join IPv6 multicast group {} on interface {}; possibly, there is no network connection available.".format(multicast, interface)) @classmethod - async def _create_transport_endpoint(cls, sock, ctx: interfaces.MessageManager, log, loop, multicastif): + async def _create_transport_endpoint(cls, sock, ctx: interfaces.MessageManager, log, loop, multicastif, local): try: sock.setsockopt(socket.IPPROTO_IPV6, socknumbers.IPV6_RECVPKTINFO, 1) except NameError: @@ -248,7 +269,7 @@ async def _create_transport_endpoint(cls, sock, ctx: interfaces.MessageManager, cls._join_ipv6_multicast(log, sock, a, ifname) transport, protocol = await create_recvmsg_datagram_endpoint( - loop, lambda: cls(ctx, log=log, loop=loop, multicastif=multicastif), + loop, lambda: cls(ctx, log=log, loop=loop, multicastif=multicastif, local=local), sock=sock) await protocol.ready @@ -256,14 +277,14 @@ async def _create_transport_endpoint(cls, sock, ctx: interfaces.MessageManager, return protocol @classmethod - async def create_client_transport_endpoint(cls, ctx: interfaces.MessageManager, log, loop): + async def create_client_transport_endpoint(cls, ctx: interfaces.MessageManager, log, loop, local): sock = socket.socket(family=socket.AF_INET6, type=socket.SOCK_DGRAM) sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0) - return await cls._create_transport_endpoint(sock, ctx, log, loop, multicastif=None) + return await cls._create_transport_endpoint(sock, ctx, log, loop, multicastif=None, local=local) @classmethod - async def create_server_transport_endpoint(cls, ctx: interfaces.MessageManager, log, loop, bind, multicastif): + async def create_server_transport_endpoint(cls, ctx: interfaces.MessageManager, log, loop, bind, multicastif, local): bind = bind or ('::', None) bind = (bind[0] or '::', bind[1] or COAP_PORT) @@ -294,7 +315,7 @@ async def create_server_transport_endpoint(cls, ctx: interfaces.MessageManager, sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0) sock.bind(bind) - return (await cls._create_transport_endpoint(sock, ctx, log, loop, multicastif=multicastif)) + return (await cls._create_transport_endpoint(sock, ctx, log, loop, multicastif=multicastif, local=local)) async def shutdown(self): self._shutting_down = asyncio.Future() @@ -333,7 +354,7 @@ async def determine_remote(self, request): raise ValueError("No location found to send message to (neither in .opt.uri_host nor in .remote)") if '%' in host: - if host[:5] == 'ff02:': + if host.startswith('ff02:'): hostif = host.split('%')[1] else: host, hostif = host.split('%') @@ -356,17 +377,19 @@ async def determine_remote(self, request): raise error.ResolutionError("No address information found for requests to %r" % host) pktinfo = None + ifindex = 0 if hostif: - if addrinfo[0][-1][0][:7] == '::ffff:': - pktinfo = struct.pack( - '16sI', - socket.inet_pton(socket.AF_INET6, '::ffff:0.0.0.0'), - socket.if_nametoindex(hostif)) - else: - pktinfo = struct.pack( - '16sI', - socket.inet_pton(socket.AF_INET6, '::'), - socket.if_nametoindex(hostif)) + ifindex = socket.if_nametoindex(hostif) + + if request.local: + source = request.local + elif addrinfo[0][-1][0].startswith('::ffff:') and '.' in addrinfo[0][-1][0]: + source = self.local.get('4', '::ffff:0.0.0.0') + elif addrinfo[0][-1][0].startswith('fe80::'): + source = self.local.get('6ll', '::') + else: + source = self.local.get('6', '::') + pktinfo = struct.pack('16sI', socket.inet_pton(socket.AF_INET6, source), ifindex) return UDP6EndpointAddress(addrinfo[0][-1], self, pktinfo=pktinfo) #