diff --git a/hathor/p2p/sync_v2/agent.py b/hathor/p2p/sync_v2/agent.py index 6cfbf5c6b..0a4f362e7 100644 --- a/hathor/p2p/sync_v2/agent.py +++ b/hathor/p2p/sync_v2/agent.py @@ -117,6 +117,10 @@ def __init__(self, protocol: 'HathorProtocol', reactor: Optional[Reactor] = None self._deferred_best_block: Optional[Deferred[_HeightInfo]] = None self._deferred_peer_block_hashes: Optional[Deferred[list[_HeightInfo]]] = None + # Deferreds used when we are receiving a streaming of vertices. + self._deferred_blockchain_streaming: Optional[Deferred[None]] = None + self._deferred_transactions_streaming: Optional[Deferred[None]] = None + # When syncing blocks we start streaming with all peers # so the moment I get some repeated blocks, I stop the download # because it's probably a streaming that I've just received @@ -289,16 +293,8 @@ def run_sync(self) -> Generator[Any, Any, None]: def _run_sync(self) -> Generator[Any, Any, None]: """ Actual implementation of the sync step logic in run_sync. """ - if self.receiving_stream: - # If we're receiving a stream, wait for it to finish before running sync. - # If we're sending a stream, do the sync to update the peer's synced block - self.log.debug('receiving stream, try again later') - return - - if self.mempool_manager.is_running(): - # It's running a mempool sync, so we wait until it finishes - self.log.debug('running mempool sync, try again later') - return + assert not self.receiving_stream + assert not self.mempool_manager.is_running() assert self.protocol.connections is not None assert self.tx_storage.indexes is not None @@ -308,16 +304,17 @@ def _run_sync(self) -> Generator[Any, Any, None]: self.log.debug('needed tx exist, sync transactions') self.update_synced(False) # TODO: find out whether we can sync transactions from this peer to speed things up - self.run_sync_transactions() + yield self.run_sync_transactions() return is_block_synced = yield self.run_sync_blocks() if is_block_synced: # our blocks are synced, so sync the mempool self.state = PeerState.SYNCING_MEMPOOL - self.mempool_manager.run() + yield self.mempool_manager.run() - def run_sync_transactions(self) -> None: + @inlineCallbacks + def run_sync_transactions(self) -> Generator[Any, Any, None]: """ Run a step of the transaction syncing phase. """ self.state = PeerState.SYNCING_TRANSACTIONS @@ -344,7 +341,7 @@ def run_sync_transactions(self) -> None: self.log.info('run sync transactions', start=[i.hex() for i in needed_txs], end_block_hash=block.hash.hex(), end_block_height=block_height) - self.send_get_transactions_bfs(needed_txs, block.hash) + yield self.start_transactions_streaming(needed_txs, block.hash) def get_my_best_block(self) -> _HeightInfo: """Return my best block info.""" @@ -410,10 +407,11 @@ def run_sync_blocks(self) -> Generator[Any, Any, bool]: synced_block=self.synced_block) # Sync from common block - self.run_block_sync(self.synced_block.id, - self.synced_block.height, - self.peer_best_block.id, - self.peer_best_block.height) + yield self.start_blockchain_streaming(self.synced_block.id, + self.synced_block.height, + self.peer_best_block.id, + self.peer_best_block.height) + return False def get_tips(self) -> Deferred[list[bytes]]: @@ -513,16 +511,20 @@ def _setup_block_streaming(self, start_hash: bytes, start_height: int, end_hash: self._blk_stream_reverse = reverse self._last_received_block = None - def run_block_sync(self, start_hash: bytes, start_height: int, end_hash: bytes, end_height: int) -> None: - """ Called when the bestblock is after all checkpoints. - - It must syncs to the left until it reaches the remote's best block or the max stream limit. - """ + def start_blockchain_streaming(self, + start_hash: bytes, + start_height: int, + end_hash: bytes, + end_height: int) -> Deferred[None]: + """Request peer to start streaming blocks to us.""" + assert self._deferred_blockchain_streaming is None self._setup_block_streaming(start_hash, start_height, end_hash, end_height, False) quantity = end_height - start_height self.log.info('get next blocks', start_height=start_height, end_height=end_height, quantity=quantity, start_hash=start_hash.hex(), end_hash=end_hash.hex()) self.send_get_next_blocks(start_hash, end_hash, quantity) + self._deferred_blockchain_streaming = Deferred() + return self._deferred_blockchain_streaming def send_message(self, cmd: ProtocolMessages, payload: Optional[str] = None) -> None: """ Helper to send a message. @@ -728,6 +730,10 @@ def handle_blocks_end(self, payload: str) -> None: self.protocol.send_error_and_close_connection('Not expecting to receive BLOCKS-END message') return + assert self._deferred_blockchain_streaming is not None + self._deferred_blockchain_streaming.callback(None) + self._deferred_blockchain_streaming = None + self.log.debug('block streaming ended', reason=str(response_code)) def handle_blocks(self, payload: str) -> None: @@ -879,6 +885,13 @@ def _setup_tx_streaming(self): self._tx_max_quantity = DEFAULT_STREAMING_LIMIT # XXX: maybe this is redundant # XXX: what else can we add for checking if everything is going well? + def start_transactions_streaming(self, start_from: list[bytes], until_first_block: bytes) -> Deferred[None]: + """Request peer to start streaming transactions to us.""" + assert self._deferred_transactions_streaming is None + self.send_get_transactions_bfs(start_from, until_first_block) + self._deferred_transactions_streaming = Deferred() + return self._deferred_transactions_streaming + def send_get_transactions_bfs(self, start_from: list[bytes], until_first_block: bytes) -> None: """ Send a GET-TRANSACTIONS-BFS message. @@ -971,6 +984,10 @@ def handle_transactions_end(self, payload: str) -> None: self.protocol.send_error_and_close_connection('Not expecting to receive TRANSACTIONS-END message') return + assert self._deferred_transactions_streaming is not None + self._deferred_transactions_streaming.callback(None) + self._deferred_transactions_streaming = None + self.log.debug('transaction streaming ended', reason=str(response_code)) def handle_transaction(self, payload: str) -> None: diff --git a/hathor/p2p/sync_v2/mempool.py b/hathor/p2p/sync_v2/mempool.py index 97020eff1..552068fb1 100644 --- a/hathor/p2p/sync_v2/mempool.py +++ b/hathor/p2p/sync_v2/mempool.py @@ -39,6 +39,8 @@ def __init__(self, sync_agent: 'NodeBlockSync'): self.tx_storage = self.manager.tx_storage self.reactor = self.sync_agent.reactor + self._deferred: Optional[Deferred[None]] = None + # Set of tips we know but couldn't add to the DAG yet. self.missing_tips: set[bytes] = set() @@ -52,14 +54,19 @@ def is_running(self) -> bool: """Whether the sync-mempool is currently running.""" return self._is_running - def run(self) -> None: + def run(self) -> Deferred[None]: """Starts _run in, won't start again if already running.""" if self.is_running(): self.log.warn('already started') - return + assert self._deferred is not None + return self._deferred self._is_running = True self.reactor.callLater(0, self._run) + assert self._deferred is None + self._deferred = Deferred() + return self._deferred + @inlineCallbacks def _run(self) -> Generator[Deferred, Any, None]: try: @@ -67,6 +74,9 @@ def _run(self) -> Generator[Deferred, Any, None]: finally: # sync_agent.run_sync will start it again when needed self._is_running = False + assert self._deferred is not None + self._deferred.callback(None) + self._deferred = None @inlineCallbacks def _unsafe_run(self) -> Generator[Deferred, Any, None]: