diff --git a/hathor/p2p/node_sync.py b/hathor/p2p/node_sync.py index 9de01413c..0ae5d51c3 100644 --- a/hathor/p2p/node_sync.py +++ b/hathor/p2p/node_sync.py @@ -207,6 +207,9 @@ def __init__(self, protocol: 'HathorProtocol', downloader: Downloader, reactor: self.call_later_id: Optional[IDelayedCall] = None self.call_later_interval: int = 1 # seconds + # Keep track of call laters. + self._send_tips_call_later: list[IDelayedCall] = [] + # Timestamp of the peer's latest block (according to the peer itself) self.peer_timestamp: int = 0 @@ -284,6 +287,9 @@ def stop(self) -> None: self.send_data_queue.stop() if self.call_later_id and self.call_later_id.active(): self.call_later_id.cancel() + for call_later in self._send_tips_call_later: + if call_later.active(): + call_later.cancel() # XXX: force remove this connection from _all_ pending downloads self.downloader.drop_connection(self) @@ -617,13 +623,24 @@ def send_tips(self, timestamp: Optional[int] = None, include_hashes: bool = Fals """Try to send a TIPS message. If rate limit has been reached, it schedules to send it later.""" if not self.global_rate_limiter.add_hit(self.GlobalRateLimiter.SEND_TIPS): self.log.debug('send_tips throttled') - self.reactor.callLater(1, self.send_tips, timestamp, include_hashes, offset) + self._send_tips_call_later.append( + self.reactor.callLater( + 1, self.send_tips, timestamp, include_hashes, offset + ) + ) return self._send_tips(timestamp, include_hashes, offset) def _send_tips(self, timestamp: Optional[int] = None, include_hashes: bool = False, offset: int = 0) -> None: """ Send a TIPS message. """ + # Filter for active delayed calls once one is executing + self._send_tips_call_later = [ + call_later + for call_later in self._send_tips_call_later + if call_later.active() + ] + if timestamp is None: timestamp = self.manager.tx_storage.latest_timestamp diff --git a/hathor/simulator/simulator.py b/hathor/simulator/simulator.py index b17e0a074..878a5d2be 100644 --- a/hathor/simulator/simulator.py +++ b/hathor/simulator/simulator.py @@ -219,6 +219,9 @@ def get_peer(self, name: str) -> HathorManager: def add_connection(self, conn: 'FakeConnection') -> None: self._connections.append(conn) + def remove_connection(self, conn: 'FakeConnection') -> None: + self._connections.remove(conn) + def _run(self, interval: float, step: float, status_interval: float) -> Generator[None, None, None]: """ Implementation of run, yields at every step to allow verifications like in run_until_complete """ diff --git a/tests/p2p/test_sync_rate_limiter.py b/tests/p2p/test_sync_rate_limiter.py index df0020841..3d7d99291 100644 --- a/tests/p2p/test_sync_rate_limiter.py +++ b/tests/p2p/test_sync_rate_limiter.py @@ -1,4 +1,6 @@ -from unittest.mock import MagicMock +from unittest.mock import MagicMock, Mock + +from twisted.python.failure import Failure from hathor.simulator import FakeConnection from hathor.simulator.trigger import StopAfterNMinedBlocks @@ -25,8 +27,8 @@ def test_sync_rate_limiter(self): connected_peers2 = list(manager2.connections.connected_peers.values()) self.assertEqual(1, len(connected_peers2)) - protocol2 = connected_peers2[0] - sync2 = protocol2.state.sync_manager + protocol1 = connected_peers2[0] + sync2 = protocol1.state.sync_manager sync2._send_tips = MagicMock() for i in range(100): @@ -40,6 +42,91 @@ def test_sync_rate_limiter(self): self.simulator._clock.advance(2000) self.assertTrue(sync2._send_tips.call_count, 16) + def test_sync_rate_limiter_disconnect(self): + # Test send_tips delayed calls cancelation with disconnection + manager1 = self.create_peer() + manager2 = self.create_peer() + manager2.connections.MAX_ENABLED_SYNC = 0 + + conn12 = FakeConnection(manager1, manager2, latency=0.05) + self.simulator.add_connection(conn12) + self.simulator.run(3600) + + connections = manager2.connections + connections.rate_limiter.reset(connections.GlobalRateLimiter.SEND_TIPS) + connections.enable_rate_limiter(1, 1) + + connected_peers2 = list(manager2.connections.connected_peers.values()) + self.assertEqual(1, len(connected_peers2)) + + protocol1 = connected_peers2[0] + sync1 = protocol1.state.sync_manager + sync1._send_tips = Mock(wraps=sync1._send_tips) + + sync1.send_tips() + self.assertEqual(sync1._send_tips.call_count, 1) + self.assertEqual(len(sync1._send_tips_call_later), 0) + + sync1.send_tips() + self.assertEqual(sync1._send_tips.call_count, 1) + self.assertEqual(len(sync1._send_tips_call_later), 1) + + sync1.send_tips() + self.assertEqual(sync1._send_tips.call_count, 1) + self.assertEqual(len(sync1._send_tips_call_later), 2) + + # Close the connection. + conn12.disconnect(Failure(Exception('testing'))) + self.simulator.remove_connection(conn12) + + self.simulator.run(30) + + # Send tips should not be called any further since the connection has already been closed. + self.assertEqual(sync1._send_tips.call_count, 1) + # Residual delayed calls + self.assertEqual(len(sync1._send_tips_call_later), 2) + # The residual delayed calls should have been canceled + for call_later in sync1._send_tips_call_later: + self.assertFalse(call_later.active()) + + def test_sync_rate_limiter_delayed_calls_draining(self): + # Test the draining of delayed calls from _send_tips_call_later list + manager1 = self.create_peer() + manager2 = self.create_peer() + manager2.connections.MAX_ENABLED_SYNC = 0 + + conn12 = FakeConnection(manager1, manager2, latency=0.05) + self.simulator.add_connection(conn12) + self.simulator.run(3600) + + connections = manager2.connections + connections.rate_limiter.reset(connections.GlobalRateLimiter.SEND_TIPS) + connections.enable_rate_limiter(1, 1) + + connected_peers2 = list(manager2.connections.connected_peers.values()) + self.assertEqual(1, len(connected_peers2)) + + protocol1 = connected_peers2[0] + sync1 = protocol1.state.sync_manager + + sync1.send_tips() + self.assertEqual(len(sync1._send_tips_call_later), 0) + + sync1.send_tips() + self.assertEqual(len(sync1._send_tips_call_later), 1) + + sync1.send_tips() + self.assertEqual(len(sync1._send_tips_call_later), 2) + + sync1.send_tips() + self.assertEqual(len(sync1._send_tips_call_later), 3) + + self.simulator.run(30) + + # Without disconnection, all the delayed calls + # should have been executed + self.assertEqual(len(sync1._send_tips_call_later), 0) + class SyncV1RandomSimulatorTestCase(unittest.SyncV1Params, BaseRandomSimulatorTestCase): __test__ = True