Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions hathor/p2p/sync_v2/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ def run_block_sync(self, start_hash: bytes, start_height: int, end_hash: bytes,
quantity = end_height - start_height
self.log.info('get next blocks', start_height=start_height, end_height=end_height, quantity=quantity,
start_hash=start_hash.hex(), end_hash=end_hash.hex())
self.send_get_next_blocks(start_hash, end_hash)
self.send_get_next_blocks(start_hash, end_hash, quantity)

def send_message(self, cmd: ProtocolMessages, payload: Optional[str] = None) -> None:
""" Helper to send a message.
Expand Down Expand Up @@ -603,30 +603,32 @@ def handle_peer_block_hashes(self, payload: str) -> None:
if deferred:
deferred.callback(data)

def send_get_next_blocks(self, start_hash: bytes, end_hash: bytes) -> None:
def send_get_next_blocks(self, start_hash: bytes, end_hash: bytes, quantity: int) -> None:
""" Send a PEER-BLOCK-HASHES message.
"""
payload = json.dumps(dict(
start_hash=start_hash.hex(),
end_hash=end_hash.hex(),
quantity=quantity,
))
self.send_message(ProtocolMessages.GET_NEXT_BLOCKS, payload)
self.receiving_stream = True

def handle_get_next_blocks(self, payload: str) -> None:
""" Handle a GET-NEXT-BLOCKS message.
"""
self.log.debug('handle GET-NEXT-BLOCKS')
self.log.debug('handle GET-NEXT-BLOCKS', payload=payload)
if self._is_streaming:
self.protocol.send_error_and_close_connection('GET-NEXT-BLOCKS received before previous one finished')
return
data = json.loads(payload)
self.send_next_blocks(
start_hash=bytes.fromhex(data['start_hash']),
end_hash=bytes.fromhex(data['end_hash']),
quantity=data['quantity'],
)

def send_next_blocks(self, start_hash: bytes, end_hash: bytes) -> None:
def send_next_blocks(self, start_hash: bytes, end_hash: bytes, quantity: int) -> None:
""" Send a NEXT-BLOCKS message.
"""
self.log.debug('start NEXT-BLOCKS stream')
Expand All @@ -651,7 +653,8 @@ def send_next_blocks(self, start_hash: bytes, end_hash: bytes) -> None:
# return
if self.blockchain_streaming is not None and self.blockchain_streaming.is_running:
self.blockchain_streaming.stop()
self.blockchain_streaming = BlockchainStreaming(self, blk, end_hash, limit=self.DEFAULT_STREAMING_LIMIT)
limit = min(quantity, self.DEFAULT_STREAMING_LIMIT)
self.blockchain_streaming = BlockchainStreaming(self, blk, end_hash, limit=limit)
self.blockchain_streaming.start()

def send_blocks(self, blk: Block) -> None:
Expand Down Expand Up @@ -712,7 +715,10 @@ def handle_blocks(self, payload: str) -> None:

self._blk_received += 1
if self._blk_received > self._blk_max_quantity + 1:
self.log.warn('too many blocks received', last_block=blk.hash_hex)
self.log.warn('too many blocks received',
blk_received=self._blk_received,
blk_max_quantity=self._blk_max_quantity,
last_block=blk.hash_hex)
# Too many blocks. Punish peer?
self.state = PeerState.ERROR
return
Expand Down