Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add 1 second timeout for plumbing of no-response requests #190

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion aiocoap/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ class Message(object):
* Some options or even the payload may differ if a proxy was involved.
"""

def __init__(self, *, mtype=None, mid=None, code=None, payload=b'', token=b'', uri=None, **kwargs):
def __init__(self, *, mtype=None, mid=None, code=None, payload=b'', token=b'', uri=None, local=None, **kwargs):
self.version = 1
if mtype is None:
# leave it unspecified for convenience, sending functions will know what to do
Expand All @@ -145,6 +145,7 @@ def __init__(self, *, mtype=None, mid=None, code=None, payload=b'', token=b'', u
self.opt = Options()

self.remote = None
self.local = local

# deprecation error, should go away roughly after 0.2 release
if self.payload is None:
Expand Down
24 changes: 18 additions & 6 deletions aiocoap/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
from . import error
from .numbers import (INTERNAL_SERVER_ERROR, NOT_FOUND,
SERVICE_UNAVAILABLE, CONTINUE, REQUEST_ENTITY_INCOMPLETE,
OBSERVATION_RESET_TIME, MAX_TRANSMIT_WAIT)
OBSERVATION_RESET_TIME, MAX_TRANSMIT_WAIT,
NON, EMPTY)
from .numbers.optionnumbers import OptionNumber

import warnings
Expand Down Expand Up @@ -160,7 +161,7 @@ async def _append_tokenmanaged_transport(self, token_interface_constructor):
self.request_interfaces.append(tman)

@classmethod
async def create_client_context(cls, *, loggername="coap", loop=None):
async def create_client_context(cls, *, loggername="coap", loop=None, local=None):
"""Create a context bound to all addresses on a random listening port.

