Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Move additional tasks to the background worker (#8458)
Browse files Browse the repository at this point in the history
  • Loading branch information
clokep authored Oct 7, 2020
1 parent 8dbf62f commit e4f72dd
Show file tree
Hide file tree
Showing 8 changed files with 195 additions and 197 deletions.
1 change: 1 addition & 0 deletions changelog.d/8458.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Allow running background tasks in a separate worker process.
4 changes: 4 additions & 0 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@
from synapse.rest.key.v2 import KeyApiV2Resource
from synapse.server import HomeServer, cache_in_self
from synapse.storage.databases.main.censor_events import CensorEventsStore
from synapse.storage.databases.main.client_ips import ClientIpWorkerStore
from synapse.storage.databases.main.media_repository import MediaRepositoryStore
from synapse.storage.databases.main.metrics import ServerMetricsStore
from synapse.storage.databases.main.monthly_active_users import (
Expand All @@ -135,6 +136,7 @@
from synapse.storage.databases.main.presence import UserPresenceState
from synapse.storage.databases.main.search import SearchWorkerStore
from synapse.storage.databases.main.stats import StatsStore
from synapse.storage.databases.main.transactions import TransactionWorkerStore
from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore
from synapse.storage.databases.main.user_directory import UserDirectoryStore
from synapse.types import ReadReceipt
Expand Down Expand Up @@ -466,6 +468,7 @@ class GenericWorkerSlavedStore(
SlavedAccountDataStore,
SlavedPusherStore,
CensorEventsStore,
ClientIpWorkerStore,
SlavedEventStore,
SlavedKeyStore,
RoomStore,
Expand All @@ -481,6 +484,7 @@ class GenericWorkerSlavedStore(
MediaRepositoryStore,
ServerMetricsStore,
SearchWorkerStore,
TransactionWorkerStore,
BaseSlavedStore,
):
pass
Expand Down
33 changes: 11 additions & 22 deletions synapse/app/phone_stats_home.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,17 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import math
import resource
import sys

from prometheus_client import Gauge

from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
)

logger = logging.getLogger("synapse.app.homeserver")

Expand All @@ -41,6 +43,7 @@
)


@wrap_as_background_process("phone_stats_home")
async def phone_stats_home(hs, stats, stats_process=_stats_process):
logger.info("Gathering stats for reporting")
now = int(hs.get_clock().time())
Expand Down Expand Up @@ -143,20 +146,10 @@ def performance_stats_init():
(int(hs.get_clock().time()), resource.getrusage(resource.RUSAGE_SELF))
)

def start_phone_stats_home():
return run_as_background_process(
"phone_stats_home", phone_stats_home, hs, stats
)

def generate_user_daily_visit_stats():
return run_as_background_process(
"generate_user_daily_visits", hs.get_datastore().generate_user_daily_visits
)

# Rather than update on per session basis, batch up the requests.
# If you increase the loop period, the accuracy of user_daily_visits
# table will decrease
clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000)
clock.looping_call(hs.get_datastore().generate_user_daily_visits, 5 * 60 * 1000)

# monthly active user limiting functionality
def reap_monthly_active_users():
Expand All @@ -167,6 +160,7 @@ def reap_monthly_active_users():
clock.looping_call(reap_monthly_active_users, 1000 * 60 * 60)
reap_monthly_active_users()

@wrap_as_background_process("generate_monthly_active_users")
async def generate_monthly_active_users():
current_mau_count = 0
current_mau_count_by_service = {}
Expand All @@ -186,24 +180,19 @@ async def generate_monthly_active_users():
registered_reserved_users_mau_gauge.set(float(len(reserved_users)))
max_mau_gauge.set(float(hs.config.max_mau_value))

def start_generate_monthly_active_users():
return run_as_background_process(
"generate_monthly_active_users", generate_monthly_active_users
)

if hs.config.limit_usage_by_mau or hs.config.mau_stats_only:
start_generate_monthly_active_users()
clock.looping_call(start_generate_monthly_active_users, 5 * 60 * 1000)
generate_monthly_active_users()
clock.looping_call(generate_monthly_active_users, 5 * 60 * 1000)
# End of monthly active user settings

if hs.config.report_stats:
logger.info("Scheduling stats reporting for 3 hour intervals")
clock.looping_call(start_phone_stats_home, 3 * 60 * 60 * 1000)
clock.looping_call(phone_stats_home, 3 * 60 * 60 * 1000, hs, stats)

# We need to defer this init for the cases that we daemonize
# otherwise the process ID we get is that of the non-daemon process
clock.call_later(0, performance_stats_init)

# We wait 5 minutes to send the first set of stats as the server can
# be quite busy the first few minutes
clock.call_later(5 * 60, start_phone_stats_home)
clock.call_later(5 * 60, phone_stats_home, hs, stats)
109 changes: 57 additions & 52 deletions synapse/storage/databases/main/client_ips.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,63 @@ def _devices_last_seen_update_txn(txn):
return updated


class ClientIpStore(ClientIpBackgroundUpdateStore):
class ClientIpWorkerStore(ClientIpBackgroundUpdateStore):
def __init__(self, database: DatabasePool, db_conn, hs):
super().__init__(database, db_conn, hs)

self.user_ips_max_age = hs.config.user_ips_max_age

