This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Move update_client_ips to the background worker #10425

Closed
anoadragon453 opened this issue Jul 19, 2021 · 5 comments · Fixed by #12251
Labels
T-Enhancement New features, changes in functionality, improvements in performance, or user-facing enhancements.

Comments

@anoadragon453

```python
@wrap_as_background_process("update_client_ips")
async def _update_client_ips_batch(self) -> None:
    # If the DB pool has already terminated, don't try updating
    if not self.db_pool.is_running():
        return

    to_update = self._batch_row_update
    self._batch_row_update = {}

    await self.db_pool.runInteraction(
        "_update_client_ips_batch", self._update_client_ips_batch_txn, to_update
    )
```

I have a feeling that we wanted to keep this on the main process as it must only be run by a single process. However, that is no longer a concern now that we can dedicate all background work to a single worker through the run_background_tasks_on: <worker_name> config option.

This operation has been reported as potentially CPU intensive on some deployments, so moving it off the main process would be a win.

(Also random thought: can this be converted to something that operates entirely in the database (postgres)?)
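As a purely illustrative sketch of that thought: the per-row work could collapse into one INSERT ... ON CONFLICT statement that Postgres applies server-side in a single round trip. build_user_ips_upsert is a hypothetical helper, not a Synapse function; it only constructs the statement and the parameter rows from the in-memory batch, and assumes the user_ips table has a unique index on (user_id, access_token, ip):

```python
def build_user_ips_upsert(to_update):
    """Turn the in-memory batch dict into one upsert statement plus a
    flat list of parameter rows suitable for executemany()."""
    sql = (
        "INSERT INTO user_ips"
        " (user_id, access_token, ip, user_agent, device_id, last_seen)"
        " VALUES (%s, %s, %s, %s, %s, %s)"
        " ON CONFLICT (user_id, access_token, ip)"
        " DO UPDATE SET user_agent = EXCLUDED.user_agent,"
        " device_id = EXCLUDED.device_id,"
        " last_seen = EXCLUDED.last_seen"
    )
    rows = [
        (user_id, access_token, ip, user_agent, device_id, last_seen)
        for (user_id, access_token, ip), (user_agent, device_id, last_seen)
        in to_update.items()
    ]
    return sql, rows
```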

@anoadragon453 anoadragon453 added the T-Enhancement New features, changes in functionality, improvements in performance, or user-facing enhancements. label Jul 19, 2021

anoadragon453 commented Jul 19, 2021

I think all that needs to happen is wrapping the looping_call in a conditional for run_background_tasks_on:

```python
class ClientIpStore(ClientIpWorkerStore):
    def __init__(self, database: DatabasePool, db_conn, hs):
        self.client_ip_last_seen = LruCache(
            cache_name="client_ip_last_seen", max_size=50000
        )

        super().__init__(database, db_conn, hs)

        # (user_id, access_token, ip,) -> (user_agent, device_id, last_seen)
        self._batch_row_update = {}

        self._client_ip_looper = self._clock.looping_call(
            self._update_client_ips_batch, 5 * 1000
        )
        self.hs.get_reactor().addSystemEventTrigger(
            "before", "shutdown", self._update_client_ips_batch
        )
```

So something like:

```python
        if hs.config.run_background_tasks:
            self._client_ip_looper = self._clock.looping_call(
                self._update_client_ips_batch, 5 * 1000
            )
```

as well as the other bits in the constructor.
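As a runnable sketch of that guard, with stand-in stubs (FakeClock, FakeConfig, and maybe_start_looper are hypothetical, not Synapse types — the real constructor operates on hs and self._clock as shown above):

```python
class FakeClock:
    """Stub standing in for Synapse's Clock; records scheduled calls."""
    def __init__(self):
        self.calls = []

    def looping_call(self, f, msec):
        self.calls.append((f, msec))
        return "looper"


class FakeConfig:
    """Stub standing in for the homeserver config."""
    run_background_tasks = False


def maybe_start_looper(clock, config, update_batch):
    # Only the process designated via run_background_tasks_on
    # schedules the periodic batch flush.
    if config.run_background_tasks:
        return clock.looping_call(update_batch, 5 * 1000)
    return None
```

The shutdown trigger ("the other bits in the constructor") would go behind the same conditional.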


clokep commented Jul 19, 2021

It doesn't look like #8500 has any info on why this wasn't done.

I believe it was because insert_client_ip is what caches the IP batch info (by writing to _batch_row_update). This gets called on the master process, so it isn't as simple as just saying "run this other code on the background worker", since that worker won't have the in-memory cache updated.

You might be able to send those over replication, but that would be more invasive.

@anoadragon453

@clokep does that imply that IP addresses are only noted in the database for the main process, not any workers?


clokep commented Jul 29, 2021

> @clokep does that imply that IP addresses are only noted in the database for the main process, not any workers?

I think so but it has been a while since I looked at this. 😄

@anoadragon453

In addition, there's a simple optimisation we could make that would likely make the function a bit lighter on the database.

Currently we iterate over the list of all IP entries and make one or (more often) two separate queries against the database server per entry. This results in high database and CPU usage.

```python
for entry in to_update.items():
    (user_id, access_token, ip), (user_agent, device_id, last_seen) = entry

    self.db_pool.simple_upsert_txn(
        txn,
        table="user_ips",
        keyvalues={"user_id": user_id, "access_token": access_token, "ip": ip},
        values={
            "user_agent": user_agent,
            "device_id": device_id,
            "last_seen": last_seen,
        },
        lock=False,
    )

    # Technically an access token might not be associated with
    # a device so we need to check.
    if device_id:
        # this is always an update rather than an upsert: the row should
        # already exist, and if it doesn't, that may be because it has been
        # deleted, and we don't want to re-create it.
        self.db_pool.simple_update_txn(
            txn,
            table="devices",
            keyvalues={"user_id": user_id, "device_id": device_id},
            updatevalues={
                "user_agent": user_agent,
                "last_seen": last_seen,
                "ip": ip,
            },
        )
```

One simple solution would be to execute a single batched query against the server containing all modifications. We could:

  • transform the numerous calls of simple_upsert_txn to a single simple_upsert_many_txn, which utilises txn.execute_batch.
  • simple_update_txn does not (yet) have a _many derivative. I'm not sure whether we'd want to create that for this purpose.
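As a sketch of the first bullet, the batch dict would need reshaping into the parallel key/value lists that simple_upsert_many_txn accepts (key_names, key_values, value_names, value_values; the exact signature may differ between Synapse versions). batch_upsert_args below is a hypothetical helper, shown standalone so only the data reshaping is illustrated:

```python
def batch_upsert_args(to_update):
    """Reshape the (user_id, access_token, ip) -> (user_agent,
    device_id, last_seen) batch dict into the parallel argument
    lists expected by simple_upsert_many_txn."""
    key_names = ("user_id", "access_token", "ip")
    value_names = ("user_agent", "device_id", "last_seen")
    key_values = []
    value_values = []
    for (user_id, access_token, ip), values in to_update.items():
        key_values.append((user_id, access_token, ip))
        value_values.append(values)
    return key_names, key_values, value_names, value_values
```

The store would then make one call per batch, e.g. self.db_pool.simple_upsert_many_txn(txn, "user_ips", *batch_upsert_args(to_update)), instead of one upsert per entry.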
