From db38db4a07a85a39c8dfbfb67e5f610c04fb5abb Mon Sep 17 00:00:00 2001 From: Marcelo Salhab Brogliato Date: Thu, 9 Nov 2023 09:37:12 -0600 Subject: [PATCH] feat(sync-v2): Improve overall logging --- hathor/p2p/sync_v2/agent.py | 24 ++++++++++++------- .../sync_v2/blockchain_streaming_client.py | 2 +- hathor/p2p/sync_v2/streamers.py | 2 +- .../sync_v2/transaction_streaming_client.py | 4 ++-- 4 files changed, 20 insertions(+), 12 deletions(-) diff --git a/hathor/p2p/sync_v2/agent.py b/hathor/p2p/sync_v2/agent.py index 451002887..67ee52dc4 100644 --- a/hathor/p2p/sync_v2/agent.py +++ b/hathor/p2p/sync_v2/agent.py @@ -59,6 +59,9 @@ class _HeightInfo(NamedTuple): def __repr__(self): return f'_HeightInfo({self.height}, {self.id.hex()})' + def __str__(self): + return f'({self.height}, {self.id.hex()})' + def to_json(self) -> dict[str, Any]: return { 'height': self.height, @@ -249,7 +252,7 @@ def handle_not_found(self, payload: str) -> None: """ # XXX: NOT_FOUND is a valid message, but we shouldn't ever receive it unless the other peer is running with a # modified code or if there is a bug - self.log.warn('not found? close connection', payload=payload) + self.log.warn('vertex not found? close connection', payload=payload) self.protocol.send_error_and_close_connection('Unexpected NOT_FOUND') def handle_error(self, payload: str) -> None: @@ -325,7 +328,8 @@ def run_sync_blocks(self) -> Generator[Any, Any, bool]: # Are we synced? if self.peer_best_block == my_best_block: # Yes, we are synced! \o/ - self.log.info('blocks are synced', best_block=my_best_block) + if not self.is_synced(): + self.log.info('blocks are synced', best_block=my_best_block) self.update_synced(True) self.send_relay(enable=True) self.synced_block = self.peer_best_block @@ -337,8 +341,9 @@ def run_sync_blocks(self) -> Generator[Any, Any, bool]: common_block_hash = self.tx_storage.indexes.height.get(self.peer_best_block.height) if common_block_hash == self.peer_best_block.id: # If yes, nothing to sync from this peer. - self.log.info('nothing to sync because peer is behind me at the same best blockchain', - my_best_block=my_best_block, peer_best_block=self.peer_best_block) + if not self.is_synced(): + self.log.info('nothing to sync because peer is behind me at the same best blockchain', + my_best_block=my_best_block, peer_best_block=self.peer_best_block) self.update_synced(True) self.send_relay(enable=True) self.synced_block = self.peer_best_block @@ -476,6 +481,9 @@ def start_blockchain_streaming(self, start_block: _HeightInfo, end_block: _HeightInfo) -> Deferred[StreamEnd]: """Request peer to start streaming blocks to us.""" + self.log.info('requesting blocks streaming', + start_block=start_block, + end_block=end_block) self._blk_streaming_client = BlockchainStreamingClient(self, start_block, end_block) quantity = self._blk_streaming_client._blk_max_quantity self.send_get_next_blocks(start_block.id, end_block.id, quantity) @@ -508,7 +516,7 @@ def find_best_common_block(self, lo = _HeightInfo(height=0, id=self._settings.GENESIS_BLOCK_HASH) while hi.height - lo.height > 1: - self.log.info('find_best_common_block n-ary search query', lo=lo, hi=hi) + self.log.debug('find_best_common_block n-ary search query', lo=lo, hi=hi) step = math.ceil((hi.height - lo.height) / 10) heights = list(range(lo.height, hi.height, step)) heights.append(hi.height) @@ -1102,8 +1110,8 @@ def handle_data(self, payload: str) -> None: # If we have not requested the data, it is a new transaction being propagated # in the network, thus, we propagate it as well. if tx.can_validate_full(): - self.log.info('tx received in real time from peer', tx=tx.hash_hex, peer=self.protocol.get_peer_id()) + self.log.debug('tx received in real time from peer', tx=tx.hash_hex, peer=self.protocol.get_peer_id()) self.manager.on_new_tx(tx, propagate_to_peers=True) else: - self.log.info('skipping tx received in real time from peer', - tx=tx.hash_hex, peer=self.protocol.get_peer_id()) + self.log.debug('skipping tx received in real time from peer', + tx=tx.hash_hex, peer=self.protocol.get_peer_id()) diff --git a/hathor/p2p/sync_v2/blockchain_streaming_client.py b/hathor/p2p/sync_v2/blockchain_streaming_client.py index c2b8e0c0b..3635396b9 100644 --- a/hathor/p2p/sync_v2/blockchain_streaming_client.py +++ b/hathor/p2p/sync_v2/blockchain_streaming_client.py @@ -111,7 +111,7 @@ def handle_blocks(self, blk: Block) -> None: self._blk_repeated += 1 is_duplicated = True if self._blk_repeated > self.max_repeated_blocks: - self.log.debug('too many repeated block received', total_repeated=self._blk_repeated) + self.log.info('too many repeated block received', total_repeated=self._blk_repeated) self.fails(TooManyRepeatedVerticesError()) self._last_received_block = blk return diff --git a/hathor/p2p/sync_v2/streamers.py b/hathor/p2p/sync_v2/streamers.py index 1e9bc8b8a..52d767b2d 100644 --- a/hathor/p2p/sync_v2/streamers.py +++ b/hathor/p2p/sync_v2/streamers.py @@ -234,7 +234,7 @@ def get_iter(self) -> Iterator[BaseTransaction]: else: root = self.start_from skip_root = False - self.log.debug('sending transactions from block', + self.log.debug('iterating over transactions from block', block=not_none(self.current_block.hash).hex(), height=self.current_block.get_height(), start_from=self.start_from, diff --git a/hathor/p2p/sync_v2/transaction_streaming_client.py b/hathor/p2p/sync_v2/transaction_streaming_client.py index bac0f94af..7a3837405 100644 --- a/hathor/p2p/sync_v2/transaction_streaming_client.py +++ b/hathor/p2p/sync_v2/transaction_streaming_client.py @@ -103,7 +103,7 @@ def handle_transaction(self, tx: BaseTransaction) -> None: if tx.hash not in self._waiting_for: if tx.hash in self._db: # This case might happen during a resume, so we just log and keep syncing. - self.log.info('duplicated vertex received', tx_id=tx.hash.hex()) + self.log.debug('duplicated vertex received', tx_id=tx.hash.hex()) else: # TODO Uncomment the following code to fail on receiving unexpected vertices. # self.fails(UnexpectedVertex(tx.hash.hex())) @@ -140,7 +140,7 @@ def handle_transactions_end(self, response_code: StreamEnd) -> None: """This method is called by the sync agent when a TRANSACTIONS-END message is received.""" if self._deferred.called: return - self.log.info('transactions streaming ended', waiting_for=len(self._waiting_for)) + self.log.info('transactions streaming ended', reason=response_code, waiting_for=len(self._waiting_for)) self._deferred.callback(response_code) def _execute_and_prepare_next(self) -> None: