
Commit 98ce212
Merge pull request #2103 from matrix-org/erikj/no-double-encode
Don't double encode replication data
erikjohnston authored Apr 7, 2017
2 parents 877c029 + ad544c8 commit 98ce212
Showing 5 changed files with 88 additions and 47 deletions.
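
This merge removes a layer of JSON encoding from replication traffic. Previously the federation send queue and typing handler serialized each row's payload themselves, and the replication protocol then JSON-encoded the enclosing command, so worker consumers had to json.loads data the protocol layer had already parsed once. A minimal sketch of the before/after, using an illustrative payload and framing rather than Synapse's actual wire format:

    import json

    payload = {"destination": "example.org", "state": {"presence": "online"}}

    # Before: the row payload was pre-serialized, then the whole command
    # was serialized again by the replication protocol -- double encoding.
    wire_before = json.dumps({"type": "p", "data": json.dumps(payload)})
    content = json.loads(json.loads(wire_before)["data"])  # two parses

    # After: rows carry plain dicts/lists and are encoded exactly once.
    wire_after = json.dumps({"type": "p", "data": payload})
    content = json.loads(wire_after)["data"]  # one parse

    assert content == payload

The per-file diffs below all follow this pattern: senders stop calling ujson.dumps, and consumers stop calling json.loads.
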
4 changes: 1 addition & 3 deletions synapse/app/federation_sender.py
@@ -51,7 +51,6 @@
 import sys
 import logging
 import gc
-import ujson as json
 
 logger = logging.getLogger("synapse.app.appservice")

@@ -290,8 +289,7 @@ def process_replication_rows(self, stream_name, token, rows):
         # Parse the rows in the stream
         for row in rows:
             typ = row.type
-            content_js = row.data
-            content = json.loads(content_js)
+            content = row.data
 
             if typ == send_queue.PRESENCE_TYPE:
                 destination = content["destination"]
4 changes: 1 addition & 3 deletions synapse/app/synchrotron.py
@@ -62,7 +62,6 @@
 import logging
 import contextlib
 import gc
-import ujson as json
 
 logger = logging.getLogger("synapse.app.synchrotron")

@@ -254,9 +253,8 @@ def process_replication_rows(self, token, rows):
         self._latest_room_serial = token
 
         for row in rows:
-            typing = json.loads(row.user_ids)
             self._room_serials[row.room_id] = token
-            self._room_typing[row.room_id] = typing
+            self._room_typing[row.room_id] = row.user_ids
 
 
 class SynchrotronApplicationService(object):
19 changes: 9 additions & 10 deletions synapse/federation/send_queue.py
@@ -35,7 +35,6 @@
 import synapse.metrics
 
 from blist import sorteddict
-import ujson
 
 
 metrics = synapse.metrics.get_metrics_for(__name__)
@@ -258,10 +257,10 @@ def get_replication_rows(self, from_token, to_token, limit, federation_ack=None)
         )
 
         for (key, (dest, user_id)) in dest_user_ids:
-            rows.append((key, PRESENCE_TYPE, ujson.dumps({
+            rows.append((key, PRESENCE_TYPE, {
                 "destination": dest,
                 "state": self.presence_map[user_id].as_dict(),
-            })))
+            }))
 
         # Fetch changes keyed edus
         keys = self.keyed_edu_changed.keys()
@@ -271,10 +270,10 @@ def get_replication_rows(self, from_token, to_token, limit, federation_ack=None)
 
         for (pos, (destination, edu_key)) in keyed_edus:
             rows.append(
-                (pos, KEYED_EDU_TYPE, ujson.dumps({
+                (pos, KEYED_EDU_TYPE, {
                     "key": edu_key,
                     "edu": self.keyed_edu[(destination, edu_key)].get_internal_dict(),
-                }))
+                })
             )
 
         # Fetch changed edus
@@ -284,7 +283,7 @@ def get_replication_rows(self, from_token, to_token, limit, federation_ack=None)
         edus = set((k, self.edus[k]) for k in keys[i:j])
 
         for (pos, edu) in edus:
-            rows.append((pos, EDU_TYPE, ujson.dumps(edu.get_internal_dict())))
+            rows.append((pos, EDU_TYPE, edu.get_internal_dict()))
 
         # Fetch changed failures
         keys = self.failures.keys()
@@ -293,10 +292,10 @@ def get_replication_rows(self, from_token, to_token, limit, federation_ack=None)
         failures = set((k, self.failures[k]) for k in keys[i:j])
 
         for (pos, (destination, failure)) in failures:
-            rows.append((pos, FAILURE_TYPE, ujson.dumps({
+            rows.append((pos, FAILURE_TYPE, {
                 "destination": destination,
                 "failure": failure,
-            })))
+            }))
 
         # Fetch changed device messages
         keys = self.device_messages.keys()
@@ -305,9 +304,9 @@ def get_replication_rows(self, from_token, to_token, limit, federation_ack=None)
         device_messages = set((k, self.device_messages[k]) for k in keys[i:j])
 
         for (pos, destination) in device_messages:
-            rows.append((pos, DEVICE_MESSAGE_TYPE, ujson.dumps({
+            rows.append((pos, DEVICE_MESSAGE_TYPE, {
                 "destination": destination,
-            })))
+            }))
 
         # Sort rows based on pos
         rows.sort()
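
With these changes, every entry get_replication_rows appends is a plain (position, type, payload) tuple, and the single JSON encode is deferred to the replication transport. A small sketch of the resulting row shape (the type constant's value and the payloads are illustrative assumptions, not copied from send_queue.py):

    # Hypothetical stand-ins for send_queue's real constants and state.
    FAILURE_TYPE = "f"

    rows = [
        (3, FAILURE_TYPE, {"destination": "example.org", "failure": {"status": 502}}),
        (1, FAILURE_TYPE, {"destination": "example.net", "failure": {"status": 404}}),
    ]
    rows.sort()  # rows sort by stream position, as in the code above
    assert rows[0][0] == 1  # lowest position first
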
4 changes: 1 addition & 3 deletions synapse/handlers/typing.py
@@ -24,7 +24,6 @@
 import logging
 
 from collections import namedtuple
-import ujson as json
 
 logger = logging.getLogger(__name__)

@@ -288,8 +287,7 @@ def get_all_typing_updates(self, last_id, current_id):
         for room_id, serial in self._room_serials.items():
             if last_id < serial and serial <= current_id:
                 typing = self._room_typing[room_id]
-                typing_bytes = json.dumps(list(typing), ensure_ascii=False)
-                rows.append((serial, room_id, typing_bytes))
+                rows.append((serial, room_id, list(typing)))
         rows.sort()
         return rows

104 changes: 76 additions & 28 deletions synapse/replication/tcp/streams.py
@@ -36,34 +36,82 @@
 MAX_EVENTS_BEHIND = 10000
 
 
-EventStreamRow = namedtuple("EventStreamRow",
-                            ("event_id", "room_id", "type", "state_key", "redacts"))
-BackfillStreamRow = namedtuple("BackfillStreamRow",
-                               ("event_id", "room_id", "type", "state_key", "redacts"))
-PresenceStreamRow = namedtuple("PresenceStreamRow",
-                               ("user_id", "state", "last_active_ts",
-                                "last_federation_update_ts", "last_user_sync_ts",
-                                "status_msg", "currently_active"))
-TypingStreamRow = namedtuple("TypingStreamRow",
-                             ("room_id", "user_ids"))
-ReceiptsStreamRow = namedtuple("ReceiptsStreamRow",
-                               ("room_id", "receipt_type", "user_id", "event_id",
-                                "data"))
-PushRulesStreamRow = namedtuple("PushRulesStreamRow", ("user_id",))
-PushersStreamRow = namedtuple("PushersStreamRow",
-                              ("user_id", "app_id", "pushkey", "deleted",))
-CachesStreamRow = namedtuple("CachesStreamRow",
-                             ("cache_func", "keys", "invalidation_ts",))
-PublicRoomsStreamRow = namedtuple("PublicRoomsStreamRow",
-                                  ("room_id", "visibility", "appservice_id",
-                                   "network_id",))
-DeviceListsStreamRow = namedtuple("DeviceListsStreamRow", ("user_id", "destination",))
-ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ("entity",))
-FederationStreamRow = namedtuple("FederationStreamRow", ("type", "data",))
-TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow",
-                                     ("user_id", "room_id", "data"))
-AccountDataStreamRow = namedtuple("AccountDataStream",
-                                  ("user_id", "room_id", "data_type", "data"))
+EventStreamRow = namedtuple("EventStreamRow", (
+    "event_id",  # str
+    "room_id",  # str
+    "type",  # str
+    "state_key",  # str, optional
+    "redacts",  # str, optional
+))
+BackfillStreamRow = namedtuple("BackfillStreamRow", (
+    "event_id",  # str
+    "room_id",  # str
+    "type",  # str
+    "state_key",  # str, optional
+    "redacts",  # str, optional
+))
+PresenceStreamRow = namedtuple("PresenceStreamRow", (
+    "user_id",  # str
+    "state",  # str
+    "last_active_ts",  # int
+    "last_federation_update_ts",  # int
+    "last_user_sync_ts",  # int
+    "status_msg",  # str
+    "currently_active",  # bool
+))
+TypingStreamRow = namedtuple("TypingStreamRow", (
+    "room_id",  # str
+    "user_ids",  # list(str)
+))
+ReceiptsStreamRow = namedtuple("ReceiptsStreamRow", (
+    "room_id",  # str
+    "receipt_type",  # str
+    "user_id",  # str
+    "event_id",  # str
+    "data",  # dict
+))
+PushRulesStreamRow = namedtuple("PushRulesStreamRow", (
+    "user_id",  # str
+))
+PushersStreamRow = namedtuple("PushersStreamRow", (
+    "user_id",  # str
+    "app_id",  # str
+    "pushkey",  # str
+    "deleted",  # bool
+))
+CachesStreamRow = namedtuple("CachesStreamRow", (
+    "cache_func",  # str
+    "keys",  # list(str)
+    "invalidation_ts",  # int
+))
+PublicRoomsStreamRow = namedtuple("PublicRoomsStreamRow", (
+    "room_id",  # str
+    "visibility",  # str
+    "appservice_id",  # str, optional
+    "network_id",  # str, optional
+))
+DeviceListsStreamRow = namedtuple("DeviceListsStreamRow", (
+    "user_id",  # str
+    "destination",  # str
+))
+ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", (
+    "entity",  # str
+))
+FederationStreamRow = namedtuple("FederationStreamRow", (
+    "type",  # str
+    "data",  # dict
+))
+TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow", (
+    "user_id",  # str
+    "room_id",  # str
+    "data",  # dict
+))
+AccountDataStreamRow = namedtuple("AccountDataStream", (
+    "user_id",  # str
+    "room_id",  # str
+    "data_type",  # str
+    "data",  # dict
+))
 
 
 class Stream(object):
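
The inline comments above double as the wire contract now that payloads are no longer pre-serialized; for instance, user_ids on TypingStreamRow is a plain list(str), matching the list(typing) the typing handler emits and the row.user_ids the synchrotron consumes directly. A hedged sketch of a single-encode round trip for such a row (the command framing here is assumed for illustration, not Synapse's actual replication protocol):

    import json
    from collections import namedtuple

    TypingStreamRow = namedtuple("TypingStreamRow", ("room_id", "user_ids"))

    # Handler side emits (serial, room_id, list(typing)), per typing.py above.
    serial, room_id, user_ids = 7, "!room:example.org", ["@alice:example.org"]

    # One encode on the wire, one decode at the worker -- no inner json.loads.
    wire = json.dumps({"token": serial, "row": [room_id, user_ids]})
    decoded = json.loads(wire)
    row = TypingStreamRow(*decoded["row"])
    assert row.user_ids == ["@alice:example.org"]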
