From a27e9a4321a5a0725e651a70b63e227a992e8688 Mon Sep 17 00:00:00 2001 From: Marcelo Salhab Brogliato Date: Fri, 27 Oct 2023 00:16:45 -0500 Subject: [PATCH] fix(sync-v2): Fix streamer sending more blocks than expected --- hathor/p2p/sync_v2/agent.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/hathor/p2p/sync_v2/agent.py b/hathor/p2p/sync_v2/agent.py index c252713e0..b22faa937 100644 --- a/hathor/p2p/sync_v2/agent.py +++ b/hathor/p2p/sync_v2/agent.py @@ -479,7 +479,7 @@ def run_block_sync(self, start_hash: bytes, start_height: int, end_hash: bytes, quantity = end_height - start_height self.log.info('get next blocks', start_height=start_height, end_height=end_height, quantity=quantity, start_hash=start_hash.hex(), end_hash=end_hash.hex()) - self.send_get_next_blocks(start_hash, end_hash) + self.send_get_next_blocks(start_hash, end_hash, quantity) def send_message(self, cmd: ProtocolMessages, payload: Optional[str] = None) -> None: """ Helper to send a message. @@ -603,12 +603,13 @@ def handle_peer_block_hashes(self, payload: str) -> None: if deferred: deferred.callback(data) - def send_get_next_blocks(self, start_hash: bytes, end_hash: bytes) -> None: + def send_get_next_blocks(self, start_hash: bytes, end_hash: bytes, quantity: int) -> None: """ Send a PEER-BLOCK-HASHES message. """ payload = json.dumps(dict( start_hash=start_hash.hex(), end_hash=end_hash.hex(), + quantity=quantity, )) self.send_message(ProtocolMessages.GET_NEXT_BLOCKS, payload) self.receiving_stream = True @@ -616,7 +617,7 @@ def send_get_next_blocks(self, start_hash: bytes, end_hash: bytes) -> None: def handle_get_next_blocks(self, payload: str) -> None: """ Handle a GET-NEXT-BLOCKS message. """ - self.log.debug('handle GET-NEXT-BLOCKS') + self.log.debug('handle GET-NEXT-BLOCKS', payload=payload) if self._is_streaming: self.protocol.send_error_and_close_connection('GET-NEXT-BLOCKS received before previous one finished') return @@ -624,9 +625,10 @@ def handle_get_next_blocks(self, payload: str) -> None: self.send_next_blocks( start_hash=bytes.fromhex(data['start_hash']), end_hash=bytes.fromhex(data['end_hash']), + quantity=data['quantity'], ) - def send_next_blocks(self, start_hash: bytes, end_hash: bytes) -> None: + def send_next_blocks(self, start_hash: bytes, end_hash: bytes, quantity: int) -> None: """ Send a NEXT-BLOCKS message. """ self.log.debug('start NEXT-BLOCKS stream') @@ -651,7 +653,8 @@ def send_next_blocks(self, start_hash: bytes, end_hash: bytes) -> None: # return if self.blockchain_streaming is not None and self.blockchain_streaming.is_running: self.blockchain_streaming.stop() - self.blockchain_streaming = BlockchainStreaming(self, blk, end_hash, limit=self.DEFAULT_STREAMING_LIMIT) + limit = min(quantity, self.DEFAULT_STREAMING_LIMIT) + self.blockchain_streaming = BlockchainStreaming(self, blk, end_hash, limit=limit) self.blockchain_streaming.start() def send_blocks(self, blk: Block) -> None: @@ -712,7 +715,10 @@ def handle_blocks(self, payload: str) -> None: self._blk_received += 1 if self._blk_received > self._blk_max_quantity + 1: - self.log.warn('too many blocks received', last_block=blk.hash_hex) + self.log.warn('too many blocks received', + blk_received=self._blk_received, + blk_max_quantity=self._blk_max_quantity, + last_block=blk.hash_hex) # Too many blocks. Punish peer? self.state = PeerState.ERROR return