Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions hathor/p2p/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from hathor.p2p.protocol import HathorProtocol
from hathor.p2p.rate_limiter import RateLimiter
from hathor.p2p.states.ready import ReadyState
from hathor.p2p.sync_factory import SyncManagerFactory
from hathor.p2p.sync_factory import SyncAgentFactory
from hathor.p2p.sync_version import SyncVersion
from hathor.p2p.utils import description_to_connection_string, parse_whitelist
from hathor.pubsub import HathorEvents, PubSubManager
Expand Down Expand Up @@ -83,7 +83,7 @@ class GlobalRateLimiter:
connecting_peers: dict[IStreamClientEndpoint, _ConnectingPeer]
handshaking_peers: set[HathorProtocol]
whitelist_only: bool
_sync_factories: dict[SyncVersion, SyncManagerFactory]
_sync_factories: dict[SyncVersion, SyncAgentFactory]

rate_limiter: RateLimiter

Expand Down Expand Up @@ -258,7 +258,7 @@ def get_sync_versions(self) -> set[SyncVersion]:
# XXX: this is to make it easy to simulate old behavior if we disable the sync-version capability
return {SyncVersion.V1}

def get_sync_factory(self, sync_version: SyncVersion) -> SyncManagerFactory:
def get_sync_factory(self, sync_version: SyncVersion) -> SyncAgentFactory:
"""Get the sync factory for a given version, support MUST be checked beforehand or it will raise an assert."""
assert sync_version in self._sync_factories, 'get_sync_factory must be called for a supported version'
return self._sync_factories[sync_version]
Expand Down
6 changes: 3 additions & 3 deletions hathor/p2p/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,21 +363,21 @@ def is_sync_enabled(self) -> bool:
if not self.is_state(self.PeerState.READY):
return False
assert isinstance(self.state, ReadyState)
return self.state.sync_manager.is_sync_enabled()
return self.state.sync_agent.is_sync_enabled()

def enable_sync(self) -> None:
"""Enable sync for this connection."""
assert self.is_state(self.PeerState.READY)
assert isinstance(self.state, ReadyState)
self.log.info('enable sync')
self.state.sync_manager.enable_sync()
self.state.sync_agent.enable_sync()

def disable_sync(self) -> None:
"""Disable sync for this connection."""
assert self.is_state(self.PeerState.READY)
assert isinstance(self.state, ReadyState)
self.log.info('disable sync')
self.state.sync_manager.disable_sync()
self.state.sync_agent.disable_sync()


