From 292550f68317ec669fd4c73709a961a6afc54287 Mon Sep 17 00:00:00 2001 From: Alex Ruzenhack Date: Wed, 7 Jun 2023 18:30:44 +0100 Subject: [PATCH] chore(p2p): Add stop to delayed calls --- hathor/conf/settings.py | 3 ++ hathor/p2p/sync_v1/agent.py | 5 +++ tests/p2p/test_sync_rate_limiter.py | 51 +++++++++++++++++++++++++++++ 3 files changed, 59 insertions(+) diff --git a/hathor/conf/settings.py b/hathor/conf/settings.py index 62deef54d..3f4a9309c 100644 --- a/hathor/conf/settings.py +++ b/hathor/conf/settings.py @@ -390,6 +390,9 @@ def MAXIMUM_NUMBER_OF_HALVINGS(self) -> int: # All settings related to Feature Activation FEATURE_ACTIVATION: FeatureActivationSettings = FeatureActivationSettings() + # Maximum number of GET_TIPS delayed calls per connection while running sync. + MAX_GET_TIPS_DELAYED_CALLS: int = 5 + @classmethod def from_yaml(cls, *, filepath: str) -> 'HathorSettings': """Takes a filepath to a yaml file and returns a validated HathorSettings instance.""" diff --git a/hathor/p2p/sync_v1/agent.py b/hathor/p2p/sync_v1/agent.py index ff551a91d..3cdc8965f 100644 --- a/hathor/p2p/sync_v1/agent.py +++ b/hathor/p2p/sync_v1/agent.py @@ -623,6 +623,11 @@ def send_tips(self, timestamp: Optional[int] = None, include_hashes: bool = Fals """Try to send a TIPS message. If rate limit has been reached, it schedules to send it later.""" if not self.global_rate_limiter.add_hit(self.GlobalRateLimiter.SEND_TIPS): self.log.debug('send_tips throttled') + if len(self._send_tips_call_later) >= settings.MAX_GET_TIPS_DELAYED_CALLS: + self.protocol.send_error_and_close_connection( + 'Too many GET_TIPS message' + ) + return self._send_tips_call_later.append( self.reactor.callLater( 1, self.send_tips, timestamp, include_hashes, offset diff --git a/tests/p2p/test_sync_rate_limiter.py b/tests/p2p/test_sync_rate_limiter.py index 3d7d99291..724448187 100644 --- a/tests/p2p/test_sync_rate_limiter.py +++ b/tests/p2p/test_sync_rate_limiter.py @@ -127,6 +127,57 @@ def test_sync_rate_limiter_delayed_calls_draining(self): # should have been executed self.assertEqual(len(sync1._send_tips_call_later), 0) + def test_sync_rate_limiter_delayed_calls_stop(self): + # Test the draining of delayed calls from _send_tips_call_later list + manager1 = self.create_peer() + manager2 = self.create_peer() + manager2.connections.MAX_ENABLED_SYNC = 0 + + conn12 = FakeConnection(manager1, manager2, latency=0.05) + self.simulator.add_connection(conn12) + self.simulator.run(3600) + + connections = manager2.connections + connections.rate_limiter.reset(connections.GlobalRateLimiter.SEND_TIPS) + connections.enable_rate_limiter(1, 1) + + connected_peers2 = list(manager2.connections.connected_peers.values()) + self.assertEqual(1, len(connected_peers2)) + + protocol1 = connected_peers2[0] + sync1 = protocol1.state.sync_manager + + sync1.send_tips() + self.assertEqual(len(sync1._send_tips_call_later), 0) + + from hathor.conf import HathorSettings + settings = HathorSettings() + + # add delayed calls to the the maximum + max_delayed_calls = settings.MAX_GET_TIPS_DELAYED_CALLS + for count in range(max_delayed_calls): + sync1.send_tips() + + # we should have the maxinum delayed calls + self.assertEqual(len(sync1._send_tips_call_later), max_delayed_calls) + # Transport connection is still open + self.assertFalse(conn12.tr2.disconnecting) + + # add one delayed call beyond the maximum + sync1.send_tips() + # we keep the maximum delayed calls allowed + self.assertEqual(len(sync1._send_tips_call_later), max_delayed_calls) + # Transport connection is aborted + self.assertTrue(conn12.tr2.disconnecting) + + self.simulator.run(30) + + # A residual delayed calls is kept when connection closes + self.assertEqual(len(sync1._send_tips_call_later), max_delayed_calls) + # All residual tasks should have been canceled + for call_later in sync1._send_tips_call_later: + self.assertEqual(call_later.active(), False) + class SyncV1RandomSimulatorTestCase(unittest.SyncV1Params, BaseRandomSimulatorTestCase): __test__ = True