if hs.config.run_background_tasks and self.user_ips_max_age:
self._clock.looping_call(self._prune_old_user_ips, 5 * 1000)

@wrap_as_background_process("prune_old_user_ips")
async def _prune_old_user_ips(self):
"""Removes entries in user IPs older than the configured period.
"""

if self.user_ips_max_age is None:
# Nothing to do
return

if not await self.db_pool.updates.has_completed_background_update(
"devices_last_seen"
):
# Only start pruning if we have finished populating the devices
# last seen info.
return

# We do a slightly funky SQL delete to ensure we don't try and delete
# too much at once (as the table may be very large from before we
# started pruning).
#
# This works by finding the max last_seen that is less than the given
# time, but has no more than N rows before it, deleting all rows with
# a lesser last_seen time. (We COALESCE so that the sub-SELECT always
# returns exactly one row).
sql = """
DELETE FROM user_ips
WHERE last_seen <= (
SELECT COALESCE(MAX(last_seen), -1)
FROM (
SELECT last_seen FROM user_ips
WHERE last_seen <= ?
ORDER BY last_seen ASC
LIMIT 5000
) AS u
)
"""

timestamp = self.clock.time_msec() - self.user_ips_max_age

def _prune_old_user_ips_txn(txn):
txn.execute(sql, (timestamp,))

await self.db_pool.runInteraction(
"_prune_old_user_ips", _prune_old_user_ips_txn
)


class ClientIpStore(ClientIpWorkerStore):
def __init__(self, database: DatabasePool, db_conn, hs):

self.client_ip_last_seen = Cache(
Expand All @@ -360,8 +416,6 @@ def __init__(self, database: DatabasePool, db_conn, hs):

super().__init__(database, db_conn, hs)

self.user_ips_max_age = hs.config.user_ips_max_age

# (user_id, access_token, ip,) -> (user_agent, device_id, last_seen)
self._batch_row_update = {}

Expand All @@ -372,9 +426,6 @@ def __init__(self, database: DatabasePool, db_conn, hs):
"before", "shutdown", self._update_client_ips_batch
)

if self.user_ips_max_age:
self._clock.looping_call(self._prune_old_user_ips, 5 * 1000)

async def insert_client_ip(
self, user_id, access_token, ip, user_agent, device_id, now=None
):
Expand Down Expand Up @@ -525,49 +576,3 @@ async def get_user_ip_and_agents(self, user):
}
for (access_token, ip), (user_agent, last_seen) in results.items()
]

@wrap_as_background_process("prune_old_user_ips")
async def _prune_old_user_ips(self):
"""Removes entries in user IPs older than the configured period.
"""

if self.user_ips_max_age is None:
# Nothing to do
return

if not await self.db_pool.updates.has_completed_background_update(
"devices_last_seen"
):
# Only start pruning if we have finished populating the devices
# last seen info.
return

# We do a slightly funky SQL delete to ensure we don't try and delete
# too much at once (as the table may be very large from before we
# started pruning).
#
# This works by finding the max last_seen that is less than the given
# time, but has no more than N rows before it, deleting all rows with
# a lesser last_seen time. (We COALESCE so that the sub-SELECT always
# returns exactly one row).
sql = """
DELETE FROM user_ips
WHERE last_seen <= (
SELECT COALESCE(MAX(last_seen), -1)
FROM (
SELECT last_seen FROM user_ips
WHERE last_seen <= ?
ORDER BY last_seen ASC
LIMIT 5000
) AS u
)
"""

timestamp = self.clock.time_msec() - self.user_ips_max_age

def _prune_old_user_ips_txn(txn):
txn.execute(sql, (timestamp,))

await self.db_pool.runInteraction(
"_prune_old_user_ips", _prune_old_user_ips_txn
)
14 changes: 5 additions & 9 deletions synapse/storage/databases/main/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from typing import Dict

from synapse.metrics import GaugeBucketCollector
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import DatabasePool
from synapse.storage.databases.main.event_push_actions import (
Expand Down Expand Up @@ -57,18 +57,13 @@ def __init__(self, database: DatabasePool, db_conn, hs):
super().__init__(database, db_conn, hs)

# Read the extrems every 60 minutes
def read_forward_extremities():
# run as a background process to make sure that the database transactions
# have a logcontext to report to
return run_as_background_process(
"read_forward_extremities", self._read_forward_extremities
)

hs.get_clock().looping_call(read_forward_extremities, 60 * 60 * 1000)
if hs.config.run_background_tasks:
self._clock.looping_call(self._read_forward_extremities, 60 * 60 * 1000)

# Used in _generate_user_daily_visits to keep track of progress
self._last_user_visit_update = self._get_start_of_day()

@wrap_as_background_process("read_forward_extremities")
async def _read_forward_extremities(self):
def fetch(txn):
txn.execute(
Expand Down Expand Up @@ -274,6 +269,7 @@ def _get_start_of_day(self):
today_start = calendar.timegm((now.tm_year, now.tm_mon, now.tm_mday, 0, 0, 0))
return today_start * 1000

@wrap_as_background_process("generate_user_daily_visits")
async def generate_user_daily_visits(self) -> None:
"""
Generates daily visit data for use in cohort/ retention analysis
Expand Down
Loading

0 comments on commit e4f72dd

Please sign in to comment.