class HathorLineReceiver(LineReceiver, HathorProtocol):
Expand Down
2 changes: 1 addition & 1 deletion hathor/p2p/resources/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def render_GET(self, request):
for conn in self.manager.connections.iter_ready_connections():
remote = conn.transport.getPeer()
status = {}
status[conn.state.sync_manager.name] = conn.state.sync_manager.get_status()
status[conn.state.sync_agent.name] = conn.state.sync_agent.get_status()
connected_peers.append({
'id': conn.peer.id,
'app_version': conn.app_version,
Expand Down
20 changes: 10 additions & 10 deletions hathor/p2p/states/ready.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from hathor.p2p.messages import ProtocolMessages
from hathor.p2p.peer_id import PeerId
from hathor.p2p.states.base import BaseState
from hathor.p2p.sync_manager import SyncManager
from hathor.p2p.sync_agent import SyncAgent
from hathor.transaction import BaseTransaction
from hathor.util import json_dumps, json_loads

Expand Down Expand Up @@ -77,8 +77,8 @@ def __init__(self, protocol: 'HathorProtocol') -> None:
self.log.debug(f'loading {sync_version}')
sync_factory = connections.get_sync_factory(sync_version)

self.sync_manager: SyncManager = sync_factory.create_sync_manager(self.protocol, reactor=self.reactor)
self.cmd_map.update(self.sync_manager.get_cmd_dict())
self.sync_agent: SyncAgent = sync_factory.create_sync_agent(self.protocol, reactor=self.reactor)
self.cmd_map.update(self.sync_agent.get_cmd_dict())

def on_enter(self) -> None:
if self.protocol.connections:
Expand All @@ -87,24 +87,24 @@ def on_enter(self) -> None:
self.lc_ping.start(1, now=False)
self.send_get_peers()

self.sync_manager.start()
self.sync_agent.start()

def on_exit(self) -> None:
if self.lc_ping.running:
self.lc_ping.stop()

if self.sync_manager.is_started():
self.sync_manager.stop()
if self.sync_agent.is_started():
self.sync_agent.stop()

def prepare_to_disconnect(self) -> None:
if self.sync_manager.is_started():
self.sync_manager.stop()
if self.sync_agent.is_started():
self.sync_agent.stop()

def send_tx_to_peer(self, tx: BaseTransaction) -> None:
self.sync_manager.send_tx_to_peer_if_possible(tx)
self.sync_agent.send_tx_to_peer_if_possible(tx)

def is_synced(self) -> bool:
return self.sync_manager.is_synced()
return self.sync_agent.is_synced()

def send_get_peers(self) -> None:
""" Send a GET-PEERS command, requesting a list of nodes.
Expand Down
2 changes: 1 addition & 1 deletion hathor/p2p/sync_manager.py → hathor/p2p/sync_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from hathor.transaction import BaseTransaction


class SyncManager(ABC):
class SyncAgent(ABC):
@abstractmethod
def is_started(self) -> bool:
"""Whether the manager started running"""
Expand Down
6 changes: 3 additions & 3 deletions hathor/p2p/sync_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Optional

from hathor.p2p.sync_manager import SyncManager
from hathor.p2p.sync_agent import SyncAgent
from hathor.util import Reactor

if TYPE_CHECKING:
from hathor.p2p.protocol import HathorProtocol


class SyncManagerFactory(ABC):
class SyncAgentFactory(ABC):
@abstractmethod
def create_sync_manager(self, protocol: 'HathorProtocol', reactor: Optional[Reactor] = None) -> SyncManager:
def create_sync_agent(self, protocol: 'HathorProtocol', reactor: Optional[Reactor] = None) -> SyncAgent:
pass
4 changes: 2 additions & 2 deletions hathor/p2p/sync_v1/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

from hathor.conf import HathorSettings
from hathor.p2p.messages import GetNextPayload, GetTipsPayload, NextPayload, ProtocolMessages, TipsPayload
from hathor.p2p.sync_manager import SyncManager
from hathor.p2p.sync_agent import SyncAgent
from hathor.p2p.sync_v1.downloader import Downloader
from hathor.transaction import BaseTransaction
from hathor.transaction.base_transaction import tx_or_block_from_bytes
Expand Down Expand Up @@ -172,7 +172,7 @@ def stopProducing(self) -> None:
self.priority_queue.clear()


class NodeSyncTimestamp(SyncManager):
class NodeSyncTimestamp(SyncAgent):
""" An algorithm to sync the DAG between two peers using the timestamp of the transactions.

This algorithm must assume that a new item may arrive while it is running. The item's timestamp
Expand Down
8 changes: 4 additions & 4 deletions hathor/p2p/sync_v1/factory_v1_0.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
from typing import TYPE_CHECKING, Optional

from hathor.p2p.manager import ConnectionsManager
from hathor.p2p.sync_factory import SyncManagerFactory
from hathor.p2p.sync_manager import SyncManager
from hathor.p2p.sync_agent import SyncAgent
from hathor.p2p.sync_factory import SyncAgentFactory
from hathor.p2p.sync_v1.agent import NodeSyncTimestamp
from hathor.p2p.sync_v1.downloader import Downloader
from hathor.util import Reactor
Expand All @@ -25,7 +25,7 @@
from hathor.p2p.protocol import HathorProtocol


class SyncV10Factory(SyncManagerFactory):
class SyncV10Factory(SyncAgentFactory):
def __init__(self, connections: ConnectionsManager):
self.connections = connections
self._downloader: Optional[Downloader] = None
Expand All @@ -36,5 +36,5 @@ def get_downloader(self) -> Downloader:
self._downloader = Downloader(self.connections.manager)
return self._downloader

def create_sync_manager(self, protocol: 'HathorProtocol', reactor: Optional[Reactor] = None) -> SyncManager:
def create_sync_agent(self, protocol: 'HathorProtocol', reactor: Optional[Reactor] = None) -> SyncAgent:
return NodeSyncTimestamp(protocol, downloader=self.get_downloader(), reactor=reactor)
8 changes: 4 additions & 4 deletions hathor/p2p/sync_v1/factory_v1_1.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
from typing import TYPE_CHECKING, Optional

from hathor.p2p.manager import ConnectionsManager
from hathor.p2p.sync_factory import SyncManagerFactory
from hathor.p2p.sync_manager import SyncManager
from hathor.p2p.sync_agent import SyncAgent
from hathor.p2p.sync_factory import SyncAgentFactory
from hathor.p2p.sync_v1.agent import NodeSyncTimestamp
from hathor.p2p.sync_v1.downloader import Downloader
from hathor.util import Reactor
Expand All @@ -25,7 +25,7 @@
from hathor.p2p.protocol import HathorProtocol


class SyncV11Factory(SyncManagerFactory):
class SyncV11Factory(SyncAgentFactory):
def __init__(self, connections: ConnectionsManager):
self.connections = connections
self._downloader: Optional[Downloader] = None
Expand All @@ -36,5 +36,5 @@ def get_downloader(self) -> Downloader:
self._downloader = Downloader(self.connections.manager)
return self._downloader

def create_sync_manager(self, protocol: 'HathorProtocol', reactor: Optional[Reactor] = None) -> SyncManager:
def create_sync_agent(self, protocol: 'HathorProtocol', reactor: Optional[Reactor] = None) -> SyncAgent:
return NodeSyncTimestamp(protocol, downloader=self.get_downloader(), reactor=reactor)
8 changes: 4 additions & 4 deletions hathor/p2p/sync_v2/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,18 @@
from typing import TYPE_CHECKING, Optional

from hathor.p2p.manager import ConnectionsManager
from hathor.p2p.sync_factory import SyncManagerFactory
from hathor.p2p.sync_manager import SyncManager
from hathor.p2p.sync_agent import SyncAgent
from hathor.p2p.sync_factory import SyncAgentFactory
from hathor.p2p.sync_v2.manager import NodeBlockSync
from hathor.util import Reactor

if TYPE_CHECKING:
from hathor.p2p.protocol import HathorProtocol


class SyncV2Factory(SyncManagerFactory):
class SyncV2Factory(SyncAgentFactory):
def __init__(self, connections: ConnectionsManager):
self.connections = connections

def create_sync_manager(self, protocol: 'HathorProtocol', reactor: Optional[Reactor] = None) -> SyncManager:
def create_sync_agent(self, protocol: 'HathorProtocol', reactor: Optional[Reactor] = None) -> SyncAgent:
return NodeBlockSync(protocol, reactor=reactor)
4 changes: 2 additions & 2 deletions hathor/p2p/sync_v2/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

from hathor.conf import HathorSettings
from hathor.p2p.messages import ProtocolMessages
from hathor.p2p.sync_manager import SyncManager
from hathor.p2p.sync_agent import SyncAgent
from hathor.p2p.sync_v2.mempool import SyncMempoolManager
from hathor.p2p.sync_v2.streamers import DEFAULT_STREAMING_LIMIT, BlockchainStreaming, StreamEnd, TransactionsStreaming
from hathor.transaction import BaseTransaction, Block, Transaction
Expand All @@ -53,7 +53,7 @@ class PeerState(Enum):
SYNCING_MEMPOOL = 'syncing-mempool'


class NodeBlockSync(SyncManager):
class NodeBlockSync(SyncAgent):
""" An algorithm to sync two peers based on their blockchain.
"""
name: str = 'node-block-sync'
Expand Down
18 changes: 9 additions & 9 deletions hathor/p2p/sync_v2/mempool.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@
class SyncMempoolManager:
"""Manage the sync-v2 mempool with one peer.
"""
def __init__(self, sync_manager: 'NodeBlockSync'):
def __init__(self, sync_agent: 'NodeBlockSync'):
"""Initialize the sync-v2 mempool manager."""
self.log = logger.new(peer=sync_manager.protocol.get_short_peer_id())
self.log = logger.new(peer=sync_agent.protocol.get_short_peer_id())

# Shortcuts.
self.sync_manager = sync_manager
self.manager = self.sync_manager.manager
self.sync_agent = sync_agent
self.manager = self.sync_agent.manager
self.tx_storage = self.manager.tx_storage
self.reactor = self.sync_manager.reactor
self.reactor = self.sync_agent.reactor

# Set of tips we know but couldn't add to the DAG yet.
self.missing_tips: set[bytes] = set()
Expand Down Expand Up @@ -65,21 +65,21 @@ def _run(self) -> Generator[Deferred, Any, None]:
try:
yield self._unsafe_run()
finally:
# sync_manager.run_sync will start it again when needed
# sync_agent.run_sync will start it again when needed
self._is_running = False

@inlineCallbacks
def _unsafe_run(self) -> Generator[Deferred, Any, None]:
"""Run a single loop of the sync-v2 mempool."""
if not self.missing_tips:
# No missing tips? Let's get them!
tx_hashes: list[bytes] = yield self.sync_manager.get_tips()
tx_hashes: list[bytes] = yield self.sync_agent.get_tips()
self.missing_tips.update(h for h in tx_hashes if not self.tx_storage.transaction_exists(h))

while self.missing_tips:
self.log.debug('We have missing tips! Let\'s start!', missing_tips=[x.hex() for x in self.missing_tips])
tx_id = next(iter(self.missing_tips))
tx: BaseTransaction = yield self.sync_manager.get_tx(tx_id)
tx: BaseTransaction = yield self.sync_agent.get_tx(tx_id)
# Stack used by the DFS in the dependencies.
# We use a deque for performance reasons.
self.log.debug('start mempool DSF', tx=tx.hash_hex)
Expand All @@ -98,7 +98,7 @@ def _dfs(self, stack: deque[BaseTransaction]) -> Generator[Deferred, Any, None]:
assert tx == stack.pop()
else:
self.log.debug('Iterate in the DFS.', missing_dep=missing_dep.hex())
tx_dep = yield self.sync_manager.get_tx(missing_dep)
tx_dep = yield self.sync_agent.get_tx(missing_dep)
stack.append(tx_dep)
if len(stack) > self.MAX_STACK_LENGTH:
stack.popleft()
Expand Down
16 changes: 8 additions & 8 deletions hathor/simulator/fake_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,13 @@ def is_both_synced(self) -> bool:
return False
assert isinstance(state1, ReadyState) # mypy can't infer this from the above
assert isinstance(state2, ReadyState) # mypy can't infer this from the above
state1_is_errored = state1.sync_manager.is_errored()
state2_is_errored = state2.sync_manager.is_errored()
state1_is_errored = state1.sync_agent.is_errored()
state2_is_errored = state2.sync_agent.is_errored()
if state1_is_errored or state2_is_errored:
self.log.debug('peer errored', peer1_errored=state1_is_errored, peer2_errored=state2_is_errored)
return False
state1_is_synced = state1.sync_manager.is_synced()
state2_is_synced = state2.sync_manager.is_synced()
state1_is_synced = state1.sync_agent.is_synced()
state2_is_synced = state2.sync_agent.is_synced()
if not state1_is_synced or not state2_is_synced:
self.log.debug('peer not synced', peer1_synced=state1_is_synced, peer2_synced=state2_is_synced)
return False
Expand All @@ -120,13 +120,13 @@ def can_step(self) -> bool:
return True
assert isinstance(state1, ReadyState) # mypy can't infer this from the above
assert isinstance(state2, ReadyState) # mypy can't infer this from the above
state1_is_errored = state1.sync_manager.is_errored()
state2_is_errored = state2.sync_manager.is_errored()
state1_is_errored = state1.sync_agent.is_errored()
state2_is_errored = state2.sync_agent.is_errored()
if state1_is_errored or state2_is_errored:
self.log.debug('peer errored', peer1_errored=state1_is_errored, peer2_errored=state2_is_errored)
return False
state1_is_synced = state1.sync_manager.is_synced()
state2_is_synced = state2.sync_manager.is_synced()
state1_is_synced = state1.sync_agent.is_synced()
state2_is_synced = state2.sync_agent.is_synced()
if not state1_is_synced or not state2_is_synced:
self.log.debug('peer not synced', peer1_synced=state1_is_synced, peer2_synced=state2_is_synced)
return True
Expand Down
16 changes: 8 additions & 8 deletions tests/p2p/test_capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ def test_capabilities(self):
# Even if we don't have the capability we must connect because the whitelist url conf is None
self.assertEqual(conn._proto1.state.state_name, 'READY')
self.assertEqual(conn._proto2.state.state_name, 'READY')
self.assertIsInstance(conn._proto1.state.sync_manager, NodeSyncTimestamp)
self.assertIsInstance(conn._proto2.state.sync_manager, NodeSyncTimestamp)
self.assertIsInstance(conn._proto1.state.sync_agent, NodeSyncTimestamp)
self.assertIsInstance(conn._proto2.state.sync_agent, NodeSyncTimestamp)

manager3 = self.create_peer(network, capabilities=[settings.CAPABILITY_WHITELIST])
manager4 = self.create_peer(network, capabilities=[settings.CAPABILITY_WHITELIST])
Expand All @@ -38,8 +38,8 @@ def test_capabilities(self):

self.assertEqual(conn2._proto1.state.state_name, 'READY')
self.assertEqual(conn2._proto2.state.state_name, 'READY')
self.assertIsInstance(conn2._proto1.state.sync_manager, NodeSyncTimestamp)
self.assertIsInstance(conn2._proto2.state.sync_manager, NodeSyncTimestamp)
self.assertIsInstance(conn2._proto1.state.sync_agent, NodeSyncTimestamp)
self.assertIsInstance(conn2._proto2.state.sync_agent, NodeSyncTimestamp)


class SyncV2HathorCapabilitiesTestCase(unittest.SyncV2Params, unittest.TestCase):
Expand All @@ -59,8 +59,8 @@ def test_capabilities(self):
# Even if we don't have the capability we must connect because the whitelist url conf is None
self.assertEqual(conn._proto1.state.state_name, 'READY')
self.assertEqual(conn._proto2.state.state_name, 'READY')
self.assertIsInstance(conn._proto1.state.sync_manager, NodeBlockSync)
self.assertIsInstance(conn._proto2.state.sync_manager, NodeBlockSync)
self.assertIsInstance(conn._proto1.state.sync_agent, NodeBlockSync)
self.assertIsInstance(conn._proto2.state.sync_agent, NodeBlockSync)

manager3 = self.create_peer(network, capabilities=[settings.CAPABILITY_WHITELIST,
settings.CAPABILITY_SYNC_VERSION])
Expand All @@ -76,8 +76,8 @@ def test_capabilities(self):

self.assertEqual(conn2._proto1.state.state_name, 'READY')
self.assertEqual(conn2._proto2.state.state_name, 'READY')
self.assertIsInstance(conn2._proto1.state.sync_manager, NodeBlockSync)
self.assertIsInstance(conn2._proto2.state.sync_manager, NodeBlockSync)
self.assertIsInstance(conn2._proto1.state.sync_agent, NodeBlockSync)
self.assertIsInstance(conn2._proto2.state.sync_agent, NodeBlockSync)


# sync-bridge should behave like sync-v2
Expand Down
2 changes: 1 addition & 1 deletion tests/p2p/test_split_brain.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def test_split_brain_plain(self):
dot2 = GraphvizVisualizer(manager2.tx_storage, include_verifications=True).dot()
dot2.render('dot2-post')

node_sync = conn.proto1.state.sync_manager
node_sync = conn.proto1.state.sync_agent
self.assertSyncedProgress(node_sync)
self.assertTipsEqual(manager1, manager2)
self.assertConsensusEqual(manager1, manager2)
Expand Down
Loading