Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions hathor/p2p/sync_v2/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,13 @@ def start_blockchain_streaming(self,
self.send_get_next_blocks(start_block.id, end_block.id, quantity)
return self._blk_streaming_client.wait()

def stop_blk_streaming_server(self, response_code: StreamEnd) -> None:
"""Stop blockchain streaming server."""
assert self._blk_streaming_server is not None
self._blk_streaming_server.stop()
self._blk_streaming_server = None
self.send_blocks_end(response_code)

def send_message(self, cmd: ProtocolMessages, payload: Optional[str] = None) -> None:
""" Helper to send a message.
"""
Expand Down Expand Up @@ -678,7 +685,7 @@ def send_next_blocks(self, start_block: Block, end_hash: bytes, quantity: int) -
"""
self.log.debug('start NEXT-BLOCKS stream')
if self._blk_streaming_server is not None and self._blk_streaming_server.is_running:
self._blk_streaming_server.stop()
self.stop_blk_streaming_server(StreamEnd.PER_REQUEST)
limit = min(quantity, self.DEFAULT_STREAMING_LIMIT)
self._blk_streaming_server = BlockchainStreamingServer(self, start_block, end_hash, limit=limit)
self._blk_streaming_server.start()
Expand Down Expand Up @@ -760,8 +767,7 @@ def handle_stop_block_streaming(self, payload: str) -> None:
return

self.log.debug('got stop streaming message')
self._blk_streaming_server.stop()
self._blk_streaming_server = None
self.stop_blk_streaming_server(StreamEnd.PER_REQUEST)

def send_stop_transactions_streaming(self) -> None:
""" Send a STOP-TRANSACTIONS-STREAMING message.
Expand All @@ -780,8 +786,7 @@ def handle_stop_transactions_streaming(self, payload: str) -> None:
return

self.log.debug('got stop streaming message')
self._tx_streaming_server.stop()
self._tx_streaming_server = None
self.stop_tx_streaming_server(StreamEnd.PER_REQUEST)

def get_peer_best_block(self) -> Deferred[_HeightInfo]:
""" Async call to get the remote peer's best block.
Expand Down Expand Up @@ -853,6 +858,13 @@ def resume_transactions_streaming(self) -> Deferred[StreamEnd]:
self.send_get_transactions_bfs(start_from, first_block_hash, last_block_hash)
return self._tx_streaming_client.resume()

def stop_tx_streaming_server(self, response_code: StreamEnd) -> None:
"""Stop transaction streaming server."""
assert self._tx_streaming_server is not None
self._tx_streaming_server.stop()
self._tx_streaming_server = None
self.send_transactions_end(response_code)

def send_get_transactions_bfs(self,
start_from: list[bytes],
first_block_hash: bytes,
Expand Down Expand Up @@ -921,7 +933,7 @@ def handle_get_transactions_bfs(self, payload: str) -> None:
vertex_id=tx.hash.hex(),
first_block=first_block.hash.hex(),
vertex_first_block=meta.first_block)
self.send_blocks_end(StreamEnd.INVALID_PARAMS)
self.send_transactions_end(StreamEnd.INVALID_PARAMS)
return
start_from_txs.append(tx)

Expand All @@ -934,7 +946,7 @@ def send_transactions_bfs(self,
""" Start a transactions BFS stream.
"""
if self._tx_streaming_server is not None and self._tx_streaming_server.is_running:
self._tx_streaming_server.stop()
self.stop_tx_streaming_server(StreamEnd.PER_REQUEST)
self._tx_streaming_server = TransactionsStreamingServer(self,
start_from,
first_block,
Expand Down
41 changes: 23 additions & 18 deletions hathor/p2p/sync_v2/streamers.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class StreamEnd(IntFlag):
TX_NOT_CONFIRMED = 4
INVALID_PARAMS = 5
INTERNAL_ERROR = 6
PER_REQUEST = 7

def __str__(self):
if self is StreamEnd.END_HASH_REACHED:
Expand All @@ -57,6 +58,8 @@ def __str__(self):
return 'streamed with invalid parameters'
elif self is StreamEnd.INTERNAL_ERROR:
return 'internal error'
elif self is StreamEnd.PER_REQUEST:
return 'stopped per request'
else:
raise ValueError(f'invalid StreamEnd value: {self.value}')

Expand Down Expand Up @@ -99,12 +102,15 @@ def safe_send_next(self) -> None:
try:
self.send_next()
except Exception:
self.stop()
self.sync_agent.send_blocks_end(StreamEnd.INTERNAL_ERROR)
self._stop_streaming_server(StreamEnd.INTERNAL_ERROR)
raise
else:
self.schedule_if_needed()

def _stop_streaming_server(self, response_code: StreamEnd) -> None:
"""Stop streaming server."""
raise NotImplementedError

def start(self) -> None:
"""Start pushing."""
self.log.debug('start streaming')
Expand Down Expand Up @@ -153,6 +159,9 @@ def __init__(self, sync_agent: 'NodeBlockSync', start_block: Block, end_hash: by
self.end_hash = end_hash
self.reverse = reverse

def _stop_streaming_server(self, response_code: StreamEnd) -> None:
self.sync_agent.stop_blk_streaming_server(response_code)

def send_next(self) -> None:
"""Push next block to peer."""
assert self.is_running
Expand All @@ -165,26 +174,23 @@ def send_next(self) -> None:

meta = cur.get_metadata()
if meta.voided_by:
self.stop()
self.sync_agent.send_blocks_end(StreamEnd.STREAM_BECAME_VOIDED)
self.sync_agent.stop_blk_streaming_server(StreamEnd.STREAM_BECAME_VOIDED)
return

if cur.hash == self.end_hash:
# only send the last when not reverse
if not self.reverse:
self.log.debug('send next block', blk_id=cur.hash.hex())
self.sync_agent.send_blocks(cur)
self.stop()
self.sync_agent.send_blocks_end(StreamEnd.END_HASH_REACHED)
self.sync_agent.stop_blk_streaming_server(StreamEnd.END_HASH_REACHED)
return

if self.counter >= self.limit:
# only send the last when not reverse
if not self.reverse:
self.log.debug('send next block', blk_id=cur.hash.hex())
self.sync_agent.send_blocks(cur)
self.stop()
self.sync_agent.send_blocks_end(StreamEnd.LIMIT_EXCEEDED)
self.sync_agent.stop_blk_streaming_server(StreamEnd.LIMIT_EXCEEDED)
return

self.counter += 1
Expand All @@ -199,8 +205,7 @@ def send_next(self) -> None:

# XXX: don't send the genesis or the current block
if self.current_block is None or self.current_block.is_genesis:
self.stop()
self.sync_agent.send_blocks_end(StreamEnd.NO_MORE_BLOCKS)
self.sync_agent.stop_blk_streaming_server(StreamEnd.NO_MORE_BLOCKS)
return


Expand Down Expand Up @@ -235,6 +240,9 @@ def __init__(self,
self.bfs = BFSOrderWalk(self.tx_storage, is_dag_verifications=True, is_dag_funds=True, is_left_to_right=False)
self.iter = self.get_iter()

def _stop_streaming_server(self, response_code: StreamEnd) -> None:
self.sync_agent.stop_tx_streaming_server(response_code)

def get_iter(self) -> Iterator[BaseTransaction]:
"""Return an iterator that yields all transactions confirmed by each block in sequence."""
root: Union[BaseTransaction, Iterable[BaseTransaction]]
Expand All @@ -258,8 +266,7 @@ def get_iter(self) -> Iterator[BaseTransaction]:

# Check if this block is still in the best blockchain.
if self.current_block.get_metadata().voided_by:
self.stop()
self.sync_agent.send_blocks_end(StreamEnd.STREAM_BECAME_VOIDED)
self.sync_agent.stop_tx_streaming_server(StreamEnd.STREAM_BECAME_VOIDED)
return

self.current_block = self.current_block.get_next_block_best_chain()
Expand All @@ -275,8 +282,7 @@ def send_next(self) -> None:
except StopIteration:
# nothing more to send
self.log.debug('no more transactions, stopping streaming')
self.stop()
self.sync_agent.send_transactions_end(StreamEnd.END_HASH_REACHED)
self.sync_agent.stop_tx_streaming_server(StreamEnd.END_HASH_REACHED)
return

# Skip blocks.
Expand All @@ -290,8 +296,7 @@ def send_next(self) -> None:
cur_metadata = cur.get_metadata()
if cur_metadata.first_block is None:
self.log.debug('reached a tx that is not confirmed, stopping streaming')
self.stop()
self.sync_agent.send_transactions_end(StreamEnd.TX_NOT_CONFIRMED)
self.sync_agent.stop_tx_streaming_server(StreamEnd.TX_NOT_CONFIRMED)
return

# Check if tx is confirmed by the `self.current_block` or any next block.
Expand All @@ -308,6 +313,6 @@ def send_next(self) -> None:

self.counter += 1
if self.counter >= self.limit:
self.stop()
self.sync_agent.send_transactions_end(StreamEnd.LIMIT_EXCEEDED)
self.log.debug('limit exceeded, stopping streaming')
self.sync_agent.stop_tx_streaming_server(StreamEnd.LIMIT_EXCEEDED)
return