Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Move DNS lookups into separate thread pool #11177

Merged
merged 7 commits into from
Oct 26, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/11177.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix performance regression when doing large number of hostname lookups at once. Introduced in v1.44.0.
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
10 changes: 9 additions & 1 deletion synapse/app/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from twisted.internet import defer, error, reactor
from twisted.logger import LoggingFile, LogLevel
from twisted.protocols.tls import TLSMemoryBIOFactory
from twisted.python.threadpool import ThreadPool

import synapse
from synapse.api.constants import MAX_PDU_SIZE
Expand All @@ -48,6 +49,7 @@
from synapse.metrics.jemalloc import setup_jemalloc_stats
from synapse.util.caches.lrucache import setup_expire_lru_cache_entries
from synapse.util.daemonize import daemonize_process
from synapse.util.gai_resolver import GAIResolver
from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string

Expand Down Expand Up @@ -338,9 +340,10 @@ async def start(hs: "HomeServer"):
Args:
hs: homeserver instance
"""
reactor = hs.get_reactor()

# Set up the SIGHUP machinery.
if hasattr(signal, "SIGHUP"):
reactor = hs.get_reactor()

@wrap_as_background_process("sighup")
def handle_sighup(*args, **kwargs):
Expand Down Expand Up @@ -371,6 +374,11 @@ def run_sighup(*args, **kwargs):
# Start the tracer
synapse.logging.opentracing.init_tracer(hs) # type: ignore[attr-defined] # noqa

# We want to use a separate thread pool for the resolver so that large
# numbers of DNS requests don't starve out other users of the threadpool.
resolver_threadpool = ThreadPool(name="gai_resolver")
reactor.installNameResolver(GAIResolver(reactor, getThreadPool=resolver_threadpool))
clokep marked this conversation as resolved.
Show resolved Hide resolved

# Instantiate the modules so they can register their web resources to the module API
# before we start the listeners.
module_api = hs.get_module_api()
Expand Down
136 changes: 136 additions & 0 deletions synapse/util/gai_resolver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
# This is a direct lift from
# https://github.com/twisted/twisted/blob/release-21.2.0-10091/src/twisted/internet/_resolver.py.
squahtx marked this conversation as resolved.
Show resolved Hide resolved
# We copy it here as we need to instansiate `GAIResolver` manually, but it is a
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
# private class.


from socket import (
AF_INET,
AF_INET6,
AF_UNSPEC,
SOCK_DGRAM,
SOCK_STREAM,
gaierror,
getaddrinfo,
)

from zope.interface import implementer

from twisted.internet.address import IPv4Address, IPv6Address
from twisted.internet.interfaces import IHostnameResolver, IHostResolution
from twisted.internet.threads import deferToThreadPool


@implementer(IHostResolution)
class HostResolution:
"""
The in-progress resolution of a given hostname.
"""

def __init__(self, name):
"""
Create a L{HostResolution} with the given name.
"""
self.name = name

def cancel(self):
# IHostResolution.cancel
raise NotImplementedError()


_any = frozenset([IPv4Address, IPv6Address])

_typesToAF = {
frozenset([IPv4Address]): AF_INET,
frozenset([IPv6Address]): AF_INET6,
_any: AF_UNSPEC,
}

_afToType = {
AF_INET: IPv4Address,
AF_INET6: IPv6Address,
}

_transportToSocket = {
"TCP": SOCK_STREAM,
"UDP": SOCK_DGRAM,
}

_socktypeToType = {
SOCK_STREAM: "TCP",
SOCK_DGRAM: "UDP",
}


@implementer(IHostnameResolver)
class GAIResolver:
"""
L{IHostnameResolver} implementation that resolves hostnames by calling
L{getaddrinfo} in a thread.
"""

def __init__(self, reactor, getThreadPool=None, getaddrinfo=getaddrinfo):
"""
Create a L{GAIResolver}.
@param reactor: the reactor to schedule result-delivery on
@type reactor: L{IReactorThreads}
@param getThreadPool: a function to retrieve the thread pool to use for
scheduling name resolutions. If not supplied, the use the given
C{reactor}'s thread pool.
@type getThreadPool: 0-argument callable returning a
L{twisted.python.threadpool.ThreadPool}
@param getaddrinfo: a reference to the L{getaddrinfo} to use - mainly
parameterized for testing.
@type getaddrinfo: callable with the same signature as L{getaddrinfo}
"""
self._reactor = reactor
self._getThreadPool = (
reactor.getThreadPool if getThreadPool is None else getThreadPool
)
self._getaddrinfo = getaddrinfo

def resolveHostName(
self,
resolutionReceiver,
hostName,
portNumber=0,
addressTypes=None,
transportSemantics="TCP",
):
"""
See L{IHostnameResolver.resolveHostName}
@param resolutionReceiver: see interface
@param hostName: see interface
@param portNumber: see interface
@param addressTypes: see interface
@param transportSemantics: see interface
@return: see interface
"""
pool = self._getThreadPool()
addressFamily = _typesToAF[
_any if addressTypes is None else frozenset(addressTypes)
]
socketType = _transportToSocket[transportSemantics]

def get():
try:
return self._getaddrinfo(
hostName, portNumber, addressFamily, socketType
)
except gaierror:
return []

d = deferToThreadPool(self._reactor, pool, get)
resolution = HostResolution(hostName)
resolutionReceiver.resolutionBegan(resolution)

@d.addCallback
def deliverResults(result):
for family, socktype, _proto, _cannoname, sockaddr in result:
addrType = _afToType[family]
resolutionReceiver.addressResolved(
addrType(_socktypeToType.get(socktype, "TCP"), *sockaddr)
)
resolutionReceiver.resolutionComplete()

return resolution