diff --git a/hathor/builder/builder.py b/hathor/builder/builder.py index e83db1369..816486916 100644 --- a/hathor/builder/builder.py +++ b/hathor/builder/builder.py @@ -22,6 +22,8 @@ from hathor.conf.settings import HathorSettings as HathorSettingsType from hathor.consensus import ConsensusAlgorithm from hathor.event import EventManager +from hathor.event.storage import EventMemoryStorage, EventRocksDBStorage, EventStorage +from hathor.event.websocket import EventWebsocketFactory from hathor.indexes import IndexesManager from hathor.manager import HathorManager from hathor.p2p.peer_id import PeerId @@ -41,6 +43,7 @@ class StorageType(Enum): class BuildArtifacts(NamedTuple): """Artifacts created by a builder.""" + peer_id: PeerId settings: HathorSettingsType rng: Random reactor: Reactor @@ -75,15 +78,19 @@ def __init__(self) -> None: self._network: Optional[str] = None self._cmdline: str = '' - self._storage_type: Optional[StorageType] = None + self._storage_type: StorageType = StorageType.MEMORY self._force_memory_index: bool = False self._event_manager: Optional[EventManager] = None + self._event_ws_factory: Optional[EventWebsocketFactory] = None self._rocksdb_path: Optional[str] = None self._rocksdb_storage: Optional[RocksDBStorage] = None + self._rocksdb_cache_capacity: Optional[int] = None + self._rocksdb_with_index: Optional[bool] = None self._tx_storage: Optional[TransactionStorage] = None + self._event_storage: Optional[EventStorage] = None self._reactor: Optional[Reactor] = None self._pubsub: Optional[PubSubManager] = None @@ -119,6 +126,8 @@ def build(self) -> BuildArtifacts: consensus_algorithm = ConsensusAlgorithm(soft_voided_tx_ids, pubsub) wallet = self._get_or_create_wallet() + event_storage = self._get_or_create_event_storage() + event_manager = self._get_or_create_event_manager() tx_storage = self._get_or_create_tx_storage() indexes = tx_storage.indexes assert indexes is not None @@ -146,9 +155,6 @@ def build(self) -> BuildArtifacts: if self._network is None: raise TypeError('you must set a network') - if self._event_manager is not None: - kwargs['event_manager'] = self._event_manager - if self._full_verification is not None: kwargs['full_verification'] = self._full_verification @@ -158,16 +164,19 @@ def build(self) -> BuildArtifacts: consensus_algorithm=consensus_algorithm, peer_id=peer_id, tx_storage=tx_storage, + event_storage=event_storage, network=self._network, wallet=wallet, rng=self._rng, checkpoints=self._checkpoints, capabilities=self._capabilities, environment_info=get_environment_info(self._cmdline, peer_id.id), + event_manager=event_manager, **kwargs ) self.artifacts = BuildArtifacts( + peer_id=peer_id, settings=settings, rng=self._rng, reactor=reactor, @@ -239,30 +248,84 @@ def _get_or_create_pubsub(self) -> PubSubManager: def _get_or_create_rocksdb_storage(self) -> RocksDBStorage: assert self._rocksdb_path is not None - if self._rocksdb_storage is None: - self._rocksdb_storage = RocksDBStorage(path=self._rocksdb_path) + + if self._rocksdb_storage is not None: + return self._rocksdb_storage + + kwargs = {} + if self._rocksdb_cache_capacity is not None: + kwargs = dict(cache_capacity=self._rocksdb_cache_capacity) + + self._rocksdb_storage = RocksDBStorage( + path=self._rocksdb_path, + **kwargs + ) + return self._rocksdb_storage def _get_or_create_tx_storage(self) -> TransactionStorage: if self._tx_storage is not None: return self._tx_storage + if self._storage_type == StorageType.MEMORY: return TransactionMemoryStorage() - elif self._storage_type == StorageType.ROCKSDB: + + if self._storage_type == StorageType.ROCKSDB: rocksdb_storage = self._get_or_create_rocksdb_storage() use_memory_index = self._force_memory_index - return TransactionRocksDBStorage(rocksdb_storage, use_memory_indexes=use_memory_index) + + kwargs = {} + if self._rocksdb_with_index is not None: + kwargs = dict(with_index=self._rocksdb_with_index) + + return TransactionRocksDBStorage( + rocksdb_storage, + use_memory_indexes=use_memory_index, + **kwargs + ) + raise NotImplementedError + def _get_or_create_event_storage(self) -> EventStorage: + if self._event_storage is not None: + pass + elif self._storage_type == StorageType.MEMORY: + self._event_storage = EventMemoryStorage() + elif self._storage_type == StorageType.ROCKSDB: + rocksdb_storage = self._get_or_create_rocksdb_storage() + self._event_storage = EventRocksDBStorage(rocksdb_storage) + else: + raise NotImplementedError + + return self._event_storage + + def _get_or_create_event_manager(self) -> Optional[EventManager]: + if self._event_manager is None and self._event_ws_factory is not None: + self._event_manager = EventManager( + reactor=self._get_reactor(), + pubsub=self._get_or_create_pubsub(), + event_storage=self._get_or_create_event_storage(), + event_ws_factory=self._event_ws_factory + ) + + return self._event_manager + def use_memory(self) -> 'Builder': self.check_if_can_modify() self._storage_type = StorageType.MEMORY return self - def use_rocksdb(self, path: str) -> 'Builder': + def use_rocksdb( + self, + path: str, + with_index: Optional[bool] = None, + cache_capacity: Optional[int] = None + ) -> 'Builder': self.check_if_can_modify() self._storage_type = StorageType.ROCKSDB self._rocksdb_path = path + self._rocksdb_with_index = with_index + self._rocksdb_cache_capacity = cache_capacity return self def force_memory_index(self) -> 'Builder': @@ -320,11 +383,21 @@ def enable_wallet_index(self) -> 'Builder': self.enable_tokens_index() return self + def enable_event_manager(self, *, event_ws_factory: EventWebsocketFactory) -> 'Builder': + self.check_if_can_modify() + self._event_ws_factory = event_ws_factory + return self + def set_tx_storage(self, tx_storage: TransactionStorage) -> 'Builder': self.check_if_can_modify() self._tx_storage = tx_storage return self + def set_event_storage(self, event_storage: EventStorage) -> 'Builder': + self.check_if_can_modify() + self._event_storage = event_storage + return self + def set_reactor(self, reactor: Reactor) -> 'Builder': self.check_if_can_modify() self._reactor = reactor diff --git a/hathor/builder/cli_builder.py b/hathor/builder/cli_builder.py index 887928df9..f0485a853 100644 --- a/hathor/builder/cli_builder.py +++ b/hathor/builder/cli_builder.py @@ -97,16 +97,15 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa ) tx_storage: TransactionStorage + event_storage: EventStorage rocksdb_storage: RocksDBStorage - self.event_storage: Optional[EventStorage] = None self.event_ws_factory: Optional[EventWebsocketFactory] = None if args.memory_storage: self.check_or_raise(not args.data, '--data should not be used with --memory-storage') # if using MemoryStorage, no need to have cache tx_storage = TransactionMemoryStorage() - if args.x_enable_event_queue: - self.event_storage = EventMemoryStorage() + event_storage = EventMemoryStorage() self.check_or_raise(not args.x_rocksdb_indexes, 'RocksDB indexes require RocksDB data') self.log.info('with storage', storage_class=type(tx_storage).__name__) else: @@ -118,8 +117,7 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa tx_storage = TransactionRocksDBStorage(rocksdb_storage, with_index=(not args.cache), use_memory_indexes=args.memory_indexes) - if args.x_enable_event_queue: - self.event_storage = EventRocksDBStorage(rocksdb_storage) + event_storage = EventRocksDBStorage(rocksdb_storage) self.log.info('with storage', storage_class=type(tx_storage).__name__, path=args.data) if args.cache: @@ -147,10 +145,9 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa event_manager: Optional[EventManager] = None if args.x_enable_event_queue: - assert self.event_storage is not None, 'cannot create EventManager without EventStorage' - self.event_ws_factory = EventWebsocketFactory(reactor, self.event_storage) + self.event_ws_factory = EventWebsocketFactory(reactor, event_storage) event_manager = EventManager( - event_storage=self.event_storage, + event_storage=event_storage, event_ws_factory=self.event_ws_factory, pubsub=pubsub, reactor=reactor, @@ -184,6 +181,7 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa network=network, hostname=hostname, tx_storage=tx_storage, + event_storage=event_storage, event_manager=event_manager, wallet=self.wallet, stratum_port=args.stratum, diff --git a/hathor/manager.py b/hathor/manager.py index a207794e7..32019cb90 100644 --- a/hathor/manager.py +++ b/hathor/manager.py @@ -30,6 +30,7 @@ from hathor.conf import HathorSettings from hathor.consensus import ConsensusAlgorithm from hathor.event.event_manager import EventManager +from hathor.event.storage import EventStorage from hathor.exception import ( DoubleSpendingError, HathorError, @@ -87,6 +88,7 @@ def __init__(self, consensus_algorithm: ConsensusAlgorithm, peer_id: PeerId, tx_storage: TransactionStorage, + event_storage: EventStorage, network: str, hostname: Optional[str] = None, wallet: Optional[BaseWallet] = None, @@ -168,6 +170,9 @@ def __init__(self, self._event_manager = event_manager + if self._event_manager: + assert self._event_manager.event_storage == event_storage + if enable_sync_v2: assert self.tx_storage.indexes is not None self.log.debug('enable sync-v2 indexes') diff --git a/hathor/simulator/simulator.py b/hathor/simulator/simulator.py index d8cc3f35d..6831c3bcf 100644 --- a/hathor/simulator/simulator.py +++ b/hathor/simulator/simulator.py @@ -23,13 +23,13 @@ from hathor.builder import Builder from hathor.conf import HathorSettings from hathor.daa import TestMode, _set_test_mode +from hathor.event.websocket import EventWebsocketFactory from hathor.manager import HathorManager from hathor.p2p.peer_id import PeerId from hathor.simulator.clock import HeapClock from hathor.simulator.miner.geometric_miner import GeometricMiner from hathor.simulator.tx_generator import RandomTransactionGenerator from hathor.transaction.genesis import _get_genesis_transactions_unsafe -from hathor.transaction.storage.memory_storage import TransactionMemoryStorage from hathor.util import Random from hathor.wallet import HDWallet @@ -137,7 +137,8 @@ def create_peer( enable_sync_v1: bool = True, enable_sync_v2: bool = True, soft_voided_tx_ids: Optional[Set[bytes]] = None, - full_verification: bool = True + full_verification: bool = True, + event_ws_factory: Optional[EventWebsocketFactory] = None ) -> HathorManager: assert self._started, 'Simulator is not started.' assert peer_id is not None # XXX: temporary, for checking that tests are using the peer_id @@ -145,18 +146,22 @@ def create_peer( wallet = HDWallet(gap_limit=2) wallet._manually_initialize() - artifacts = Builder() \ + builder = Builder() \ .set_reactor(self._clock) \ .set_peer_id(peer_id or PeerId()) \ .set_network(network or self._network) \ .set_wallet(wallet) \ .set_rng(Random(self.rng.getrandbits(64))) \ - .set_tx_storage(TransactionMemoryStorage()) \ .set_enable_sync_v1(enable_sync_v1) \ .set_enable_sync_v2(enable_sync_v2) \ .set_full_verification(full_verification) \ .set_soft_voided_tx_ids(soft_voided_tx_ids or set()) \ - .build() + .use_memory() + + if event_ws_factory: + builder.enable_event_manager(event_ws_factory=event_ws_factory) + + artifacts = builder.build() artifacts.manager.start() self.run_to_completion() diff --git a/tests/event/test_event_manager.py b/tests/event/test_event_manager.py index 7751436fb..1b4f7250f 100644 --- a/tests/event/test_event_manager.py +++ b/tests/event/test_event_manager.py @@ -1,10 +1,9 @@ from unittest.mock import Mock -from hathor.event import EventManager from hathor.event.model.event_type import EventType from hathor.event.storage.memory_storage import EventMemoryStorage from hathor.event.websocket import EventWebsocketFactory -from hathor.pubsub import HathorEvents, PubSubManager +from hathor.pubsub import HathorEvents from tests import unittest @@ -13,21 +12,13 @@ class BaseEventManagerTest(unittest.TestCase): def setUp(self): super().setUp() - self.event_storage = EventMemoryStorage() - self.event_ws_factory = Mock(spec_set=EventWebsocketFactory) self.network = 'testnet' - pubsub = PubSubManager(self.clock) - self.event_manager = EventManager( - event_storage=self.event_storage, - event_ws_factory=self.event_ws_factory, - pubsub=pubsub, - reactor=self.clock - ) + self.event_storage = EventMemoryStorage() self.manager = self.create_peer( self.network, - event_manager=self.event_manager, - pubsub=pubsub, - full_verification=False + event_ws_factory=Mock(spec_set=EventWebsocketFactory), + full_verification=False, + event_storage=self.event_storage ) def test_if_event_is_persisted(self): diff --git a/tests/event/test_event_reorg.py b/tests/event/test_event_reorg.py index feaed01af..78d11a4b0 100644 --- a/tests/event/test_event_reorg.py +++ b/tests/event/test_event_reorg.py @@ -1,11 +1,9 @@ from unittest.mock import Mock from hathor.conf import HathorSettings -from hathor.event import EventManager from hathor.event.model.event_type import EventType from hathor.event.storage import EventMemoryStorage from hathor.event.websocket import EventWebsocketFactory -from hathor.pubsub import PubSubManager from tests import unittest from tests.utils import add_new_blocks, get_genesis_key @@ -18,20 +16,12 @@ class BaseEventReorgTest(unittest.TestCase): def setUp(self): super().setUp() self.network = 'testnet' - self.event_ws_factory = Mock(spec_set=EventWebsocketFactory) self.event_storage = EventMemoryStorage() - pubsub = PubSubManager(self.clock) - self.event_manager = EventManager( - event_storage=self.event_storage, - event_ws_factory=self.event_ws_factory, - pubsub=pubsub, - reactor=self.clock - ) self.manager = self.create_peer( self.network, - event_manager=self.event_manager, - pubsub=pubsub, - full_verification=False + event_ws_factory=Mock(spec_set=EventWebsocketFactory), + full_verification=False, + event_storage=self.event_storage ) # read genesis keys diff --git a/tests/unittest.py b/tests/unittest.py index 00c52c108..8bda63c54 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -145,7 +145,7 @@ def _create_test_wallet(self): def create_peer(self, network, peer_id=None, wallet=None, tx_storage=None, unlock_wallet=True, wallet_index=False, capabilities=None, full_verification=True, enable_sync_v1=None, enable_sync_v2=None, checkpoints=None, utxo_index=False, event_manager=None, use_memory_index=None, start_manager=True, - pubsub=None): + pubsub=None, event_storage=None, event_ws_factory=None): if enable_sync_v1 is None: assert hasattr(self, '_enable_sync_v1'), ('`_enable_sync_v1` has no default by design, either set one on ' 'the test class or pass `enable_sync_v1` by argument') @@ -178,12 +178,19 @@ def create_peer(self, network, peer_id=None, wallet=None, tx_storage=None, unloc wallet.unlock(b'MYPASS') builder.set_wallet(wallet) + if event_storage: + builder.set_event_storage(event_storage) + if event_manager: builder.set_event_manager(event_manager) + if event_ws_factory: + builder.enable_event_manager(event_ws_factory=event_ws_factory) + if tx_storage is not None: builder.set_tx_storage(tx_storage) - elif self.use_memory_storage: + + if self.use_memory_storage: builder.use_memory() else: directory = tempfile.mkdtemp()