This is the easiest way to get a context suitable for sending client
Expand All @@ -177,7 +178,7 @@ async def create_client_context(cls, *, loggername="coap", loop=None):
if transportname == 'udp6':
from .transports.udp6 import MessageInterfaceUDP6
await self._append_tokenmanaged_messagemanaged_transport(
lambda mman: MessageInterfaceUDP6.create_client_transport_endpoint(mman, log=self.log, loop=loop))
lambda mman: MessageInterfaceUDP6.create_client_transport_endpoint(mman, log=self.log, loop=loop, local=local))
elif transportname == 'simple6':
from .transports.simple6 import MessageInterfaceSimple6
await self._append_tokenmanaged_messagemanaged_transport(
Expand Down Expand Up @@ -205,7 +206,7 @@ async def create_client_context(cls, *, loggername="coap", loop=None):
return self

@classmethod
async def create_server_context(cls, site, bind=None, *, loggername="coap-server", loop=None, _ssl_context=None):
async def create_server_context(cls, site, bind=None, *, loggername="coap-server", loop=None, _ssl_context=None, multicastif=None, local=None):
"""Create a context, bound to all addresses on the CoAP port (unless
otherwise specified in the ``bind`` argument).

Expand All @@ -222,7 +223,7 @@ async def create_server_context(cls, site, bind=None, *, loggername="coap-server
from .transports.udp6 import MessageInterfaceUDP6

await self._append_tokenmanaged_messagemanaged_transport(
lambda mman: MessageInterfaceUDP6.create_server_transport_endpoint(mman, log=self.log, loop=loop, bind=bind))
lambda mman: MessageInterfaceUDP6.create_server_transport_endpoint(mman, log=self.log, loop=loop, bind=bind, multicastif=multicastif, local=local))
# FIXME this is duplicated from the client version, as those are client-only anyway
elif transportname == 'simple6':
from .transports.simple6 import MessageInterfaceSimple6
Expand Down Expand Up @@ -609,10 +610,21 @@ def _response_cancellation_handler(self, response):
def _add_response_properties(response, request):
response.request = request

def request_timeout(self, msg):
if msg.mtype == NON and msg.opt.no_response == 26:
return 1
return None

async def _run(self):
# FIXME: check that responses come from the same remmote as long as we're assuming unicast

first_event = await self._plumbing_request._events.get()
timeout = self.request_timeout(self._plumbing_request.request)

try:
first_event = await asyncio.wait_for(self._plumbing_request._events.get(), timeout=timeout)
except asyncio.TimeoutError:
self.response.set_result(Message(code=EMPTY))
return

if first_event.message is not None:
self._add_response_properties(first_event.message, self._plumbing_request.request)
Expand Down
128 changes: 98 additions & 30 deletions aiocoap/transports/udp6.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import weakref
from collections import namedtuple

import netifaces

from ..message import Message
from ..numbers import constants
from .. import error
Expand Down Expand Up @@ -175,10 +177,19 @@ def load(cls, data):
return cls(*cls._struct.unpack_from(data))

class MessageInterfaceUDP6(RecvmsgDatagramProtocol, interfaces.MessageInterface):
def __init__(self, ctx: interfaces.MessageManager, log, loop):
def __init__(self, ctx: interfaces.MessageManager, log, loop, multicastif, local):
self._ctx = ctx
self.log = log
self.loop = loop
self.multicastif = multicastif

self.local = {}
local = local or []
if isinstance(local, str):
local = [local]
for addr in local:
category, ip = self._normalize_local(addr)
self.local[category] = ip

self._shutting_down = None #: Future created and used in the .shutdown() method.

Expand All @@ -191,7 +202,52 @@ def _local_port(self):
return self.transport.get_extra_info('socket').getsockname()[1]

@classmethod
async def _create_transport_endpoint(cls, sock, ctx: interfaces.MessageManager, log, loop, multicast=False):
def _normalize_local(cls, addr: str):
ip = ipaddress.ip_address(addr)
if ip.version == 4:
return '4', '::ffff:' + addr
elif ip.version == 6 and ip.ipv4_mapped:
return '4', addr
elif ip.version == 6 and ip.is_link_local:
return '6ll', addr
elif ip.version == 6 and ip.is_global:
return '6', addr
return None, None

@classmethod
def _join_ipv4_multicast(cls, log, sock, multicast: str, interface: str):
if not interface:
ifaddr = '0.0.0.0'
else:
result = netifaces.ifaddresses(interface)
ifaddr = result.get(netifaces.AF_INET, [{}])[0].get('addr', '0.0.0.0')

s = struct.pack('4s4si',
socket.inet_aton(multicast),
socket.inet_aton(ifaddr), 0)
try:
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, s)
except OSError:
log.warning("Could not join IPv4 multicast group {} on interface {} with ip {}; possibly, there is no network connection available.".format(multicast, interface, ifaddr))

@classmethod
def _join_ipv6_multicast(cls, log, sock, multicast: str, interface: str):
if not interface:
ifindex = 0
else:
ifindex = socket.if_nametoindex(interface)

s = struct.pack('16si',
socket.inet_pton(socket.AF_INET6, multicast),
ifindex)
try:
sock.setsockopt(socket.IPPROTO_IPV6,
socket.IPV6_JOIN_GROUP, s)
except OSError:
log.warning("Could not join IPv6 multicast group {} on interface {}; possibly, there is no network connection available.".format(multicast, interface))

@classmethod
async def _create_transport_endpoint(cls, sock, ctx: interfaces.MessageManager, log, loop, multicastif, local):
try:
sock.setsockopt(socket.IPPROTO_IPV6, socknumbers.IPV6_RECVPKTINFO, 1)
except NameError:
Expand All @@ -204,45 +260,35 @@ async def _create_transport_endpoint(cls, sock, ctx: interfaces.MessageManager,
else:
log.warning("Transport udp6 set up on platform without RECVERR capability. ICMP errors will be ignored.")

if multicast:
# FIXME this all registers only for one interface, doesn't it?
s = struct.pack('4s4si',
socket.inet_aton(constants.MCAST_IPV4_ALLCOAPNODES),
socket.inet_aton("0.0.0.0"), 0)
try:
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, s)
except OSError:
log.warning("Could not join IPv4 multicast group; possibly, there is no network connection available.")
# FIXME this all registers only for one interface, doesn't it?
for ifname in multicastif:
print('multicast-if', ifname)
cls._join_ipv4_multicast(log, sock, constants.MCAST_IPV4_ALLCOAPNODES, ifname)

for a in constants.MCAST_IPV6_ALL:
s = struct.pack('16si',
socket.inet_pton(socket.AF_INET6, a),
0)
try:
sock.setsockopt(socket.IPPROTO_IPV6,
socket.IPV6_JOIN_GROUP, s)
except OSError:
log.warning("Could not join IPv6 multicast group; possibly, there is no network connection available.")

transport, protocol = await create_recvmsg_datagram_endpoint(loop,
lambda: cls(ctx, log=log, loop=loop),
sock=sock)
cls._join_ipv6_multicast(log, sock, a, ifname)

transport, protocol = await create_recvmsg_datagram_endpoint(
loop, lambda: cls(ctx, log=log, loop=loop, multicastif=multicastif, local=local),
sock=sock)

await protocol.ready

return protocol

@classmethod
async def create_client_transport_endpoint(cls, ctx: interfaces.MessageManager, log, loop):
async def create_client_transport_endpoint(cls, ctx: interfaces.MessageManager, log, loop, local):
sock = socket.socket(family=socket.AF_INET6, type=socket.SOCK_DGRAM)
sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)

return await cls._create_transport_endpoint(sock, ctx, log, loop, multicast=False)
return await cls._create_transport_endpoint(sock, ctx, log, loop, multicastif=None, local=local)

@classmethod
async def create_server_transport_endpoint(cls, ctx: interfaces.MessageManager, log, loop, bind):
async def create_server_transport_endpoint(cls, ctx: interfaces.MessageManager, log, loop, bind, multicastif, local):
bind = bind or ('::', None)
bind = (bind[0], bind[1] or COAP_PORT)
bind = (bind[0] or '::', bind[1] or COAP_PORT)

multicastif = multicastif or [None]
# The later bind() does most of what getaddr info usually does
# (including resolving names), but is missing out subtly: It does not
# populate the zone identifier of an IPv6 address, making it impossible
Expand All @@ -269,7 +315,7 @@ async def create_server_transport_endpoint(cls, ctx: interfaces.MessageManager,
sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
sock.bind(bind)

return (await cls._create_transport_endpoint(sock, ctx, log, loop, multicast=True))
return (await cls._create_transport_endpoint(sock, ctx, log, loop, multicastif=multicastif, local=local))

async def shutdown(self):
self._shutting_down = asyncio.Future()
Expand All @@ -284,7 +330,7 @@ def send(self, message):
ancdata = []
if message.remote.pktinfo is not None:
ancdata.append((socket.IPPROTO_IPV6, socknumbers.IPV6_PKTINFO,
message.remote.pktinfo))
message.remote.pktinfo))
self.transport.sendmsg(message.encode(), ancdata, 0, message.remote.sockaddr)

async def recognize_remote(self, remote):
Expand All @@ -307,6 +353,13 @@ async def determine_remote(self, request):
else:
raise ValueError("No location found to send message to (neither in .opt.uri_host nor in .remote)")

if '%' in host:
if host.startswith('ff02:'):
hostif = host.split('%')[1]
else:
host, hostif = host.split('%')
else:
hostif = None
try:
own_sock = self.transport.get_extra_info('socket')
addrinfo = await self.loop.getaddrinfo(
Expand All @@ -322,7 +375,22 @@ async def determine_remote(self, request):
)
except socket.gaierror:
raise error.ResolutionError("No address information found for requests to %r" % host)
return UDP6EndpointAddress(addrinfo[0][-1], self)

pktinfo = None
ifindex = 0
if hostif:
ifindex = socket.if_nametoindex(hostif)

if request.local:
source = request.local
elif addrinfo[0][-1][0].startswith('::ffff:') and '.' in addrinfo[0][-1][0]:
source = self.local.get('4', '::ffff:0.0.0.0')
elif addrinfo[0][-1][0].startswith('fe80::'):
source = self.local.get('6ll', '::')
else:
source = self.local.get('6', '::')
pktinfo = struct.pack('16sI', socket.inet_pton(socket.AF_INET6, source), ifindex)
return UDP6EndpointAddress(addrinfo[0][-1], self, pktinfo=pktinfo)

#
# implementing the typical DatagramProtocol interfaces.
Expand Down
2 changes: 2 additions & 0 deletions aiocoap/util/socknumbers.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
# Not attempting to make any guesses for other platforms; the udp6 module
# will fail to import where it needs the specifics

IP_PKTINFO = 8

try:
from IN import IPV6_RECVERR, IP_RECVERR
except (ImportError, NameError):
Expand Down