Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Move DNS lookups into separate thread pool #11177

Merged
merged 7 commits into from
Oct 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/11177.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a performance regression introduced in v1.44.0 which could cause client requests to time out when making large numbers of outbound requests.
13 changes: 12 additions & 1 deletion synapse/app/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from twisted.internet import defer, error, reactor
from twisted.logger import LoggingFile, LogLevel
from twisted.protocols.tls import TLSMemoryBIOFactory
from twisted.python.threadpool import ThreadPool

import synapse
from synapse.api.constants import MAX_PDU_SIZE
Expand All @@ -48,6 +49,7 @@
from synapse.metrics.jemalloc import setup_jemalloc_stats
from synapse.util.caches.lrucache import setup_expire_lru_cache_entries
from synapse.util.daemonize import daemonize_process
from synapse.util.gai_resolver import GAIResolver
from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string

Expand Down Expand Up @@ -338,9 +340,18 @@ async def start(hs: "HomeServer"):
Args:
hs: homeserver instance
"""
reactor = hs.get_reactor()

# We want to use a separate thread pool for the resolver so that large
# numbers of DNS requests don't starve out other users of the threadpool.
resolver_threadpool = ThreadPool(name="gai_resolver")
resolver_threadpool.start()
reactor.installNameResolver(
GAIResolver(reactor, getThreadPool=lambda: resolver_threadpool)
)

# Set up the SIGHUP machinery.
if hasattr(signal, "SIGHUP"):
reactor = hs.get_reactor()

@wrap_as_background_process("sighup")
def handle_sighup(*args, **kwargs):
Expand Down
136 changes: 136 additions & 0 deletions synapse/util/gai_resolver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
# This is a direct lift from
# https://github.com/twisted/twisted/blob/release-21.2.0-10091/src/twisted/internet/_resolver.py.
squahtx marked this conversation as resolved.
Show resolved Hide resolved
# We copy it here as we need to instantiate `GAIResolver` manually, but it is a
# private class.


from socket import (
AF_INET,
AF_INET6,
AF_UNSPEC,
SOCK_DGRAM,
SOCK_STREAM,
gaierror,
getaddrinfo,
)

from zope.interface import implementer

from twisted.internet.address import IPv4Address, IPv6Address
from twisted.internet.interfaces import IHostnameResolver, IHostResolution
from twisted.internet.threads import deferToThreadPool


@implementer(IHostResolution)
class HostResolution:
"""
The in-progress resolution of a given hostname.
"""

def __init__(self, name):
"""
Create a L{HostResolution} with the given name.
"""
self.name = name

def cancel(self):
# IHostResolution.cancel
raise NotImplementedError()


_any = frozenset([IPv4Address, IPv6Address])

_typesToAF = {
frozenset([IPv4Address]): AF_INET,
frozenset([IPv6Address]): AF_INET6,
_any: AF_UNSPEC,
}

_afToType = {
AF_INET: IPv4Address,
AF_INET6: IPv6Address,
}

_transportToSocket = {
"TCP": SOCK_STREAM,
"UDP": SOCK_DGRAM,
}

_socktypeToType = {
SOCK_STREAM: "TCP",
SOCK_DGRAM: "UDP",
}


@implementer(IHostnameResolver)
class GAIResolver:
"""
L{IHostnameResolver} implementation that resolves hostnames by calling
L{getaddrinfo} in a thread.
"""

def __init__(self, reactor, getThreadPool=None, getaddrinfo=getaddrinfo):
"""
Create a L{GAIResolver}.
@param reactor: the reactor to schedule result-delivery on
@type reactor: L{IReactorThreads}
@param getThreadPool: a function to retrieve the thread pool to use for
scheduling name resolutions. If not supplied, the use the given
C{reactor}'s thread pool.
@type getThreadPool: 0-argument callable returning a
L{twisted.python.threadpool.ThreadPool}
@param getaddrinfo: a reference to the L{getaddrinfo} to use - mainly
parameterized for testing.
@type getaddrinfo: callable with the same signature as L{getaddrinfo}
"""
self._reactor = reactor
self._getThreadPool = (
reactor.getThreadPool if getThreadPool is None else getThreadPool
)
self._getaddrinfo = getaddrinfo

def resolveHostName(
self,
resolutionReceiver,
hostName,
portNumber=0,
addressTypes=None,
transportSemantics="TCP",
):
"""
See L{IHostnameResolver.resolveHostName}
@param resolutionReceiver: see interface
@param hostName: see interface
@param portNumber: see interface
@param addressTypes: see interface
@param transportSemantics: see interface
@return: see interface
"""
pool = self._getThreadPool()
addressFamily = _typesToAF[
_any if addressTypes is None else frozenset(addressTypes)
]
socketType = _transportToSocket[transportSemantics]

def get():
try:
return self._getaddrinfo(
hostName, portNumber, addressFamily, socketType
)
except gaierror:
return []

d = deferToThreadPool(self._reactor, pool, get)
resolution = HostResolution(hostName)
resolutionReceiver.resolutionBegan(resolution)

@d.addCallback
def deliverResults(result):
for family, socktype, _proto, _cannoname, sockaddr in result:
addrType = _afToType[family]
resolutionReceiver.addressResolved(
addrType(_socktypeToType.get(socktype, "TCP"), *sockaddr)
)
resolutionReceiver.resolutionComplete()

return resolution