diff --git a/hathor/p2p/sync_v2/agent.py b/hathor/p2p/sync_v2/agent.py index eff6eda1d..24ab4516d 100644 --- a/hathor/p2p/sync_v2/agent.py +++ b/hathor/p2p/sync_v2/agent.py @@ -502,6 +502,13 @@ def start_blockchain_streaming(self, self.send_get_next_blocks(start_block.id, end_block.id, quantity) return self._blk_streaming_client.wait() + def stop_blk_streaming_server(self, response_code: StreamEnd) -> None: + """Stop blockchain streaming server.""" + assert self._blk_streaming_server is not None + self._blk_streaming_server.stop() + self._blk_streaming_server = None + self.send_blocks_end(response_code) + def send_message(self, cmd: ProtocolMessages, payload: Optional[str] = None) -> None: """ Helper to send a message. """ @@ -678,7 +685,7 @@ def send_next_blocks(self, start_block: Block, end_hash: bytes, quantity: int) - """ self.log.debug('start NEXT-BLOCKS stream') if self._blk_streaming_server is not None and self._blk_streaming_server.is_running: - self._blk_streaming_server.stop() + self.stop_blk_streaming_server(StreamEnd.PER_REQUEST) limit = min(quantity, self.DEFAULT_STREAMING_LIMIT) self._blk_streaming_server = BlockchainStreamingServer(self, start_block, end_hash, limit=limit) self._blk_streaming_server.start() @@ -760,8 +767,7 @@ def handle_stop_block_streaming(self, payload: str) -> None: return self.log.debug('got stop streaming message') - self._blk_streaming_server.stop() - self._blk_streaming_server = None + self.stop_blk_streaming_server(StreamEnd.PER_REQUEST) def send_stop_transactions_streaming(self) -> None: """ Send a STOP-TRANSACTIONS-STREAMING message. @@ -780,8 +786,7 @@ def handle_stop_transactions_streaming(self, payload: str) -> None: return self.log.debug('got stop streaming message') - self._tx_streaming_server.stop() - self._tx_streaming_server = None + self.stop_tx_streaming_server(StreamEnd.PER_REQUEST) def get_peer_best_block(self) -> Deferred[_HeightInfo]: """ Async call to get the remote peer's best block. @@ -853,6 +858,13 @@ def resume_transactions_streaming(self) -> Deferred[StreamEnd]: self.send_get_transactions_bfs(start_from, first_block_hash, last_block_hash) return self._tx_streaming_client.resume() + def stop_tx_streaming_server(self, response_code: StreamEnd) -> None: + """Stop transaction streaming server.""" + assert self._tx_streaming_server is not None + self._tx_streaming_server.stop() + self._tx_streaming_server = None + self.send_transactions_end(response_code) + def send_get_transactions_bfs(self, start_from: list[bytes], first_block_hash: bytes, @@ -921,7 +933,7 @@ def handle_get_transactions_bfs(self, payload: str) -> None: vertex_id=tx.hash.hex(), first_block=first_block.hash.hex(), vertex_first_block=meta.first_block) - self.send_blocks_end(StreamEnd.INVALID_PARAMS) + self.send_transactions_end(StreamEnd.INVALID_PARAMS) return start_from_txs.append(tx) @@ -934,7 +946,7 @@ def send_transactions_bfs(self, """ Start a transactions BFS stream. """ if self._tx_streaming_server is not None and self._tx_streaming_server.is_running: - self._tx_streaming_server.stop() + self.stop_tx_streaming_server(StreamEnd.PER_REQUEST) self._tx_streaming_server = TransactionsStreamingServer(self, start_from, first_block, diff --git a/hathor/p2p/sync_v2/streamers.py b/hathor/p2p/sync_v2/streamers.py index 1290a1e08..b51f7030c 100644 --- a/hathor/p2p/sync_v2/streamers.py +++ b/hathor/p2p/sync_v2/streamers.py @@ -41,6 +41,7 @@ class StreamEnd(IntFlag): TX_NOT_CONFIRMED = 4 INVALID_PARAMS = 5 INTERNAL_ERROR = 6 + PER_REQUEST = 7 def __str__(self): if self is StreamEnd.END_HASH_REACHED: @@ -57,6 +58,8 @@ def __str__(self): return 'streamed with invalid parameters' elif self is StreamEnd.INTERNAL_ERROR: return 'internal error' + elif self is StreamEnd.PER_REQUEST: + return 'stopped per request' else: raise ValueError(f'invalid StreamEnd value: {self.value}') @@ -99,12 +102,15 @@ def safe_send_next(self) -> None: try: self.send_next() except Exception: - self.stop() - self.sync_agent.send_blocks_end(StreamEnd.INTERNAL_ERROR) + self._stop_streaming_server(StreamEnd.INTERNAL_ERROR) raise else: self.schedule_if_needed() + def _stop_streaming_server(self, response_code: StreamEnd) -> None: + """Stop streaming server.""" + raise NotImplementedError + def start(self) -> None: """Start pushing.""" self.log.debug('start streaming') @@ -153,6 +159,9 @@ def __init__(self, sync_agent: 'NodeBlockSync', start_block: Block, end_hash: by self.end_hash = end_hash self.reverse = reverse + def _stop_streaming_server(self, response_code: StreamEnd) -> None: + self.sync_agent.stop_blk_streaming_server(response_code) + def send_next(self) -> None: """Push next block to peer.""" assert self.is_running @@ -165,8 +174,7 @@ def send_next(self) -> None: meta = cur.get_metadata() if meta.voided_by: - self.stop() - self.sync_agent.send_blocks_end(StreamEnd.STREAM_BECAME_VOIDED) + self.sync_agent.stop_blk_streaming_server(StreamEnd.STREAM_BECAME_VOIDED) return if cur.hash == self.end_hash: @@ -174,8 +182,7 @@ def send_next(self) -> None: if not self.reverse: self.log.debug('send next block', blk_id=cur.hash.hex()) self.sync_agent.send_blocks(cur) - self.stop() - self.sync_agent.send_blocks_end(StreamEnd.END_HASH_REACHED) + self.sync_agent.stop_blk_streaming_server(StreamEnd.END_HASH_REACHED) return if self.counter >= self.limit: @@ -183,8 +190,7 @@ def send_next(self) -> None: if not self.reverse: self.log.debug('send next block', blk_id=cur.hash.hex()) self.sync_agent.send_blocks(cur) - self.stop() - self.sync_agent.send_blocks_end(StreamEnd.LIMIT_EXCEEDED) + self.sync_agent.stop_blk_streaming_server(StreamEnd.LIMIT_EXCEEDED) return self.counter += 1 @@ -199,8 +205,7 @@ def send_next(self) -> None: # XXX: don't send the genesis or the current block if self.current_block is None or self.current_block.is_genesis: - self.stop() - self.sync_agent.send_blocks_end(StreamEnd.NO_MORE_BLOCKS) + self.sync_agent.stop_blk_streaming_server(StreamEnd.NO_MORE_BLOCKS) return @@ -235,6 +240,9 @@ def __init__(self, self.bfs = BFSOrderWalk(self.tx_storage, is_dag_verifications=True, is_dag_funds=True, is_left_to_right=False) self.iter = self.get_iter() + def _stop_streaming_server(self, response_code: StreamEnd) -> None: + self.sync_agent.stop_tx_streaming_server(response_code) + def get_iter(self) -> Iterator[BaseTransaction]: """Return an iterator that yields all transactions confirmed by each block in sequence.""" root: Union[BaseTransaction, Iterable[BaseTransaction]] @@ -258,8 +266,7 @@ def get_iter(self) -> Iterator[BaseTransaction]: # Check if this block is still in the best blockchain. if self.current_block.get_metadata().voided_by: - self.stop() - self.sync_agent.send_blocks_end(StreamEnd.STREAM_BECAME_VOIDED) + self.sync_agent.stop_tx_streaming_server(StreamEnd.STREAM_BECAME_VOIDED) return self.current_block = self.current_block.get_next_block_best_chain() @@ -275,8 +282,7 @@ def send_next(self) -> None: except StopIteration: # nothing more to send self.log.debug('no more transactions, stopping streaming') - self.stop() - self.sync_agent.send_transactions_end(StreamEnd.END_HASH_REACHED) + self.sync_agent.stop_tx_streaming_server(StreamEnd.END_HASH_REACHED) return # Skip blocks. @@ -290,8 +296,7 @@ def send_next(self) -> None: cur_metadata = cur.get_metadata() if cur_metadata.first_block is None: self.log.debug('reached a tx that is not confirmed, stopping streaming') - self.stop() - self.sync_agent.send_transactions_end(StreamEnd.TX_NOT_CONFIRMED) + self.sync_agent.stop_tx_streaming_server(StreamEnd.TX_NOT_CONFIRMED) return # Check if tx is confirmed by the `self.current_block` or any next block. @@ -308,6 +313,6 @@ def send_next(self) -> None: self.counter += 1 if self.counter >= self.limit: - self.stop() - self.sync_agent.send_transactions_end(StreamEnd.LIMIT_EXCEEDED) + self.log.debug('limit exceeded, stopping streaming') + self.sync_agent.stop_tx_streaming_server(StreamEnd.LIMIT_EXCEEDED) return