|
14 | 14 | import itertools
|
15 | 15 | import logging
|
16 | 16 | from queue import Empty, PriorityQueue
|
17 |
| -from typing import Collection, Dict, Iterable, List, Set, Tuple |
| 17 | +from typing import Collection, Dict, Iterable, List, Optional, Set, Tuple |
18 | 18 |
|
19 | 19 | from synapse.api.constants import MAX_DEPTH
|
20 | 20 | from synapse.api.errors import StoreError
|
21 |
| -from synapse.events import EventBase |
| 21 | +from synapse.api.room_versions import RoomVersion |
| 22 | +from synapse.events import EventBase, make_event_from_dict |
22 | 23 | from synapse.metrics.background_process_metrics import wrap_as_background_process
|
23 |
| -from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause |
| 24 | +from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause |
24 | 25 | from synapse.storage.database import DatabasePool, LoggingTransaction
|
25 | 26 | from synapse.storage.databases.main.events_worker import EventsWorkerStore
|
26 | 27 | from synapse.storage.databases.main.signatures import SignatureWorkerStore
|
27 | 28 | from synapse.storage.engines import PostgresEngine
|
28 | 29 | from synapse.storage.types import Cursor
|
| 30 | +from synapse.util import json_encoder |
29 | 31 | from synapse.util.caches.descriptors import cached
|
30 | 32 | from synapse.util.caches.lrucache import LruCache
|
31 | 33 | from synapse.util.iterutils import batch_iter
|
@@ -1044,6 +1046,107 @@ def _delete_old_forward_extrem_cache_txn(txn):
|
1044 | 1046 | _delete_old_forward_extrem_cache_txn,
|
1045 | 1047 | )
|
1046 | 1048 |
|
| 1049 | + async def insert_received_event_to_staging( |
| 1050 | + self, origin: str, event: EventBase |
| 1051 | + ) -> None: |
| 1052 | + """Insert a newly received event from federation into the staging area.""" |
| 1053 | + |
| 1054 | + # We use an upsert here to handle the case where we see the same event |
| 1055 | + # from the same server multiple times. |
| 1056 | + await self.db_pool.simple_upsert( |
| 1057 | + table="federation_inbound_events_staging", |
| 1058 | + keyvalues={ |
| 1059 | + "origin": origin, |
| 1060 | + "event_id": event.event_id, |
| 1061 | + }, |
| 1062 | + values={}, |
| 1063 | + insertion_values={ |
| 1064 | + "room_id": event.room_id, |
| 1065 | + "received_ts": self._clock.time_msec(), |
| 1066 | + "event_json": json_encoder.encode(event.get_dict()), |
| 1067 | + "internal_metadata": json_encoder.encode( |
| 1068 | + event.internal_metadata.get_dict() |
| 1069 | + ), |
| 1070 | + }, |
| 1071 | + desc="insert_received_event_to_staging", |
| 1072 | + ) |
| 1073 | + |
| 1074 | + async def remove_received_event_from_staging( |
| 1075 | + self, |
| 1076 | + origin: str, |
| 1077 | + event_id: str, |
| 1078 | + ) -> None: |
| 1079 | + """Remove the given event from the staging area""" |
| 1080 | + await self.db_pool.simple_delete( |
| 1081 | + table="federation_inbound_events_staging", |
| 1082 | + keyvalues={ |
| 1083 | + "origin": origin, |
| 1084 | + "event_id": event_id, |
| 1085 | + }, |
| 1086 | + desc="remove_received_event_from_staging", |
| 1087 | + ) |
| 1088 | + |
| 1089 | + async def get_next_staged_event_id_for_room( |
| 1090 | + self, |
| 1091 | + room_id: str, |
| 1092 | + ) -> Optional[Tuple[str, str]]: |
| 1093 | + """Get the next event ID in the staging area for the given room.""" |
| 1094 | + |
| 1095 | + def _get_next_staged_event_id_for_room_txn(txn): |
| 1096 | + sql = """ |
| 1097 | + SELECT origin, event_id |
| 1098 | + FROM federation_inbound_events_staging |
| 1099 | + WHERE room_id = ? |
| 1100 | + ORDER BY received_ts ASC |
| 1101 | + LIMIT 1 |
| 1102 | + """ |
| 1103 | + |
| 1104 | + txn.execute(sql, (room_id,)) |
| 1105 | + |
| 1106 | + return txn.fetchone() |
| 1107 | + |
| 1108 | + return await self.db_pool.runInteraction( |
| 1109 | + "get_next_staged_event_id_for_room", _get_next_staged_event_id_for_room_txn |
| 1110 | + ) |
| 1111 | + |
| 1112 | + async def get_next_staged_event_for_room( |
| 1113 | + self, |
| 1114 | + room_id: str, |
| 1115 | + room_version: RoomVersion, |
| 1116 | + ) -> Optional[Tuple[str, EventBase]]: |
| 1117 | + """Get the next event in the staging area for the given room.""" |
| 1118 | + |
| 1119 | + def _get_next_staged_event_for_room_txn(txn): |
| 1120 | + sql = """ |
| 1121 | + SELECT event_json, internal_metadata, origin |
| 1122 | + FROM federation_inbound_events_staging |
| 1123 | + WHERE room_id = ? |
| 1124 | + ORDER BY received_ts ASC |
| 1125 | + LIMIT 1 |
| 1126 | + """ |
| 1127 | + txn.execute(sql, (room_id,)) |
| 1128 | + |
| 1129 | + return txn.fetchone() |
| 1130 | + |
| 1131 | + row = await self.db_pool.runInteraction( |
| 1132 | + "get_next_staged_event_for_room", _get_next_staged_event_for_room_txn |
| 1133 | + ) |
| 1134 | + |
| 1135 | + if not row: |
| 1136 | + return None |
| 1137 | + |
| 1138 | + event_d = db_to_json(row[0]) |
| 1139 | + internal_metadata_d = db_to_json(row[1]) |
| 1140 | + origin = row[2] |
| 1141 | + |
| 1142 | + event = make_event_from_dict( |
| 1143 | + event_dict=event_d, |
| 1144 | + room_version=room_version, |
| 1145 | + internal_metadata_dict=internal_metadata_d, |
| 1146 | + ) |
| 1147 | + |
| 1148 | + return origin, event |
| 1149 | + |
1047 | 1150 |
|
1048 | 1151 | class EventFederationStore(EventFederationWorkerStore):
|
1049 | 1152 | """Responsible for storing and serving up the various graphs associated
|
|
0 commit comments