
Move additional tasks to the background worker #8458

Merged (9 commits) on Oct 7, 2020
4 changes: 4 additions & 0 deletions synapse/app/generic_worker.py
@@ -127,6 +127,7 @@
from synapse.rest.key.v2 import KeyApiV2Resource
from synapse.server import HomeServer, cache_in_self
from synapse.storage.databases.main.censor_events import CensorEventsStore
from synapse.storage.databases.main.client_ips import ClientIpWorkerStore
from synapse.storage.databases.main.media_repository import MediaRepositoryStore
from synapse.storage.databases.main.metrics import ServerMetricsStore
from synapse.storage.databases.main.monthly_active_users import (
@@ -135,6 +136,7 @@
from synapse.storage.databases.main.presence import UserPresenceState
from synapse.storage.databases.main.search import SearchWorkerStore
from synapse.storage.databases.main.stats import StatsStore
from synapse.storage.databases.main.transactions import TransactionWorkerStore
from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore
from synapse.storage.databases.main.user_directory import UserDirectoryStore
from synapse.types import ReadReceipt
@@ -466,6 +468,7 @@ class GenericWorkerSlavedStore(
    SlavedAccountDataStore,
    SlavedPusherStore,
    CensorEventsStore,
    ClientIpWorkerStore,
    SlavedEventStore,
    SlavedKeyStore,
    RoomStore,
@@ -481,6 +484,7 @@
    MediaRepositoryStore,
    ServerMetricsStore,
    SearchWorkerStore,
    TransactionWorkerStore,
    BaseSlavedStore,
):
    pass
109 changes: 57 additions & 52 deletions synapse/storage/databases/main/client_ips.py
@@ -351,7 +351,63 @@ def _devices_last_seen_update_txn(txn):
        return updated


class ClientIpStore(ClientIpBackgroundUpdateStore):
class ClientIpWorkerStore(ClientIpBackgroundUpdateStore):

Member Author commented:

I was unsure which of these should be the base class. It seems we have a few layouts (sketched after this thread):

  • worker -> background update -> main
  • worker -> main
  • (worker, background update) -> main
  • background update -> worker -> main

Member replied:

I think it depends on whether functions in the background updater depend on stuff from the worker, or vice versa 🤷
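
For reference, the layouts listed above, sketched with hypothetical class names (a minimal illustration of the inheritance orders under discussion, not Synapse's actual classes):

```python
class MainStoreBase:
    """Stand-in for the shared base (e.g. SQLBaseStore)."""


# 1. worker -> background update -> main
class BgUpdateStore1(MainStoreBase): ...
class WorkerStore1(BgUpdateStore1): ...

# 2. worker -> main
class WorkerStore2(MainStoreBase): ...

# 3. (worker, background update) -> main: the two siblings are combined
#    only at the end, relying on Python's MRO to linearise them.
class WorkerStore3(MainStoreBase): ...
class BgUpdateStore3(MainStoreBase): ...
class CombinedStore3(WorkerStore3, BgUpdateStore3): ...

# 4. background update -> worker -> main
class WorkerStore4(MainStoreBase): ...
class BgUpdateStore4(WorkerStore4): ...
```

Which layout is right depends, as the reply notes, on which way the method dependencies point: a subclass can call methods defined on its bases, but not the reverse.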

    def __init__(self, database: DatabasePool, db_conn, hs):
        super().__init__(database, db_conn, hs)

        self.user_ips_max_age = hs.config.user_ips_max_age

        if hs.config.run_background_tasks and self.user_ips_max_age:
            self._clock.looping_call(self._prune_old_user_ips, 5 * 1000)

    @wrap_as_background_process("prune_old_user_ips")
    async def _prune_old_user_ips(self):
        """Removes entries in user IPs older than the configured period.
        """

        if self.user_ips_max_age is None:
            # Nothing to do
            return

        if not await self.db_pool.updates.has_completed_background_update(
            "devices_last_seen"
        ):
            # Only start pruning if we have finished populating the devices
            # last seen info.
            return

        # We do a slightly funky SQL delete to ensure we don't try and delete
        # too much at once (as the table may be very large from before we
        # started pruning).
        #
        # This works by finding the max last_seen that is less than the given
        # time, but has no more than N rows before it, deleting all rows with
        # a lesser last_seen time. (We COALESCE so that the sub-SELECT always
        # returns exactly one row).
        sql = """
            DELETE FROM user_ips
            WHERE last_seen <= (
                SELECT COALESCE(MAX(last_seen), -1)
                FROM (
                    SELECT last_seen FROM user_ips
                    WHERE last_seen <= ?
                    ORDER BY last_seen ASC
                    LIMIT 5000
                ) AS u
            )
        """

        timestamp = self.clock.time_msec() - self.user_ips_max_age

        def _prune_old_user_ips_txn(txn):
            txn.execute(sql, (timestamp,))

        await self.db_pool.runInteraction(
            "_prune_old_user_ips", _prune_old_user_ips_txn
        )


class ClientIpStore(ClientIpWorkerStore):
    def __init__(self, database: DatabasePool, db_conn, hs):

        self.client_ip_last_seen = Cache(
@@ -360,8 +416,6 @@ def __init__(self, database: DatabasePool, db_conn, hs):

        super().__init__(database, db_conn, hs)

        self.user_ips_max_age = hs.config.user_ips_max_age

        # (user_id, access_token, ip,) -> (user_agent, device_id, last_seen)
        self._batch_row_update = {}

@@ -372,9 +426,6 @@ def __init__(self, database: DatabasePool, db_conn, hs):
"before", "shutdown", self._update_client_ips_batch
)

        if self.user_ips_max_age:
            self._clock.looping_call(self._prune_old_user_ips, 5 * 1000)

    async def insert_client_ip(
        self, user_id, access_token, ip, user_agent, device_id, now=None
    ):
@@ -525,49 +576,3 @@ async def get_user_ip_and_agents(self, user):
            }
            for (access_token, ip), (user_agent, last_seen) in results.items()
        ]

    @wrap_as_background_process("prune_old_user_ips")
    async def _prune_old_user_ips(self):
        """Removes entries in user IPs older than the configured period.
        """

        if self.user_ips_max_age is None:
            # Nothing to do
            return

        if not await self.db_pool.updates.has_completed_background_update(
            "devices_last_seen"
        ):
            # Only start pruning if we have finished populating the devices
            # last seen info.
            return

        # We do a slightly funky SQL delete to ensure we don't try and delete
        # too much at once (as the table may be very large from before we
        # started pruning).
        #
        # This works by finding the max last_seen that is less than the given
        # time, but has no more than N rows before it, deleting all rows with
        # a lesser last_seen time. (We COALESCE so that the sub-SELECT always
        # returns exactly one row).
        sql = """
            DELETE FROM user_ips
            WHERE last_seen <= (
                SELECT COALESCE(MAX(last_seen), -1)
                FROM (
                    SELECT last_seen FROM user_ips
                    WHERE last_seen <= ?
                    ORDER BY last_seen ASC
                    LIMIT 5000
                ) AS u
            )
        """

        timestamp = self.clock.time_msec() - self.user_ips_max_age

        def _prune_old_user_ips_txn(txn):
            txn.execute(sql, (timestamp,))

        await self.db_pool.runInteraction(
            "_prune_old_user_ips", _prune_old_user_ips_txn
        )
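
To make the pruning query above concrete, here is a self-contained sqlite3 demo of the same bounded-delete pattern (table name and query shape taken from the diff; the user ID, in-memory database, and a batch size of 3 instead of 5000 are illustrative). Note that Synapse reruns the statement from a 5-second looping call rather than looping in-process:

```python
import sqlite3
import time

DAY_MS = 24 * 60 * 60 * 1000

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE user_ips (user_id TEXT, last_seen BIGINT)")

now = int(time.time() * 1000)
# One row per day for the last ten days.
conn.executemany(
    "INSERT INTO user_ips VALUES (?, ?)",
    [("@user:example.com", now - day * DAY_MS) for day in range(10)],
)

# Find the largest last_seen among the 3 oldest rows past the cutoff, then
# delete everything at or below it; COALESCE(..., -1) makes the DELETE a
# no-op when nothing qualifies.
PRUNE_SQL = """
    DELETE FROM user_ips
    WHERE last_seen <= (
        SELECT COALESCE(MAX(last_seen), -1)
        FROM (
            SELECT last_seen FROM user_ips
            WHERE last_seen <= ?
            ORDER BY last_seen ASC
            LIMIT 3
        ) AS u
    )
"""

cutoff = now - 7 * DAY_MS
deleted = conn.execute(PRUNE_SQL, (cutoff,)).rowcount
print(deleted)    # 3: the rows from 7, 8 and 9 days ago
print(conn.execute("SELECT COUNT(*) FROM user_ips").fetchone()[0])  # 7
```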
5 changes: 4 additions & 1 deletion synapse/storage/databases/main/roommember.py
@@ -61,7 +61,10 @@ def __init__(self, database: DatabasePool, db_conn, hs):
        self._check_safe_current_state_events_membership_updated_txn(txn)
        txn.close()

        if self.hs.config.metrics_flags.known_servers:
        if (
            self.hs.config.run_background_tasks
            and self.hs.config.metrics_flags.known_servers
        ):
            self._known_servers_count = 1
            self.hs.get_clock().looping_call(
                run_as_background_process,
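The roommember.py change above is the recurring pattern in this PR: a periodic task is gated on `run_background_tasks` so that only the designated process runs it. A condensed, hypothetical sketch of that shape (the class name, simplified constructor, interval, and the arguments hidden by the diff truncation are illustrative, not taken from the diff):

```python
from synapse.metrics.background_process_metrics import run_as_background_process


class KnownServersMetricsStore:
    """Hypothetical store illustrating the run_background_tasks gate."""

    def __init__(self, hs):
        self.hs = hs
        if (
            hs.config.run_background_tasks
            and hs.config.metrics_flags.known_servers
        ):
            # Only the designated background-tasks process publishes the
            # metric; other workers sharing the database skip the loop.
            self._known_servers_count = 1
            self.hs.get_clock().looping_call(
                run_as_background_process,
                60 * 1000,  # illustrative interval
                "_count_known_servers",
                self._count_known_servers,
            )

    async def _count_known_servers(self):
        ...  # count the remote servers that share a room with this one
```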
43 changes: 24 additions & 19 deletions synapse/storage/databases/main/transactions.py
@@ -43,15 +43,36 @@
SENTINEL = object()


class TransactionStore(SQLBaseStore):
class TransactionWorkerStore(SQLBaseStore):
    def __init__(self, database: DatabasePool, db_conn, hs):
        super().__init__(database, db_conn, hs)

        self._clock.looping_call(self._start_cleanup_transactions, 30 * 60 * 1000)

    def _start_cleanup_transactions(self):

Member commented:

While we're here, should we make things a bit more consistent by using wrap_as_background_process, rather than wrapping these things in background processes in a myriad of different ways?

Member Author replied:

I had wondered that. I think it makes sense to do, as long as it is a separate commit. I'll go ahead and do that.

Member Author followed up:

I went ahead and did this, and also applied it to the changes from #8369 to be consistent.
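
A sketch of the refactor being agreed here: drop the `_start_cleanup_transactions` shim and decorate the coroutine with `wrap_as_background_process`, so the looping call can invoke it directly. The decorator and its import path appear elsewhere in this diff, but this is an approximation within Synapse's codebase, not the exact final code:

```python
from synapse.metrics.background_process_metrics import wrap_as_background_process


class TransactionWorkerStore(SQLBaseStore):
    def __init__(self, database, db_conn, hs):
        super().__init__(database, db_conn, hs)
        # The looping call now points straight at the decorated coroutine.
        self._clock.looping_call(self._cleanup_transactions, 30 * 60 * 1000)

    @wrap_as_background_process("cleanup_transactions")
    async def _cleanup_transactions(self) -> None:
        now = self._clock.time_msec()
        month_ago = now - 30 * 24 * 60 * 60 * 1000  # prune anything older

        def _cleanup_transactions_txn(txn):
            txn.execute(
                "DELETE FROM received_transactions WHERE ts < ?", (month_ago,)
            )

        await self.db_pool.runInteraction(
            "_cleanup_transactions", _cleanup_transactions_txn
        )
```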

        return run_as_background_process(
            "cleanup_transactions", self._cleanup_transactions
        )

    async def _cleanup_transactions(self) -> None:
        now = self._clock.time_msec()
        month_ago = now - 30 * 24 * 60 * 60 * 1000

        def _cleanup_transactions_txn(txn):
            txn.execute("DELETE FROM received_transactions WHERE ts < ?", (month_ago,))

        await self.db_pool.runInteraction(
            "_cleanup_transactions", _cleanup_transactions_txn
        )


class TransactionStore(TransactionWorkerStore):
"""A collection of queries for handling PDUs.
"""

def __init__(self, database: DatabasePool, db_conn, hs):
super().__init__(database, db_conn, hs)

self._clock.looping_call(self._start_cleanup_transactions, 30 * 60 * 1000)

self._destination_retry_cache = ExpiringCache(
cache_name="get_destination_retry_timings",
clock=self._clock,
@@ -266,22 +287,6 @@ def _set_destination_retry_timings(
            },
        )

    def _start_cleanup_transactions(self):
        return run_as_background_process(
            "cleanup_transactions", self._cleanup_transactions
        )

    async def _cleanup_transactions(self) -> None:
        now = self._clock.time_msec()
        month_ago = now - 30 * 24 * 60 * 60 * 1000

        def _cleanup_transactions_txn(txn):
            txn.execute("DELETE FROM received_transactions WHERE ts < ?", (month_ago,))

        await self.db_pool.runInteraction(
            "_cleanup_transactions", _cleanup_transactions_txn
        )

    async def store_destination_rooms_entries(
        self, destinations: Iterable[str], room_id: str, stream_ordering: int,
    ) -> None: