Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Abstract worker connection details for HTTP Replication requests #15578

Merged
merged 11 commits into from
May 23, 2023
1 change: 1 addition & 0 deletions changelog.d/15578.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Allow connecting to HTTP Replication Endpoints by using `worker_name` when constructing the request.
1 change: 1 addition & 0 deletions synapse/http/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -844,6 +844,7 @@ def __init__(

self.agent: IAgent = ReplicationAgent(
hs.get_reactor(),
hs.config.worker.instance_map,
contextFactory=hs.get_http_client_context_factory(),
pool=pool,
)
Expand Down
30 changes: 23 additions & 7 deletions synapse/http/replicationagent.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

import logging
from typing import Optional
from typing import Dict, Optional

from zope.interface import implementer

Expand All @@ -32,6 +32,7 @@
IResponse,
)

from synapse.config.workers import InstanceLocationConfig
from synapse.types import ISynapseReactor

logger = logging.getLogger(__name__)
Expand All @@ -44,9 +45,11 @@ class ReplicationEndpointFactory:
def __init__(
self,
reactor: ISynapseReactor,
instance_map: Dict[str, InstanceLocationConfig],
context_factory: IPolicyForHTTPS,
) -> None:
self.reactor = reactor
self.instance_map = instance_map
self.context_factory = context_factory

def endpointForURI(self, uri: URI) -> IStreamClientEndpoint:
Expand All @@ -58,15 +61,25 @@ def endpointForURI(self, uri: URI) -> IStreamClientEndpoint:

Returns: The correct client endpoint object
"""
if uri.scheme in (b"http", b"https"):
endpoint = HostnameEndpoint(self.reactor, uri.host, uri.port)
if uri.scheme == b"https":
# The place to connect to now comes in as the name of the worker, similar to
# a hostname in placement. Use the instance_map data to get the actual
# connection information.
realtyem marked this conversation as resolved.
Show resolved Hide resolved
worker_name = uri.netloc.decode("utf-8")
netloc = self.instance_map[worker_name].netloc()
scheme = self.instance_map[worker_name].scheme()

if scheme in ("http", "https"):
host, port = netloc.split(":", maxsplit=1)
endpoint = HostnameEndpoint(self.reactor, host, int(port))
realtyem marked this conversation as resolved.
Show resolved Hide resolved
if scheme == "https":
endpoint = wrapClientTLS(
self.context_factory.creatorForNetloc(uri.host, uri.port), endpoint
# The 'port' argument below isn't actually used by the function
self.context_factory.creatorForNetloc(host, port),
endpoint,
)
return endpoint
else:
raise SchemeNotSupported(f"Unsupported scheme: {uri.scheme!r}")
raise SchemeNotSupported(f"Unsupported scheme: {scheme}")


@implementer(IAgent)
Expand All @@ -80,6 +93,7 @@ class ReplicationAgent(_AgentBase):
def __init__(
self,
reactor: ISynapseReactor,
instance_map: Dict[str, InstanceLocationConfig],
contextFactory: IPolicyForHTTPS,
connectTimeout: Optional[float] = None,
bindAddress: Optional[bytes] = None,
Expand All @@ -102,7 +116,9 @@ def __init__(
created.
"""
_AgentBase.__init__(self, reactor, pool)
endpoint_factory = ReplicationEndpointFactory(reactor, contextFactory)
endpoint_factory = ReplicationEndpointFactory(
reactor, instance_map, contextFactory
)
self._endpointFactory = endpoint_factory

def request(
Expand Down
17 changes: 5 additions & 12 deletions synapse/replication/http/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,11 +219,7 @@ async def send_request(
with outgoing_gauge.track_inprogress():
if instance_name == local_instance_name:
raise Exception("Trying to send HTTP request to self")
if instance_name in instance_map:
host = instance_map[instance_name].host
port = instance_map[instance_name].port
tls = instance_map[instance_name].tls
else:
if instance_name not in instance_map:
raise Exception(
"Instance %r not in 'instance_map' config" % (instance_name,)
)
Expand Down Expand Up @@ -271,13 +267,10 @@ async def send_request(
"Unknown METHOD on %s replication endpoint" % (cls.NAME,)
)

# Here the protocol is hard coded to be http by default or https in case the replication
# port is set to have tls true.
scheme = "https" if tls else "http"
uri = "%s://%s:%s/_synapse/replication/%s/%s" % (
scheme,
host,
port,
# Use the instance_map data to retrieve the correct scheme and use the
# instance_name to abstract the connection details into the Agent
realtyem marked this conversation as resolved.
Show resolved Hide resolved
uri = "synapse-replication://%s/_synapse/replication/%s/%s" % (
instance_name,
cls.NAME,
"/".join(url_args),
)
Expand Down