Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 40 additions & 23 deletions hathor/p2p/sync_v2/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ def __init__(self, protocol: 'HathorProtocol', reactor: Optional[Reactor] = None
self._deferred_best_block: Optional[Deferred[_HeightInfo]] = None
self._deferred_peer_block_hashes: Optional[Deferred[list[_HeightInfo]]] = None

# Deferreds used when we are receiving a streaming of vertices.
self._deferred_blockchain_streaming: Optional[Deferred[None]] = None
self._deferred_transactions_streaming: Optional[Deferred[None]] = None

# When syncing blocks we start streaming with all peers
# so the moment I get some repeated blocks, I stop the download
# because it's probably a streaming that I've just received
Expand Down Expand Up @@ -289,16 +293,8 @@ def run_sync(self) -> Generator[Any, Any, None]:
def _run_sync(self) -> Generator[Any, Any, None]:
""" Actual implementation of the sync step logic in run_sync.
"""
if self.receiving_stream:
# If we're receiving a stream, wait for it to finish before running sync.
# If we're sending a stream, do the sync to update the peer's synced block
self.log.debug('receiving stream, try again later')
return

if self.mempool_manager.is_running():
# It's running a mempool sync, so we wait until it finishes
self.log.debug('running mempool sync, try again later')
return
assert not self.receiving_stream
assert not self.mempool_manager.is_running()

assert self.protocol.connections is not None
assert self.tx_storage.indexes is not None
Expand All @@ -308,16 +304,17 @@ def _run_sync(self) -> Generator[Any, Any, None]:
self.log.debug('needed tx exist, sync transactions')
self.update_synced(False)
# TODO: find out whether we can sync transactions from this peer to speed things up
self.run_sync_transactions()
yield self.run_sync_transactions()
return

is_block_synced = yield self.run_sync_blocks()
if is_block_synced:
# our blocks are synced, so sync the mempool
self.state = PeerState.SYNCING_MEMPOOL
self.mempool_manager.run()
yield self.mempool_manager.run()

def run_sync_transactions(self) -> None:
@inlineCallbacks
def run_sync_transactions(self) -> Generator[Any, Any, None]:
""" Run a step of the transaction syncing phase.
"""
self.state = PeerState.SYNCING_TRANSACTIONS
Expand All @@ -344,7 +341,7 @@ def run_sync_transactions(self) -> None:

self.log.info('run sync transactions', start=[i.hex() for i in needed_txs], end_block_hash=block.hash.hex(),
end_block_height=block_height)
self.send_get_transactions_bfs(needed_txs, block.hash)
yield self.start_transactions_streaming(needed_txs, block.hash)

def get_my_best_block(self) -> _HeightInfo:
"""Return my best block info."""
Expand Down Expand Up @@ -410,10 +407,11 @@ def run_sync_blocks(self) -> Generator[Any, Any, bool]:
synced_block=self.synced_block)

# Sync from common block
self.run_block_sync(self.synced_block.id,
self.synced_block.height,
self.peer_best_block.id,
self.peer_best_block.height)
yield self.start_blockchain_streaming(self.synced_block.id,
self.synced_block.height,
self.peer_best_block.id,
self.peer_best_block.height)

return False

def get_tips(self) -> Deferred[list[bytes]]:
Expand Down Expand Up @@ -513,16 +511,20 @@ def _setup_block_streaming(self, start_hash: bytes, start_height: int, end_hash:
self._blk_stream_reverse = reverse
self._last_received_block = None

def run_block_sync(self, start_hash: bytes, start_height: int, end_hash: bytes, end_height: int) -> None:
""" Called when the bestblock is after all checkpoints.

It must syncs to the left until it reaches the remote's best block or the max stream limit.
"""
def start_blockchain_streaming(self,
start_hash: bytes,
start_height: int,
end_hash: bytes,
end_height: int) -> Deferred[None]:
"""Request peer to start streaming blocks to us."""
assert self._deferred_blockchain_streaming is None
self._setup_block_streaming(start_hash, start_height, end_hash, end_height, False)
quantity = end_height - start_height
self.log.info('get next blocks', start_height=start_height, end_height=end_height, quantity=quantity,
start_hash=start_hash.hex(), end_hash=end_hash.hex())
self.send_get_next_blocks(start_hash, end_hash, quantity)
self._deferred_blockchain_streaming = Deferred()
return self._deferred_blockchain_streaming

def send_message(self, cmd: ProtocolMessages, payload: Optional[str] = None) -> None:
""" Helper to send a message.
Expand Down Expand Up @@ -728,6 +730,10 @@ def handle_blocks_end(self, payload: str) -> None:
self.protocol.send_error_and_close_connection('Not expecting to receive BLOCKS-END message')
return

assert self._deferred_blockchain_streaming is not None
self._deferred_blockchain_streaming.callback(None)
self._deferred_blockchain_streaming = None

self.log.debug('block streaming ended', reason=str(response_code))

def handle_blocks(self, payload: str) -> None:
Expand Down Expand Up @@ -879,6 +885,13 @@ def _setup_tx_streaming(self):
self._tx_max_quantity = DEFAULT_STREAMING_LIMIT # XXX: maybe this is redundant
# XXX: what else can we add for checking if everything is going well?

def start_transactions_streaming(self, start_from: list[bytes], until_first_block: bytes) -> Deferred[None]:
"""Request peer to start streaming transactions to us."""
assert self._deferred_transactions_streaming is None
self.send_get_transactions_bfs(start_from, until_first_block)
self._deferred_transactions_streaming = Deferred()
return self._deferred_transactions_streaming

def send_get_transactions_bfs(self, start_from: list[bytes], until_first_block: bytes) -> None:
""" Send a GET-TRANSACTIONS-BFS message.

Expand Down Expand Up @@ -971,6 +984,10 @@ def handle_transactions_end(self, payload: str) -> None:
self.protocol.send_error_and_close_connection('Not expecting to receive TRANSACTIONS-END message')
return

assert self._deferred_transactions_streaming is not None
self._deferred_transactions_streaming.callback(None)
self._deferred_transactions_streaming = None

self.log.debug('transaction streaming ended', reason=str(response_code))

def handle_transaction(self, payload: str) -> None:
Expand Down
14 changes: 12 additions & 2 deletions hathor/p2p/sync_v2/mempool.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ def __init__(self, sync_agent: 'NodeBlockSync'):
self.tx_storage = self.manager.tx_storage
self.reactor = self.sync_agent.reactor

self._deferred: Optional[Deferred[None]] = None

# Set of tips we know but couldn't add to the DAG yet.
self.missing_tips: set[bytes] = set()

Expand All @@ -52,21 +54,29 @@ def is_running(self) -> bool:
"""Whether the sync-mempool is currently running."""
return self._is_running

def run(self) -> None:
def run(self) -> Deferred[None]:
"""Starts _run in, won't start again if already running."""
if self.is_running():
self.log.warn('already started')
return
assert self._deferred is not None
return self._deferred
self._is_running = True
self.reactor.callLater(0, self._run)

assert self._deferred is None
self._deferred = Deferred()
return self._deferred

@inlineCallbacks
def _run(self) -> Generator[Deferred, Any, None]:
try:
yield self._unsafe_run()
finally:
# sync_agent.run_sync will start it again when needed
self._is_running = False
assert self._deferred is not None
self._deferred.callback(None)
self._deferred = None

@inlineCallbacks
def _unsafe_run(self) -> Generator[Deferred, Any, None]:
Expand Down