|  | 
| 18 | 18 | # [This file includes modifications made by New Vector Limited] | 
| 19 | 19 | # | 
| 20 | 20 | # | 
|  | 21 | +import logging | 
| 21 | 22 | from unittest.mock import Mock | 
| 22 | 23 | 
 | 
| 23 | 24 | from synapse.handlers.typing import RoomMember, TypingWriterHandler | 
| @@ -99,75 +100,83 @@ def test_reset(self) -> None: | 
| 99 | 100 |         This is emulated by jumping the stream ahead, then reconnecting (which | 
| 100 | 101 |         sends the proper position and RDATA). | 
| 101 | 102 |         """ | 
| 102 |  | -        typing = self.hs.get_typing_handler() | 
| 103 |  | -        assert isinstance(typing, TypingWriterHandler) | 
| 104 |  | - | 
| 105 |  | -        # Create a typing update before we reconnect so that there is a missing | 
| 106 |  | -        # update to fetch. | 
| 107 |  | -        typing._push_update(member=RoomMember(ROOM_ID, USER_ID), typing=True) | 
| 108 |  | - | 
| 109 |  | -        self.reconnect() | 
| 110 |  | - | 
| 111 |  | -        typing._push_update(member=RoomMember(ROOM_ID, USER_ID), typing=True) | 
| 112 |  | - | 
| 113 |  | -        self.reactor.advance(0) | 
| 114 |  | - | 
| 115 |  | -        # We should now see an attempt to connect to the master | 
| 116 |  | -        request = self.handle_http_replication_attempt() | 
| 117 |  | -        self.assert_request_is_get_repl_stream_updates(request, "typing") | 
| 118 |  | - | 
| 119 |  | -        self.mock_handler.on_rdata.assert_called_once() | 
| 120 |  | -        stream_name, _, token, rdata_rows = self.mock_handler.on_rdata.call_args[0] | 
| 121 |  | -        self.assertEqual(stream_name, "typing") | 
| 122 |  | -        self.assertEqual(1, len(rdata_rows)) | 
| 123 |  | -        row: TypingStream.TypingStreamRow = rdata_rows[0] | 
| 124 |  | -        self.assertEqual(ROOM_ID, row.room_id) | 
| 125 |  | -        self.assertEqual([USER_ID], row.user_ids) | 
| 126 |  | - | 
| 127 |  | -        # Push the stream forward a bunch so it can be reset. | 
| 128 |  | -        for i in range(100): | 
| 129 |  | -            typing._push_update( | 
| 130 |  | -                member=RoomMember(ROOM_ID, "@test%s:blue" % i), typing=True | 
|  | 103 | +        # A huge RDATA log line is triggered in this test, which breaks trial | 
|  | 104 | +        # ref: https://github.com/twisted/twisted/issues/12482 | 
|  | 105 | +        server_logger = logging.getLogger("tests.server") | 
|  | 106 | +        server_logger_was_disabled = server_logger.disabled | 
|  | 107 | +        server_logger.disabled = True | 
|  | 108 | +        try: | 
|  | 109 | +            typing = self.hs.get_typing_handler() | 
|  | 110 | +            assert isinstance(typing, TypingWriterHandler) | 
|  | 111 | + | 
|  | 112 | +            # Create a typing update before we reconnect so that there is a missing | 
|  | 113 | +            # update to fetch. | 
|  | 114 | +            typing._push_update(member=RoomMember(ROOM_ID, USER_ID), typing=True) | 
|  | 115 | + | 
|  | 116 | +            self.reconnect() | 
|  | 117 | + | 
|  | 118 | +            typing._push_update(member=RoomMember(ROOM_ID, USER_ID), typing=True) | 
|  | 119 | + | 
|  | 120 | +            self.reactor.advance(0) | 
|  | 121 | + | 
|  | 122 | +            # We should now see an attempt to connect to the master | 
|  | 123 | +            request = self.handle_http_replication_attempt() | 
|  | 124 | +            self.assert_request_is_get_repl_stream_updates(request, "typing") | 
|  | 125 | + | 
|  | 126 | +            self.mock_handler.on_rdata.assert_called_once() | 
|  | 127 | +            stream_name, _, token, rdata_rows = self.mock_handler.on_rdata.call_args[0] | 
|  | 128 | +            self.assertEqual(stream_name, "typing") | 
|  | 129 | +            self.assertEqual(1, len(rdata_rows)) | 
|  | 130 | +            row: TypingStream.TypingStreamRow = rdata_rows[0] | 
|  | 131 | +            self.assertEqual(ROOM_ID, row.room_id) | 
|  | 132 | +            self.assertEqual([USER_ID], row.user_ids) | 
|  | 133 | + | 
|  | 134 | +            # Push the stream forward a bunch so it can be reset. | 
|  | 135 | +            for i in range(100): | 
|  | 136 | +                typing._push_update( | 
|  | 137 | +                    member=RoomMember(ROOM_ID, "@test%s:blue" % i), typing=True | 
|  | 138 | +                ) | 
|  | 139 | +            self.reactor.advance(0) | 
|  | 140 | + | 
|  | 141 | +            # Disconnect. | 
|  | 142 | +            self.disconnect() | 
|  | 143 | + | 
|  | 144 | +            # Reset the typing handler | 
|  | 145 | +            self.hs.get_replication_streams()["typing"].last_token = 0 | 
|  | 146 | +            self.hs.get_replication_command_handler()._streams["typing"].last_token = 0 | 
|  | 147 | +            typing._latest_room_serial = 0 | 
|  | 148 | +            typing._typing_stream_change_cache = StreamChangeCache( | 
|  | 149 | +                name="TypingStreamChangeCache", | 
|  | 150 | +                server_name=self.hs.hostname, | 
|  | 151 | +                current_stream_pos=typing._latest_room_serial, | 
| 131 | 152 |             ) | 
| 132 |  | -        self.reactor.advance(0) | 
| 133 |  | - | 
| 134 |  | -        # Disconnect. | 
| 135 |  | -        self.disconnect() | 
| 136 |  | - | 
| 137 |  | -        # Reset the typing handler | 
| 138 |  | -        self.hs.get_replication_streams()["typing"].last_token = 0 | 
| 139 |  | -        self.hs.get_replication_command_handler()._streams["typing"].last_token = 0 | 
| 140 |  | -        typing._latest_room_serial = 0 | 
| 141 |  | -        typing._typing_stream_change_cache = StreamChangeCache( | 
| 142 |  | -            name="TypingStreamChangeCache", | 
| 143 |  | -            server_name=self.hs.hostname, | 
| 144 |  | -            current_stream_pos=typing._latest_room_serial, | 
| 145 |  | -        ) | 
| 146 |  | -        typing._reset() | 
| 147 |  | - | 
| 148 |  | -        # Reconnect. | 
| 149 |  | -        self.reconnect() | 
| 150 |  | -        self.pump(0.1) | 
| 151 |  | - | 
| 152 |  | -        # We should now see an attempt to connect to the master | 
| 153 |  | -        request = self.handle_http_replication_attempt() | 
| 154 |  | -        self.assert_request_is_get_repl_stream_updates(request, "typing") | 
| 155 |  | - | 
| 156 |  | -        # Reset the test code. | 
| 157 |  | -        self.mock_handler.on_rdata.reset_mock() | 
| 158 |  | -        self.mock_handler.on_rdata.assert_not_called() | 
| 159 |  | - | 
| 160 |  | -        # Push additional data. | 
| 161 |  | -        typing._push_update(member=RoomMember(ROOM_ID_2, USER_ID_2), typing=False) | 
| 162 |  | -        self.reactor.advance(0) | 
| 163 |  | - | 
| 164 |  | -        self.mock_handler.on_rdata.assert_called_once() | 
| 165 |  | -        stream_name, _, token, rdata_rows = self.mock_handler.on_rdata.call_args[0] | 
| 166 |  | -        self.assertEqual(stream_name, "typing") | 
| 167 |  | -        self.assertEqual(1, len(rdata_rows)) | 
| 168 |  | -        row = rdata_rows[0] | 
| 169 |  | -        self.assertEqual(ROOM_ID_2, row.room_id) | 
| 170 |  | -        self.assertEqual([], row.user_ids) | 
| 171 |  | - | 
| 172 |  | -        # The token should have been reset. | 
| 173 |  | -        self.assertEqual(token, 1) | 
|  | 153 | +            typing._reset() | 
|  | 154 | + | 
|  | 155 | +            # Reconnect. | 
|  | 156 | +            self.reconnect() | 
|  | 157 | +            self.pump(0.1) | 
|  | 158 | + | 
|  | 159 | +            # We should now see an attempt to connect to the master | 
|  | 160 | +            request = self.handle_http_replication_attempt() | 
|  | 161 | +            self.assert_request_is_get_repl_stream_updates(request, "typing") | 
|  | 162 | + | 
|  | 163 | +            # Reset the test code. | 
|  | 164 | +            self.mock_handler.on_rdata.reset_mock() | 
|  | 165 | +            self.mock_handler.on_rdata.assert_not_called() | 
|  | 166 | + | 
|  | 167 | +            # Push additional data. | 
|  | 168 | +            typing._push_update(member=RoomMember(ROOM_ID_2, USER_ID_2), typing=False) | 
|  | 169 | +            self.reactor.advance(0) | 
|  | 170 | + | 
|  | 171 | +            self.mock_handler.on_rdata.assert_called_once() | 
|  | 172 | +            stream_name, _, token, rdata_rows = self.mock_handler.on_rdata.call_args[0] | 
|  | 173 | +            self.assertEqual(stream_name, "typing") | 
|  | 174 | +            self.assertEqual(1, len(rdata_rows)) | 
|  | 175 | +            row = rdata_rows[0] | 
|  | 176 | +            self.assertEqual(ROOM_ID_2, row.room_id) | 
|  | 177 | +            self.assertEqual([], row.user_ids) | 
|  | 178 | + | 
|  | 179 | +            # The token should have been reset. | 
|  | 180 | +            self.assertEqual(token, 1) | 
|  | 181 | +        finally: | 
|  | 182 | +            server_logger.disabled = server_logger_was_disabled | 
0 commit comments