Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion hathor/_openapi/openapi_base.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
],
"info": {
"title": "Hathor API",
"version": "0.68.2"
"version": "0.68.3"
},
"consumes": [
"application/json"
Expand Down
1 change: 1 addition & 0 deletions hathor/indexes/mempool_tips_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ def iter_all(self, tx_storage: 'TransactionStorage') -> Iterator[Transaction]:
bfs = BFSTimestampWalk(tx_storage, is_dag_verifications=True, is_dag_funds=True, is_left_to_right=False)
for tx in bfs.run(self.iter(tx_storage), skip_root=False):
if not isinstance(tx, Transaction):
bfs.skip_neighbors(tx)
continue
if tx.get_metadata().first_block is not None:
bfs.skip_neighbors(tx)
Expand Down
46 changes: 25 additions & 21 deletions hathor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ def generate_parent_txs(self, timestamp: Optional[float]) -> 'ParentTxs':
best_block = self.tx_storage.get_best_block()
assert timestamp >= best_block.timestamp

def get_tx_parents(tx: BaseTransaction) -> list[Transaction]:
def get_tx_parents(tx: BaseTransaction, *, with_inputs: bool = False) -> list[Transaction]:
if tx.is_genesis:
genesis_txs = [self._settings.GENESIS_TX1_HASH, self._settings.GENESIS_TX2_HASH]
if tx.is_transaction:
Expand All @@ -590,34 +590,38 @@ def get_tx_parents(tx: BaseTransaction) -> list[Transaction]:

parents = tx.get_tx_parents()
assert len(parents) == 2
return list(parents)

unconfirmed_tips = [tx for tx in self.tx_storage.iter_mempool_tips() if tx.timestamp < timestamp]
unconfirmed_extras = sorted(
(tx for tx in self.tx_storage.iter_mempool() if tx.timestamp < timestamp and tx not in unconfirmed_tips),
key=lambda tx: tx.timestamp,
)
txs = list(parents)
if with_inputs:
input_tx_ids = set(i.tx_id for i in tx.inputs)
inputs = (self.tx_storage.get_transaction(tx_id) for tx_id in input_tx_ids)
input_txs = (tx for tx in inputs if isinstance(tx, Transaction))
txs.extend(input_txs)

# mix the blocks tx-parents, with their own tx-parents to avoid carrying one of the genesis tx over
best_block_tx_parents = get_tx_parents(best_block)
tx1_tx_grandparents = get_tx_parents(best_block_tx_parents[0])
tx2_tx_grandparents = get_tx_parents(best_block_tx_parents[1])
confirmed_tips = sorted(
set(best_block_tx_parents) | set(tx1_tx_grandparents) | set(tx2_tx_grandparents),
key=lambda tx: tx.timestamp,
)
return txs

unconfirmed_tips = [tx for tx in self.tx_storage.iter_mempool_tips() if tx.timestamp < timestamp]
match unconfirmed_tips:
case []:
# mix the blocks tx-parents, with their own tx-parents to avoid carrying one of the genesis tx over
best_block_tx_parents = get_tx_parents(best_block)
tx1_tx_grandparents = get_tx_parents(best_block_tx_parents[0], with_inputs=True)
tx2_tx_grandparents = get_tx_parents(best_block_tx_parents[1], with_inputs=True)
confirmed_tips = sorted(
set(best_block_tx_parents) | set(tx1_tx_grandparents) | set(tx2_tx_grandparents),
key=lambda tx: tx.timestamp,
)
self.log.debug('generate_parent_txs: empty mempool, repeat parents')
return ParentTxs.from_txs(can_include=confirmed_tips[-2:], must_include=())
case [tip_tx]:
if unconfirmed_extras:
self.log.debug('generate_parent_txs: one tx tip and at least one other mempool tx')
return ParentTxs.from_txs(can_include=unconfirmed_extras[-1:], must_include=(tip_tx,))
else:
self.log.debug('generate_parent_txs: one tx in mempool, fill with one repeated parent')
return ParentTxs.from_txs(can_include=confirmed_tips[-1:], must_include=(tip_tx,))
best_block_tx_parents = get_tx_parents(best_block)
repeated_parents = get_tx_parents(tip_tx, with_inputs=True)
confirmed_tips = sorted(
set(best_block_tx_parents) | set(repeated_parents),
key=lambda tx: tx.timestamp,
)
self.log.debug('generate_parent_txs: one tx in mempool, fill with one repeated parent')
return ParentTxs.from_txs(can_include=confirmed_tips[-1:], must_include=(tip_tx,))
case _:
self.log.debug('generate_parent_txs: multiple unconfirmed mempool tips')
return ParentTxs.from_txs(can_include=unconfirmed_tips, must_include=())
Expand Down
20 changes: 11 additions & 9 deletions hathor/p2p/sync_v2/blockchain_streaming_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,17 @@ def handle_blocks(self, blk: Block) -> None:

# Check for repeated blocks.
is_duplicated = False
if self.tx_storage.partial_vertex_exists(blk.hash):
# We reached a block we already have. Skip it.
self._blk_repeated += 1
is_duplicated = True
if self._blk_repeated > self.max_repeated_blocks:
self.log.info('too many repeated block received', total_repeated=self._blk_repeated)
self.fails(TooManyRepeatedVerticesError())
self._last_received_block = blk
return
if (blk_meta := self.tx_storage.get_metadata(blk.hash)) is not None:
# XXX: check whether the block is part of the best chain
if not blk_meta.voided_by:
# We reached a block we already have. Skip it.
self._blk_repeated += 1
if self._blk_repeated > self.max_repeated_blocks:
self.log.info('too many repeated block received', total_repeated=self._blk_repeated)
self.fails(TooManyRepeatedVerticesError())
is_duplicated = True
self._last_received_block = blk
return

# basic linearity validation, crucial for correctly predicting the next block's height
if self._reverse:
Expand Down
2 changes: 1 addition & 1 deletion hathor/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from structlog import get_logger

BASE_VERSION = '0.68.2'
BASE_VERSION = '0.68.3'

DEFAULT_VERSION_SUFFIX = "local"
BUILD_VERSION_FILE_PATH = "./BUILD_VERSION"
Expand Down
67 changes: 67 additions & 0 deletions hathor_tests/p2p/test_sync_v2.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import base64
import re
from typing import cast
from unittest.mock import patch

from twisted.internet.defer import Deferred, succeed
Expand All @@ -9,6 +10,8 @@
from hathor.p2p.peer import PrivatePeer
from hathor.p2p.states import ReadyState
from hathor.p2p.sync_v2.agent import NodeBlockSync, _HeightInfo
from hathor.p2p.sync_v2.blockchain_streaming_client import BlockchainStreamingClient
from hathor.p2p.sync_v2.exception import StreamingError
from hathor.simulator import FakeConnection
from hathor.simulator.trigger import (
StopAfterNMinedBlocks,
Expand All @@ -17,6 +20,7 @@
StopWhenTrue,
Trigger,
)
from hathor.transaction import Block
from hathor.transaction.storage import TransactionRocksDBStorage
from hathor.transaction.storage.transaction_storage import TransactionStorage
from hathor.transaction.storage.traversal import DFSWalk
Expand Down Expand Up @@ -284,6 +288,69 @@ def test_receiving_tips_limit(self) -> None:
# and also the second node should have aborted the connection
self.assertTrue(conn12.proto2.aborting)

def test_sync_v2_reorg_stuck_on_repeated_blocks(self) -> None:
    """Regression test: a restarted block stream that re-sends already-known but
    voided blocks must not trip the repeated-block guard, so the peer can still
    download the remaining blocks (win11/win12) and complete the reorg.
    """
    manager = self.create_peer()

    # Build two competing chains forking at b5: 'lose' (11 blocks) and 'win' (12 blocks).
    dag_builder = TestDAGBuilder.from_manager(manager)
    artifacts = dag_builder.build_from_str("""
        blockchain genesis b[0..5]
        blockchain b5 lose[1..11]
        blockchain b5 win[1..12]
    """)

    # Load the losing chain (plus the common prefix b0..b5) so it is the current best chain.
    for node, vertex in artifacts.list:
        if node.name.startswith('lose') or node.name.startswith('b'):
            cloned = vertex.clone(include_metadata=True, include_storage=False)
            assert manager.vertex_handler.on_new_relayed_vertex(cloned)

    # Simulate a previous partial sync by adding 10 winning blocks, but not the one that would reorg.
    for i in range(1, 11):
        win_blk = artifacts.get_typed_vertex(f'win{i}', Block)
        cloned = win_blk.clone(include_metadata=False, include_storage=False)
        assert manager.vertex_handler.on_new_relayed_vertex(cloned)
        # Shorter than the losing chain so far, hence still voided by itself.
        assert cloned.get_metadata().voided_by == {cloned.hash}

    win11 = artifacts.get_typed_vertex('win11', Block)
    win12 = artifacts.get_typed_vertex('win12', Block)
    start_block = artifacts.get_typed_vertex('b5', Block)

    # The two blocks that would trigger the reorg are not downloaded yet.
    self.assertFalse(manager.tx_storage.transaction_exists(win11.hash))
    self.assertFalse(manager.tx_storage.transaction_exists(win12.hash))

    start_info = _HeightInfo(height=start_block.get_height(), id=start_block.hash)
    end_info = _HeightInfo(height=win12.get_height(), id=win12.hash)

    # Minimal stand-ins for the NodeBlockSync agent: the streaming client only
    # touches .protocol (for logging), .tx_storage and .vertex_handler here.
    class DummyProtocol:
        def get_short_peer_id(self) -> str:
            return 'dummy'

    class DummySync:
        def __init__(self) -> None:
            self.protocol = DummyProtocol()
            self.tx_storage = manager.tx_storage
            self.vertex_handler = manager.vertex_handler

    client = BlockchainStreamingClient(cast(NodeBlockSync, DummySync()), start_info, end_info)

    # Collect any streaming failure (e.g. TooManyRepeatedVerticesError) via the errback.
    errors: list[StreamingError] = []
    client.wait().addErrback(lambda failure: errors.append(failure.value))

    # Restarted stream re-sends the start block and the 10 already-downloaded winning blocks before the new ones.
    stream: list[Block] = [start_block] + [
        artifacts.get_typed_vertex(f'win{i}', Block) for i in range(1, 13)
    ]
    for blk in stream:
        client.handle_blocks(blk)
        if errors:
            # Stop early: once the client failed, further blocks are irrelevant.
            break

    self.assertFalse(errors, 'should stream without hitting repeated-block guard')
    # The reorg completed: both new blocks were accepted and win12 is the new best block.
    self.assertTrue(manager.tx_storage.transaction_exists(win11.hash))
    self.assertTrue(manager.tx_storage.transaction_exists(win12.hash))
    best_block = manager.tx_storage.get_best_block()
    self.assertEqual(best_block.hash, win12.hash)

def _prepare_sync_v2_find_best_common_block_reorg(self) -> FakeConnection:
manager1 = self.create_peer()
manager1.allow_mining_without_peers()
Expand Down
61 changes: 61 additions & 0 deletions hathor_tests/tx/test_mempool_iter_all.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Copyright 2025 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from unittest.mock import patch

from hathor.simulator.utils import add_new_blocks, gen_new_tx
from hathor_tests import unittest


class MempoolIterAllTraversalTestCase(unittest.TestCase):
    """Regression helpers for ByteCollectionMempoolTipsIndex.iter_all."""

    def setUp(self) -> None:
        super().setUp()
        self.manager = self.create_peer('testnet', unlock_wallet=True)

    def test_iter_mempool_walks_block_chain_via_inputs(self) -> None:
        # Produce enough blocks so that at least one reward has matured and is spendable.
        add_new_blocks(self.manager, self._settings.REWARD_SPEND_MIN_BLOCKS + 2, advance_clock=1)
        self.run_to_completion()

        wallet_address = self.get_address(0)
        assert wallet_address is not None
        spend_tx = gen_new_tx(self.manager, wallet_address, value=10)
        self.manager.propagate_tx(spend_tx)
        self.run_to_completion()

        storage = self.manager.tx_storage

        # Spy on vertex lookups performed while iter_mempool walks dependencies.
        with patch.object(storage, 'get_vertex', wraps=storage.get_vertex) as get_vertex_spy:
            mempool_hashes = {t.hash for t in storage.iter_mempool()}

        self.assertEqual({spend_tx.hash}, mempool_hashes)

        # Blocks whose rewards the mempool tx actually spends.
        expected_blocks = set()
        for txin in spend_tx.inputs:
            if storage.get_transaction(txin.tx_id).is_block:
                expected_blocks.add(txin.tx_id)

        # Blocks the traversal actually looked up.
        visited_blocks = set()
        for spy_call in get_vertex_spy.call_args_list:
            vertex_id = spy_call.args[0]
            if storage.get_transaction(vertex_id).is_block:
                visited_blocks.add(vertex_id)

        # iter_mempool should only touch the blocks whose outputs are being spent in the mempool.
        self.assertTrue(expected_blocks, 'at least one block reward should be spent')
        self.assertEqual(expected_blocks, visited_blocks)
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

[tool.poetry]
name = "hathor"
version = "0.68.2"
version = "0.68.3"
description = "Hathor Network full-node"
authors = ["Hathor Team <contact@hathor.network>"]
license = "Apache-2.0"
Expand Down
Loading