Merge pull request #6098 from matrix-org/erikj/cleanup_user_ips_2
anoadragon453 committed Feb 25, 2020
2 parents 41e59e6 + d64b70a commit f6d3b67
Showing 7 changed files with 197 additions and 11 deletions.
1 change: 1 addition & 0 deletions changelog.d/6098.feature
@@ -0,0 +1 @@
Add support for pruning old rows in `user_ips` table.
6 changes: 6 additions & 0 deletions docs/sample_config.yaml
@@ -391,6 +391,12 @@ retention:
#
redaction_retention_period: 7d

# How long to track users' last seen time and IPs in the database.
#
# Defaults to `28d`. Set to `null` to disable clearing out of old rows.
#
#user_ips_max_age: 14d


## TLS ##

13 changes: 13 additions & 0 deletions synapse/config/server.py
@@ -178,6 +178,13 @@ def read_config(self, config, **kwargs):
else:
self.redaction_retention_period = None

# How long to keep entries in the `user_ips` table.
user_ips_max_age = config.get("user_ips_max_age", "28d")
if user_ips_max_age is not None:
self.user_ips_max_age = self.parse_duration(user_ips_max_age)
else:
self.user_ips_max_age = None

# Options to disable HS
self.hs_disabled = config.get("hs_disabled", False)
self.hs_disabled_message = config.get("hs_disabled_message", "")
@@ -941,6 +948,12 @@ def generate_config_section(
# Defaults to `7d`. Set to `null` to disable.
#
redaction_retention_period: 7d

# How long to track users' last seen time and IPs in the database.
#
# Defaults to `28d`. Set to `null` to disable clearing out of old rows.
#
#user_ips_max_age: 14d
"""
% locals()
)
33 changes: 31 additions & 2 deletions synapse/metrics/background_process_metrics.py
@@ -15,6 +15,8 @@

import logging
import threading
from asyncio import iscoroutine
from functools import wraps

import six

@@ -173,7 +175,7 @@ def run_as_background_process(desc, func, *args, **kwargs):
Args:
desc (str): a description for this background process type
func: a function, which may return a Deferred
func: a function, which may return a Deferred or a coroutine
args: positional args for func
kwargs: keyword args for func
@@ -197,7 +199,17 @@ def run():
_background_processes.setdefault(desc, set()).add(proc)

try:
yield func(*args, **kwargs)
result = func(*args, **kwargs)

# We probably don't have an ensureDeferred in our call stack to handle
# coroutine results, so we need to ensureDeferred here.
#
# But we need this check because ensureDeferred doesn't like being
# called on immediate values (as opposed to Deferreds or coroutines).
if iscoroutine(result):
result = defer.ensureDeferred(result)

return (yield result)
except Exception:
logger.exception("Background process '%s' threw an exception", desc)
finally:
@@ -208,3 +220,20 @@ def run():

with PreserveLoggingContext():
return run()
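As an aside (an assumed illustration, not part of this diff): Twisted's `ensureDeferred` converts a coroutine into a Deferred and passes Deferreds through unchanged, but raises on plain values, which is why the `iscoroutine` guard above is needed. A minimal sketch:

from twisted.internet import defer

async def coro():
    return 1

# A coroutine is wrapped into a Deferred that fires with its return value.
d = defer.ensureDeferred(coro())

# A plain value is rejected, hence the iscoroutine() check in run() above.
try:
    defer.ensureDeferred(42)
except ValueError as e:
    print(e)  # e.g. "42 is not a coroutine or a Deferred"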


def wrap_as_background_process(desc):
"""Decorator that wraps a function that gets called as a background
process.
Equivalent of calling the function with `run_as_background_process`
"""

def wrap_as_background_process_inner(func):
@wraps(func)
def wrap_as_background_process_inner_2(*args, **kwargs):
return run_as_background_process(desc, func, *args, **kwargs)

return wrap_as_background_process_inner_2

return wrap_as_background_process_inner
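
A hedged usage sketch (the function name is illustrative, not from this commit): decorating a function is equivalent to invoking it via `run_as_background_process`, as the rewritten `_update_client_ips_batch` in synapse/storage/client_ips.py below demonstrates.

from synapse.metrics.background_process_metrics import wrap_as_background_process

@wrap_as_background_process("example_cleanup")
async def example_cleanup():
    # Runs with background-process metrics and logging-context handling,
    # as if called via run_as_background_process("example_cleanup", ...).
    ...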
22 changes: 21 additions & 1 deletion synapse/storage/background_updates.py
@@ -140,14 +140,34 @@ def has_completed_background_updates(self):
"background_updates",
keyvalues=None,
retcol="1",
desc="check_background_updates",
desc="has_completed_background_updates",
)
if not updates:
self._all_done = True
return True

return False

async def has_completed_background_update(self, update_name) -> bool:
"""Check if the given background update has finished running.
"""

if self._all_done:
return True

if update_name in self._background_update_queue:
return False

update_exists = await self._simple_select_one_onecol(
"background_updates",
keyvalues={"update_name": update_name},
retcol="1",
desc="has_completed_background_update",
allow_none=True,
)

return not update_exists

@defer.inlineCallbacks
def do_next_background_update(self, desired_duration_ms):
"""Does some amount of work on the next queued background update
62 changes: 54 additions & 8 deletions synapse/storage/client_ips.py
@@ -19,7 +19,7 @@

from twisted.internet import defer

from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.util.caches import CACHE_SIZE_FACTOR

from . import background_updates
@@ -42,6 +42,8 @@ def __init__(self, db_conn, hs):

super(ClientIpStore, self).__init__(db_conn, hs)

self.user_ips_max_age = hs.config.user_ips_max_age

self.register_background_index_update(
"user_ips_device_index",
index_name="user_ips_device_id",
@@ -100,6 +102,9 @@ def __init__(self, db_conn, hs):
"before", "shutdown", self._update_client_ips_batch
)

if self.user_ips_max_age:
self._clock.looping_call(self._prune_old_user_ips, 5 * 1000)

@defer.inlineCallbacks
def _remove_user_ip_nonunique(self, progress, batch_size):
def f(conn):
@@ -319,20 +324,19 @@ def insert_client_ip(

self._batch_row_update[key] = (user_agent, device_id, now)

@wrap_as_background_process("update_client_ips")
def _update_client_ips_batch(self):

# If the DB pool has already terminated, don't try updating
if not self.hs.get_db_pool().running:
return

def update():
to_update = self._batch_row_update
self._batch_row_update = {}
return self.runInteraction(
"_update_client_ips_batch", self._update_client_ips_batch_txn, to_update
)
to_update = self._batch_row_update
self._batch_row_update = {}

return run_as_background_process("update_client_ips", update)
return self.runInteraction(
"_update_client_ips_batch", self._update_client_ips_batch_txn, to_update
)

def _update_client_ips_batch_txn(self, txn, to_update):
if "user_ips" in self._unsafe_to_upsert_tables or (
@@ -496,3 +500,45 @@ def _devices_last_seen_update_txn(txn):
yield self._end_background_update("devices_last_seen")

return updated

@wrap_as_background_process("prune_old_user_ips")
async def _prune_old_user_ips(self):
"""Removes entries in user IPs older than the configured period.
"""

if self.user_ips_max_age is None:
# Nothing to do
return

if not await self.has_completed_background_update("devices_last_seen"):
# Only start pruning if we have finished populating the devices
# last seen info.
return

# We do a slightly funky SQL delete to ensure we don't try and delete
# too much at once (as the table may be very large from before we
# started pruning).
#
# This works by finding the max last_seen that is less than the given
# time, but has no more than N rows before it, deleting all rows with
# a lesser last_seen time. (We COALESCE so that the sub-SELECT always
# returns exactly one row).
sql = """
DELETE FROM user_ips
WHERE last_seen <= (
SELECT COALESCE(MAX(last_seen), -1)
FROM (
SELECT last_seen FROM user_ips
WHERE last_seen <= ?
ORDER BY last_seen ASC
LIMIT 5000
) AS u
)
"""

timestamp = self.clock.time_msec() - self.user_ips_max_age

def _prune_old_user_ips_txn(txn):
txn.execute(sql, (timestamp,))

await self.runInteraction("_prune_old_user_ips", _prune_old_user_ips_txn)
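
The bounded delete above can be exercised standalone. A minimal sketch (an assumed illustration, not part of this commit) against an in-memory SQLite table, capping each pass at five rows instead of 5000:

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE user_ips (last_seen INTEGER)")
conn.executemany("INSERT INTO user_ips VALUES (?)", [(i,) for i in range(10)])

# Delete rows with last_seen <= 7, but no more than 5 per pass: the
# sub-SELECT finds the 5th-oldest eligible last_seen and bounds the DELETE.
conn.execute(
    """
    DELETE FROM user_ips
    WHERE last_seen <= (
        SELECT COALESCE(MAX(last_seen), -1)
        FROM (
            SELECT last_seen FROM user_ips
            WHERE last_seen <= ?
            ORDER BY last_seen ASC
            LIMIT 5
        ) AS u
    )
    """,
    (7,),
)

# 8 rows were eligible (0..7); only the 5 oldest (0..4) were removed.
print(conn.execute("SELECT COUNT(*) FROM user_ips").fetchone())  # (5,)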
71 changes: 71 additions & 0 deletions tests/storage/test_client_ips.py
@@ -279,6 +279,77 @@ def test_devices_last_seen_bg_update(self):
r,
)

def test_old_user_ips_pruned(self):
# First make sure we have completed all updates.
while not self.get_success(self.store.has_completed_background_updates()):
self.get_success(self.store.do_next_background_update(100), by=0.1)

# Insert a user IP
user_id = "@user:id"
self.get_success(
self.store.insert_client_ip(
user_id, "access_token", "ip", "user_agent", "device_id"
)
)

# Force persisting to disk
self.reactor.advance(200)

# We should see that in the DB
result = self.get_success(
self.store._simple_select_list(
table="user_ips",
keyvalues={"user_id": user_id},
retcols=["access_token", "ip", "user_agent", "device_id", "last_seen"],
desc="get_user_ip_and_agents",
)
)

self.assertEqual(
result,
[
{
"access_token": "access_token",
"ip": "ip",
"user_agent": "user_agent",
"device_id": "device_id",
"last_seen": 0,
}
],
)

# Now advance by a couple of months
self.reactor.advance(60 * 24 * 60 * 60)

# We should get no results.
result = self.get_success(
self.store._simple_select_list(
table="user_ips",
keyvalues={"user_id": user_id},
retcols=["access_token", "ip", "user_agent", "device_id", "last_seen"],
desc="get_user_ip_and_agents",
)
)

self.assertEqual(result, [])

# But we should still get the correct values for the device
result = self.get_success(
self.store.get_last_client_ip_by_device(user_id, "device_id")
)

r = result[(user_id, "device_id")]
self.assertDictContainsSubset(
{
"user_id": user_id,
"device_id": "device_id",
"ip": "ip",
"user_agent": "user_agent",
"last_seen": 0,
},
r,
)


class ClientIpAuthTestCase(unittest.HomeserverTestCase):
