This repository has been archived by the owner on Apr 26, 2024. It is now read-only.
-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Wait for streams to catch up when processing HTTP replication. #14820
Merged
Merged
Changes from 6 commits
Commits
Show all changes
10 commits
Select commit
Hold shift + click to select a range
a2d8ed5
Always pass response body
erikjohnston e36ff7b
Wait for streams to catch up when processing HTTP replication.
erikjohnston 7f2700b
Don't wait for streams when asking for stream updates
erikjohnston 473cc10
Send out `POSITION` commands for all streams
erikjohnston a03ee6e
Change ID generator to return position of last write
erikjohnston 6edfd62
Newsfile
erikjohnston 01ae502
Make ClassVar
erikjohnston ace2b8c
Don't fail if waiting for stream update times out
erikjohnston 20590ce
Merge branch 'develop' into erikj/wait_for_repl
erikjohnston 7a0a51e
Merge branch 'develop' into erikj/wait_for_repl
erikjohnston File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Fix rare races when using workers. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,7 @@ | |
from synapse.api.errors import HttpResponseException, SynapseError | ||
from synapse.http import RequestTimedOutError | ||
from synapse.http.server import HttpServer | ||
from synapse.http.servlet import parse_json_object_from_request | ||
from synapse.http.site import SynapseRequest | ||
from synapse.logging import opentracing | ||
from synapse.logging.opentracing import trace_with_opname | ||
|
@@ -53,6 +54,9 @@ | |
) | ||
|
||
|
||
_STREAM_POSITION_KEY = "_INT_STREAM_POS" | ||
|
||
|
||
class ReplicationEndpoint(metaclass=abc.ABCMeta): | ||
"""Helper base class for defining new replication HTTP endpoints. | ||
|
||
|
@@ -94,6 +98,9 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): | |
a connection error is received. | ||
RETRY_ON_CONNECT_ERROR_ATTEMPTS (int): Number of attempts to retry when | ||
receiving connection errors, each will backoff exponentially longer. | ||
WAIT_FOR_STREAMS (bool): Whether to wait for replication streams to | ||
catch up before processing the request and/or response. Defaults to | ||
True. | ||
""" | ||
|
||
NAME: str = abc.abstractproperty() # type: ignore | ||
|
@@ -104,6 +111,8 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): | |
RETRY_ON_CONNECT_ERROR = True | ||
RETRY_ON_CONNECT_ERROR_ATTEMPTS = 5 # =63s (2^6-1) | ||
|
||
WAIT_FOR_STREAMS = True | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this be annotated as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Err, yeah can do! |
||
|
||
def __init__(self, hs: "HomeServer"): | ||
if self.CACHE: | ||
self.response_cache: ResponseCache[str] = ResponseCache( | ||
|
@@ -126,6 +135,10 @@ def __init__(self, hs: "HomeServer"): | |
if hs.config.worker.worker_replication_secret: | ||
self._replication_secret = hs.config.worker.worker_replication_secret | ||
|
||
self._streams = hs.get_replication_command_handler().get_streams_to_replicate() | ||
self._replication = hs.get_replication_data_handler() | ||
self._instance_name = hs.get_instance_name() | ||
|
||
def _check_auth(self, request: Request) -> None: | ||
# Get the authorization header. | ||
auth_headers = request.requestHeaders.getRawHeaders(b"Authorization") | ||
|
@@ -160,7 +173,7 @@ async def _serialize_payload(**kwargs) -> JsonDict: | |
|
||
@abc.abstractmethod | ||
async def _handle_request( | ||
self, request: Request, **kwargs: Any | ||
self, request: Request, content: JsonDict, **kwargs: Any | ||
) -> Tuple[int, JsonDict]: | ||
"""Handle incoming request. | ||
|
||
|
@@ -201,6 +214,10 @@ def make_client(cls, hs: "HomeServer") -> Callable: | |
|
||
@trace_with_opname("outgoing_replication_request") | ||
async def send_request(*, instance_name: str = "master", **kwargs: Any) -> Any: | ||
# We have to pull these out here to avoid circular dependencies... | ||
streams = hs.get_replication_command_handler().get_streams_to_replicate() | ||
replication = hs.get_replication_data_handler() | ||
DMRobertson marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
with outgoing_gauge.track_inprogress(): | ||
if instance_name == local_instance_name: | ||
raise Exception("Trying to send HTTP request to self") | ||
|
@@ -219,6 +236,24 @@ async def send_request(*, instance_name: str = "master", **kwargs: Any) -> Any: | |
|
||
data = await cls._serialize_payload(**kwargs) | ||
|
||
if cls.METHOD != "GET" and cls.WAIT_FOR_STREAMS: | ||
# Include the current stream positions that we write to. We | ||
# don't do this for GETs as they don't have a body, and we | ||
# generally assume that a GET won't rely on data we have | ||
# written. | ||
if _STREAM_POSITION_KEY in data: | ||
raise Exception( | ||
"data to send contains %r key", _STREAM_POSITION_KEY | ||
) | ||
|
||
data[_STREAM_POSITION_KEY] = { | ||
"streams": { | ||
stream.NAME: stream.current_token(local_instance_name) | ||
for stream in streams | ||
}, | ||
"instance_name": local_instance_name, | ||
} | ||
|
||
url_args = [ | ||
urllib.parse.quote(kwargs[name], safe="") for name in cls.PATH_ARGS | ||
] | ||
|
@@ -308,6 +343,17 @@ async def send_request(*, instance_name: str = "master", **kwargs: Any) -> Any: | |
) from e | ||
|
||
_outgoing_request_counter.labels(cls.NAME, 200).inc() | ||
|
||
# Wait on any streams that the remote may have written to. | ||
for stream_name, position in result.get( | ||
_STREAM_POSITION_KEY, {} | ||
).items(): | ||
await replication.wait_for_stream_position( | ||
instance_name=instance_name, | ||
stream_name=stream_name, | ||
position=position, | ||
) | ||
|
||
return result | ||
|
||
return send_request | ||
|
@@ -353,6 +399,22 @@ async def _check_auth_and_handle( | |
if self._replication_secret: | ||
self._check_auth(request) | ||
|
||
if self.METHOD == "GET": | ||
# GET APIs always have an empty body. | ||
content = {} | ||
else: | ||
content = parse_json_object_from_request(request) | ||
|
||
# Wait on any streams that the remote may have written to. | ||
for stream_name, position in content.get(_STREAM_POSITION_KEY, {"streams": {}})[ | ||
"streams" | ||
].items(): | ||
await self._replication.wait_for_stream_position( | ||
instance_name=content[_STREAM_POSITION_KEY]["instance_name"], | ||
stream_name=stream_name, | ||
position=position, | ||
) | ||
|
||
if self.CACHE: | ||
txn_id = kwargs.pop("txn_id") | ||
|
||
|
@@ -361,13 +423,28 @@ async def _check_auth_and_handle( | |
# correctly yet. In particular, there may be issues to do with logging | ||
# context lifetimes. | ||
|
||
return await self.response_cache.wrap( | ||
txn_id, self._handle_request, request, **kwargs | ||
code, response = await self.response_cache.wrap( | ||
txn_id, self._handle_request, request, content, **kwargs | ||
) | ||
else: | ||
# The `@cancellable` decorator may be applied to `_handle_request`. But we | ||
# told `HttpServer.register_paths` that our handler is `_check_auth_and_handle`, | ||
# so we have to set up the cancellable flag ourselves. | ||
request.is_render_cancellable = is_function_cancellable( | ||
self._handle_request | ||
) | ||
|
||
code, response = await self._handle_request(request, content, **kwargs) | ||
|
||
# Return streams we may have written to in the course of processing this | ||
# request. | ||
if _STREAM_POSITION_KEY in response: | ||
raise Exception("data to send contains %r key", _STREAM_POSITION_KEY) | ||
|
||
# The `@cancellable` decorator may be applied to `_handle_request`. But we | ||
# told `HttpServer.register_paths` that our handler is `_check_auth_and_handle`, | ||
# so we have to set up the cancellable flag ourselves. | ||
request.is_render_cancellable = is_function_cancellable(self._handle_request) | ||
if self.WAIT_FOR_STREAMS: | ||
response[_STREAM_POSITION_KEY] = { | ||
stream.NAME: stream.current_token(self._instance_name) | ||
for stream in self._streams | ||
} | ||
|
||
return await self._handle_request(request, **kwargs) | ||
return code, response |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Before this change, did we have to wait for something else to notify replication?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We poke the notifier below for all non-backfilled events, and since I don't think anything "waits" on the backfill stream that has broadly been OK.
But yeah, its not ideal. I kinda want to move the poke to replication more close to where we advance the stream tokens, but that proved a bit of a PITA due to circular dependencies.