Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Move additional tasks to the background worker #8458

Merged
merged 9 commits into from
Oct 7, 2020
2 changes: 2 additions & 0 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@
from synapse.storage.databases.main.presence import UserPresenceState
from synapse.storage.databases.main.search import SearchWorkerStore
from synapse.storage.databases.main.stats import StatsStore
from synapse.storage.databases.main.transactions import TransactionWorkerStore
from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore
from synapse.storage.databases.main.user_directory import UserDirectoryStore
from synapse.types import ReadReceipt
Expand Down Expand Up @@ -483,6 +484,7 @@ class GenericWorkerSlavedStore(
MediaRepositoryStore,
ServerMetricsStore,
SearchWorkerStore,
TransactionWorkerStore,
BaseSlavedStore,
):
pass
Expand Down
43 changes: 24 additions & 19 deletions synapse/storage/databases/main/transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,36 @@
SENTINEL = object()


class TransactionStore(SQLBaseStore):
class TransactionWorkerStore(SQLBaseStore):
def __init__(self, database: DatabasePool, db_conn, hs):
super().__init__(database, db_conn, hs)

self._clock.looping_call(self._start_cleanup_transactions, 30 * 60 * 1000)

def _start_cleanup_transactions(self):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While we're here should we make things a bit more consistent by using wrap_as_background_process, rather than wrapping these things in background processes in a myriad of different ways?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had wondered that. I think it makes sense to do...as long as it is a separate commit. I'll go ahead and do that.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went ahead and did this, also included this for the changes from #8369 to be consistent.

return run_as_background_process(
"cleanup_transactions", self._cleanup_transactions
)

async def _cleanup_transactions(self) -> None:
now = self._clock.time_msec()
month_ago = now - 30 * 24 * 60 * 60 * 1000

def _cleanup_transactions_txn(txn):
txn.execute("DELETE FROM received_transactions WHERE ts < ?", (month_ago,))

await self.db_pool.runInteraction(
"_cleanup_transactions", _cleanup_transactions_txn
)


class TransactionStore(TransactionWorkerStore):
"""A collection of queries for handling PDUs.
"""

def __init__(self, database: DatabasePool, db_conn, hs):
super().__init__(database, db_conn, hs)

self._clock.looping_call(self._start_cleanup_transactions, 30 * 60 * 1000)

self._destination_retry_cache = ExpiringCache(
cache_name="get_destination_retry_timings",
clock=self._clock,
Expand Down Expand Up @@ -266,22 +287,6 @@ def _set_destination_retry_timings(
},
)

def _start_cleanup_transactions(self):
return run_as_background_process(
"cleanup_transactions", self._cleanup_transactions
)

async def _cleanup_transactions(self) -> None:
now = self._clock.time_msec()
month_ago = now - 30 * 24 * 60 * 60 * 1000

def _cleanup_transactions_txn(txn):
txn.execute("DELETE FROM received_transactions WHERE ts < ?", (month_ago,))

await self.db_pool.runInteraction(
"_cleanup_transactions", _cleanup_transactions_txn
)

async def store_destination_rooms_entries(
self, destinations: Iterable[str], room_id: str, stream_ordering: int,
) -> None:
Expand Down