Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Remove worker_replication_* settings #15491

Merged
merged 15 commits into from
May 11, 2023
Merged
1 change: 1 addition & 0 deletions changelog.d/15491.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Remove need for `worker_replication_*` based settings in worker configuration yaml by placing this data directly on the `instance_map` instead.
4 changes: 0 additions & 4 deletions docker/conf-workers/worker.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@
worker_app: "{{ app }}"
worker_name: "{{ name }}"

# The replication listener on the main synapse process.
worker_replication_host: 127.0.0.1
worker_replication_http_port: 9093

worker_listeners:
- type: http
port: {{ port }}
Expand Down
15 changes: 13 additions & 2 deletions docker/configure_workers_and_start.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@
from jinja2 import Environment, FileSystemLoader

MAIN_PROCESS_HTTP_LISTENER_PORT = 8080
MAIN_PROCESS_INSTANCE_NAME = "main"
MAIN_PROCESS_LOCALHOST_ADDRESS = "127.0.0.1"
MAIN_PROCESS_REPLICATION_PORT = 9093

# A simple name used as a placeholder in the WORKERS_CONFIG below. This will be replaced
# during processing with the name of the worker.
Expand Down Expand Up @@ -719,8 +722,8 @@ def generate_worker_files(
# shared config file.
listeners = [
{
"port": 9093,
"bind_address": "127.0.0.1",
"port": MAIN_PROCESS_REPLICATION_PORT,
"bind_address": MAIN_PROCESS_LOCALHOST_ADDRESS,
"type": "http",
"resources": [{"names": ["replication"]}],
}
Expand Down Expand Up @@ -870,6 +873,14 @@ def generate_worker_files(

workers_in_use = len(requested_worker_types) > 0

# If there are workers, add the main process to the instance_map too.
if workers_in_use:
instance_map = shared_config.setdefault("instance_map", {})
instance_map[MAIN_PROCESS_INSTANCE_NAME] = {
"host": MAIN_PROCESS_LOCALHOST_ADDRESS,
"port": MAIN_PROCESS_REPLICATION_PORT,
}

# Shared homeserver config
convert(
"/conf/shared.yaml.j2",
Expand Down
22 changes: 22 additions & 0 deletions docs/upgrade.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,28 @@ process, for example:
dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
```

# Upgrading to v1.83.0

## Worker Yaml setting deprecations
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd probably rejig the title and try and make it clearer that the 3 settings not only can be removed, but should be removed because they are now deprecated (though it's also worth being fair that there is still backwards compat for now).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So...

## Worker YAML setting removal and deprecations

...instead? I guess I don't follow

Copy link
Contributor

@reivilibre reivilibre May 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess your PR title has it more or less spot on... how about:

## Deprecation of `worker_replication_*` configuration options

Then just make the upgrade notes as clear as possible about what you need to migrate (e.g. a before and after example)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gave this a shot in e3f7936


When using workers,
* `worker_replication_host`
* `worker_replication_http_port`
* `worker_replication_http_tls`

can now be removed from individual worker yaml if you add the main process to the `instance_map` in the shared yaml configuration.
realtyem marked this conversation as resolved.
Show resolved Hide resolved

Example:
```yaml
instance_map:
main:
host: localhost
port: 3456
tls: false
```
Notes:
* `tls` is optional but mirrors the functionality of `worker_replication_http_tls`
* Ensure these values match up with the `replication` listener declared for the main process.
realtyem marked this conversation as resolved.
Show resolved Hide resolved
# Upgrading to v1.81.0

## Application service path & authentication deprecations
Expand Down
23 changes: 16 additions & 7 deletions docs/usage/configuration/config_documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -3846,15 +3846,20 @@ federation_sender_instances:
### `instance_map`

When using workers this should be a map from [`worker_name`](#worker_name) to the
HTTP replication listener of the worker, if configured.
HTTP replication listener of the worker, if configured, and to the main process.
Each worker declared under [`stream_writers`](../../workers.md#stream-writers) needs
a HTTP replication listener, and that listener should be included in the `instance_map`.
(The main process also needs an HTTP replication listener, but it should not be
listed in the `instance_map`.)
The main process also needs an entry on the `instance_map`, and it should be listed under
`main` **if even one other worker exists**. Ensure the port matches with what is declared
inside the `listener` block for a `replication` listener.


Example configuration:
```yaml
instance_map:
main:
host: localhost
port: 8030
worker1:
host: localhost
port: 8034
Expand All @@ -3864,7 +3869,8 @@ instance_map:

Experimental: When using workers you can define which workers should
handle writing to streams such as event persistence and typing notifications.
Any worker specified here must also be in the [`instance_map`](#instance_map).
Any worker specified here must also be in the [`instance_map`](#instance_map), the
exception being the main process.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to be honest I feel like this phrasing just adds more confusion, I'd probably settle towards the little innocent lie that you need to have all the workers, even main, defined in instance_map, even though we still have the backwards compat?

Copy link
Contributor Author

@realtyem realtyem May 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This particular block was about stream_writers(it's just out of sight in the diff), and since there is not a 'main' stream...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess I'm not sure I'm following 'the exception being the main process' here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The exception is that the main process doesn't need to be anywhere on the stream_writers mapping, but everything else from the instance_map needs an entry here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be in order to just remove the words 'the exception being the main process' — if you see what I mean, the implication is going the other way to what you're saying.

What you mean is 'Any worker in the instance_map must also be specified here, the exception being the main process.' (although it's not really a MUST).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I'm convinced. Fixed in f6c24e4


See the list of available streams in the
[worker documentation](../../workers.md#stream-writers).
Expand Down Expand Up @@ -3986,6 +3992,7 @@ worker_name: generic_worker1
```
---
### `worker_replication_host`
*Deprecated as of version 1.83.0. Place `host` under `main` entry on the [`instance_map`](#instance_map) in your shared yaml configuration instead.*

The HTTP replication endpoint that it should talk to on the main Synapse process.
The main Synapse process defines this with a `replication` resource in
Expand All @@ -3997,6 +4004,7 @@ worker_replication_host: 127.0.0.1
```
---
### `worker_replication_http_port`
*Deprecated as of version 1.83.0. Place `port` under `main` entry on the [`instance_map`](#instance_map) in your shared yaml configuration instead.*

The HTTP replication port that it should talk to on the main Synapse process.
The main Synapse process defines this with a `replication` resource in
Expand All @@ -4008,6 +4016,7 @@ worker_replication_http_port: 9093
```
---
### `worker_replication_http_tls`
*Deprecated as of version 1.83.0. Place `tls` under `main` entry on the [`instance_map`](#instance_map) in your shared yaml configuration instead.*

Whether TLS should be used for talking to the HTTP replication port on the main
Synapse process.
Expand All @@ -4033,9 +4042,9 @@ A worker can handle HTTP requests. To do so, a `worker_listeners` option
must be declared, in the same way as the [`listeners` option](#listeners)
realtyem marked this conversation as resolved.
Show resolved Hide resolved
in the shared config.

Workers declared in [`stream_writers`](#stream_writers) will need to include a
`replication` listener here, in order to accept internal HTTP requests from
other workers.
Workers declared in [`stream_writers`](#stream_writers) and [`instance_map`](#instance_map)
realtyem marked this conversation as resolved.
Show resolved Hide resolved
will need to include a `replication` listener here, in order to accept internal HTTP
requests from other workers.

Example configuration:
```yaml
Expand Down
41 changes: 29 additions & 12 deletions docs/workers.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,18 @@ shared configuration file.

### Shared configuration

Normally, only a couple of changes are needed to make an existing configuration
file suitable for use with workers. First, you need to enable an
Normally, only a few changes are needed to make an existing configuration
file suitable for use with workers:
* First, you need to enable an
["HTTP replication listener"](usage/configuration/config_documentation.md#listeners)
for the main process; and secondly, you need to enable
[redis-based replication](usage/configuration/config_documentation.md#redis).
Optionally, a [shared secret](usage/configuration/config_documentation.md#worker_replication_secret)
for the main process
* Secondly, you need to enable
[redis-based replication](usage/configuration/config_documentation.md#redis)
* You will need to add an [`instance_map`](usage/configuration/config_documentation.md#instance_map)
with the `main` process defined, as well as the relevant connection information from
it's HTTP`replication` listener(defined in step 1 above). Note that the `host` defined
realtyem marked this conversation as resolved.
Show resolved Hide resolved
is the address the worker needs to look for the `main` process at, not necessarily the same address that is bound to.
* Optionally, a [shared secret](usage/configuration/config_documentation.md#worker_replication_secret)
can be used to authenticate HTTP traffic between workers. For example:

```yaml
Expand All @@ -111,6 +117,11 @@ worker_replication_secret: ""

redis:
enabled: true

instance_map:
main:
host: 'localhost'
port: 9093
```

See the [configuration manual](usage/configuration/config_documentation.md)
Expand All @@ -130,13 +141,13 @@ In the config file for each worker, you must specify:
* The type of worker ([`worker_app`](usage/configuration/config_documentation.md#worker_app)).
The currently available worker applications are listed [below](#available-worker-applications).
* A unique name for the worker ([`worker_name`](usage/configuration/config_documentation.md#worker_name)).
* The HTTP replication endpoint that it should talk to on the main synapse process
([`worker_replication_host`](usage/configuration/config_documentation.md#worker_replication_host) and
[`worker_replication_http_port`](usage/configuration/config_documentation.md#worker_replication_http_port)).
* If handling HTTP requests, a [`worker_listeners`](usage/configuration/config_documentation.md#worker_listeners) option
with an `http` listener.
* **Synapse 1.72 and older:** if handling the `^/_matrix/client/v3/keys/upload` endpoint, the HTTP URI for
the main process (`worker_main_http_uri`). This config option is no longer required and is ignored when running Synapse 1.73 and newer.
* **Synapse 1.82 and older:** The HTTP replication endpoint that it should talk to on the main synapse process
([`worker_replication_host`](usage/configuration/config_documentation.md#worker_replication_host) and
[`worker_replication_http_port`](usage/configuration/config_documentation.md#worker_replication_http_port)). If using Synapse 1.83 and newer, these are not needed if `main` is defined on the [shared configuration](#shared-configuration) `instance_map`

For example:

Expand Down Expand Up @@ -355,18 +366,24 @@ effects of bursts of events from that bridge on events sent by normal users.
Additionally, the writing of specific streams (such as events) can be moved off
of the main process to a particular worker.

To enable this, the worker must have a
[HTTP `replication` listener](usage/configuration/config_documentation.md#listeners) configured,
have a [`worker_name`](usage/configuration/config_documentation.md#worker_name)
To enable this, the worker must have:
* An [HTTP `replication` listener](usage/configuration/config_documentation.md#listeners) configured,
* Have a [`worker_name`](usage/configuration/config_documentation.md#worker_name)
and be listed in the [`instance_map`](usage/configuration/config_documentation.md#instance_map)
config. The same worker can handle multiple streams, but unless otherwise documented,
config.
* Have the main process declared on the [`instance_map`](usage/configuration/config_documentation.md#instance_map) as well.

Note: The same worker can handle multiple streams, but unless otherwise documented,
each stream can only have a single writer.

For example, to move event persistence off to a dedicated worker, the shared
configuration would include:

```yaml
instance_map:
main:
host: localhost
port: 8030
event_persister1:
host: localhost
port: 8034
Expand Down
78 changes: 61 additions & 17 deletions synapse/config/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,19 @@
Synapse version. Please use ``%s: name_of_worker`` instead.
"""

_MISSING_MAIN_PROCESS_INSTANCE_MAP_DATA = """
Missing data for a worker to connect to main process. Please include '%s' in the
`instance_map` declared in your shared yaml configuration, or optionally(as a deprecated
solution) in every worker's yaml as various `worker_replication_*` settings as defined
in workers documentation here:
`https://matrix-org.github.io/synapse/latest/workers.html#worker-configuration`
"""
# This allows for a handy knob when it's time to change from 'master' to
# something with less 'history'
MAIN_PROCESS_INSTANCE_NAME = "master"
# Use this to adjust what the main process is known as in the yaml instance_map
MAIN_PROCESS_INSTANCE_MAP_NAME = "main"

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -161,27 +174,15 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
raise ConfigError("worker_log_config must be a string")
self.worker_log_config = worker_log_config

# The host used to connect to the main synapse
self.worker_replication_host = config.get("worker_replication_host", None)

# The port on the main synapse for TCP replication
if "worker_replication_port" in config:
raise ConfigError(DIRECT_TCP_ERROR, ("worker_replication_port",))

# The port on the main synapse for HTTP replication endpoint
self.worker_replication_http_port = config.get("worker_replication_http_port")

# The tls mode on the main synapse for HTTP replication endpoint.
# For backward compatibility this defaults to False.
self.worker_replication_http_tls = config.get(
"worker_replication_http_tls", False
)

# The shared secret used for authentication when connecting to the main synapse.
self.worker_replication_secret = config.get("worker_replication_secret", None)

self.worker_name = config.get("worker_name", self.worker_app)
self.instance_name = self.worker_name or "master"
self.instance_name = self.worker_name or MAIN_PROCESS_INSTANCE_NAME

# FIXME: Remove this check after a suitable amount of time.
self.worker_main_http_uri = config.get("worker_main_http_uri", None)
Expand Down Expand Up @@ -215,12 +216,55 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
)

# A map from instance name to host/port of their HTTP replication endpoint.
# Check if the main process is declared. Inject it into the map if it's not,
# based first on if a 'main' block is declared then on 'worker_replication_*'
# data. If both are available, default to instance_map. The main process
# itself doesn't need this data as it would never have to talk to itself.
instance_map: Dict[str, Any] = config.get("instance_map", {})

if instance_map and self.instance_name is not MAIN_PROCESS_INSTANCE_NAME:
# The host used to connect to the main synapse
main_host = config.get("worker_replication_host", None)

# The port on the main synapse for HTTP replication endpoint
main_port = config.get("worker_replication_http_port")

# The tls mode on the main synapse for HTTP replication endpoint.
# For backward compatibility this defaults to False.
main_tls = config.get("worker_replication_http_tls", False)

# For now, accept 'main' in the instance_map, but the replication system
# expects 'master', force that into being until it's changed later.
if MAIN_PROCESS_INSTANCE_MAP_NAME in instance_map:
instance_map.setdefault(MAIN_PROCESS_INSTANCE_NAME, {})
for k, v in instance_map[MAIN_PROCESS_INSTANCE_MAP_NAME].items():
instance_map[MAIN_PROCESS_INSTANCE_NAME].setdefault(k, v)
del instance_map[MAIN_PROCESS_INSTANCE_MAP_NAME]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wondering what's wrong with

Suggested change
instance_map.setdefault(MAIN_PROCESS_INSTANCE_NAME, {})
for k, v in instance_map[MAIN_PROCESS_INSTANCE_MAP_NAME].items():
instance_map[MAIN_PROCESS_INSTANCE_NAME].setdefault(k, v)
del instance_map[MAIN_PROCESS_INSTANCE_MAP_NAME]
instance_map[MAIN_PROCESS_INSTANCE_NAME] = instance_map[MAIN_PROCESS_INSTANCE_MAP_NAME]
del instance_map[MAIN_PROCESS_INSTANCE_MAP_NAME]

Copy link
Contributor Author

@realtyem realtyem May 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's been a few weeks, but IIRC it was giving me a KeyError when I tried that(I can double check, it may have been another reason). Once the migration from master to main is complete code base wide, this will all be deleted anyways. The only reason it exists here at all is because the ReplicationEndpoint class still needs it to be master

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whatever I had done before must not have been this. You got it(plus formatting changes cuz lint) in f6c24e4


# This is the backwards compatibility bit that handles the
# worker_replication_* bits using setdefault() to not overwrite anything.
elif main_host is not None and main_port is not None:
instance_map.setdefault(
MAIN_PROCESS_INSTANCE_NAME,
{
"host": main_host,
"port": main_port,
"tls": main_tls,
},
)

else:
# If we've gotten here, it means that the main process is not on the
# instance_map and that not enough worker_replication_* variables
# were declared in the worker's yaml.
raise ConfigError(
_MISSING_MAIN_PROCESS_INSTANCE_MAP_DATA
% MAIN_PROCESS_INSTANCE_MAP_NAME
)

self.instance_map: Dict[
str, InstanceLocationConfig
] = parse_and_validate_mapping(
config.get("instance_map", {}),
InstanceLocationConfig,
)
] = parse_and_validate_mapping(instance_map, InstanceLocationConfig)

# Map from type of streams to source, c.f. WriterLocations.
writers = config.get("stream_writers") or {}
Expand Down
16 changes: 5 additions & 11 deletions synapse/replication/http/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from twisted.web.server import Request

from synapse.api.errors import HttpResponseException, SynapseError
from synapse.config.workers import MAIN_PROCESS_INSTANCE_NAME
from synapse.http import RequestTimedOutError
from synapse.http.server import HttpServer
from synapse.http.servlet import parse_json_object_from_request
Expand Down Expand Up @@ -197,11 +198,6 @@ def make_client(cls, hs: "HomeServer") -> Callable:
client = hs.get_simple_http_client()
local_instance_name = hs.get_instance_name()

# The value of these option should match the replication listener settings
master_host = hs.config.worker.worker_replication_host
master_port = hs.config.worker.worker_replication_http_port
master_tls = hs.config.worker.worker_replication_http_tls

instance_map = hs.config.worker.instance_map

outgoing_gauge = _pending_outgoing_requests.labels(cls.NAME)
Expand All @@ -213,19 +209,17 @@ def make_client(cls, hs: "HomeServer") -> Callable:
)

@trace_with_opname("outgoing_replication_request")
async def send_request(*, instance_name: str = "master", **kwargs: Any) -> Any:
async def send_request(
*, instance_name: str = MAIN_PROCESS_INSTANCE_NAME, **kwargs: Any
) -> Any:
# We have to pull these out here to avoid circular dependencies...
streams = hs.get_replication_command_handler().get_streams_to_replicate()
replication = hs.get_replication_data_handler()

with outgoing_gauge.track_inprogress():
if instance_name == local_instance_name:
raise Exception("Trying to send HTTP request to self")
if instance_name == "master":
host = master_host
port = master_port
tls = master_tls
elif instance_name in instance_map:
if instance_name in instance_map:
host = instance_map[instance_name].host
port = instance_map[instance_name].port
tls = instance_map[instance_name].tls
Expand Down
1 change: 1 addition & 0 deletions tests/module_api/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,7 @@ def default_config(self) -> Dict[str, Any]:
conf = super().default_config()
conf["stream_writers"] = {"presence": ["presence_writer"]}
conf["instance_map"] = {
"main": {"host": "testserv", "port": 8765},
"presence_writer": {"host": "testserv", "port": 1001},
}
return conf
Expand Down
Loading