Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 82 additions & 9 deletions hathor/builder/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
from hathor.conf.settings import HathorSettings as HathorSettingsType
from hathor.consensus import ConsensusAlgorithm
from hathor.event import EventManager
from hathor.event.storage import EventMemoryStorage, EventRocksDBStorage, EventStorage
from hathor.event.websocket import EventWebsocketFactory
from hathor.indexes import IndexesManager
from hathor.manager import HathorManager
from hathor.p2p.peer_id import PeerId
Expand All @@ -41,6 +43,7 @@ class StorageType(Enum):

class BuildArtifacts(NamedTuple):
"""Artifacts created by a builder."""
peer_id: PeerId
settings: HathorSettingsType
rng: Random
reactor: Reactor
Expand Down Expand Up @@ -75,15 +78,19 @@ def __init__(self) -> None:
self._network: Optional[str] = None
self._cmdline: str = ''

self._storage_type: Optional[StorageType] = None
self._storage_type: StorageType = StorageType.MEMORY
self._force_memory_index: bool = False

self._event_manager: Optional[EventManager] = None
self._event_ws_factory: Optional[EventWebsocketFactory] = None

self._rocksdb_path: Optional[str] = None
self._rocksdb_storage: Optional[RocksDBStorage] = None
self._rocksdb_cache_capacity: Optional[int] = None
self._rocksdb_with_index: Optional[bool] = None

self._tx_storage: Optional[TransactionStorage] = None
self._event_storage: Optional[EventStorage] = None

self._reactor: Optional[Reactor] = None
self._pubsub: Optional[PubSubManager] = None
Expand Down Expand Up @@ -119,6 +126,8 @@ def build(self) -> BuildArtifacts:
consensus_algorithm = ConsensusAlgorithm(soft_voided_tx_ids, pubsub)

wallet = self._get_or_create_wallet()
event_storage = self._get_or_create_event_storage()
event_manager = self._get_or_create_event_manager()
tx_storage = self._get_or_create_tx_storage()
indexes = tx_storage.indexes
assert indexes is not None
Expand Down Expand Up @@ -146,9 +155,6 @@ def build(self) -> BuildArtifacts:
if self._network is None:
raise TypeError('you must set a network')

if self._event_manager is not None:
kwargs['event_manager'] = self._event_manager

if self._full_verification is not None:
kwargs['full_verification'] = self._full_verification

Expand All @@ -158,16 +164,19 @@ def build(self) -> BuildArtifacts:
consensus_algorithm=consensus_algorithm,
peer_id=peer_id,
tx_storage=tx_storage,
event_storage=event_storage,
network=self._network,
wallet=wallet,
rng=self._rng,
checkpoints=self._checkpoints,
capabilities=self._capabilities,
environment_info=get_environment_info(self._cmdline, peer_id.id),
event_manager=event_manager,
**kwargs
)

self.artifacts = BuildArtifacts(
peer_id=peer_id,
settings=settings,
rng=self._rng,
reactor=reactor,
Expand Down Expand Up @@ -239,30 +248,84 @@ def _get_or_create_pubsub(self) -> PubSubManager:

def _get_or_create_rocksdb_storage(self) -> RocksDBStorage:
assert self._rocksdb_path is not None
if self._rocksdb_storage is None:
self._rocksdb_storage = RocksDBStorage(path=self._rocksdb_path)

if self._rocksdb_storage is not None:
return self._rocksdb_storage

kwargs = {}
if self._rocksdb_cache_capacity is not None:
kwargs = dict(cache_capacity=self._rocksdb_cache_capacity)

self._rocksdb_storage = RocksDBStorage(
path=self._rocksdb_path,
**kwargs
)

return self._rocksdb_storage

def _get_or_create_tx_storage(self) -> TransactionStorage:
if self._tx_storage is not None:
return self._tx_storage

if self._storage_type == StorageType.MEMORY:
return TransactionMemoryStorage()
elif self._storage_type == StorageType.ROCKSDB:

if self._storage_type == StorageType.ROCKSDB:
rocksdb_storage = self._get_or_create_rocksdb_storage()
use_memory_index = self._force_memory_index
return TransactionRocksDBStorage(rocksdb_storage, use_memory_indexes=use_memory_index)

kwargs = {}
if self._rocksdb_with_index is not None:
kwargs = dict(with_index=self._rocksdb_with_index)

return TransactionRocksDBStorage(
rocksdb_storage,
use_memory_indexes=use_memory_index,
**kwargs
)

raise NotImplementedError

def _get_or_create_event_storage(self) -> EventStorage:
if self._event_storage is not None:
pass
elif self._storage_type == StorageType.MEMORY:
self._event_storage = EventMemoryStorage()
elif self._storage_type == StorageType.ROCKSDB:
rocksdb_storage = self._get_or_create_rocksdb_storage()
self._event_storage = EventRocksDBStorage(rocksdb_storage)
else:
raise NotImplementedError

return self._event_storage

def _get_or_create_event_manager(self) -> Optional[EventManager]:
if self._event_manager is None and self._event_ws_factory is not None:
self._event_manager = EventManager(
reactor=self._get_reactor(),
pubsub=self._get_or_create_pubsub(),
event_storage=self._get_or_create_event_storage(),
event_ws_factory=self._event_ws_factory
)

return self._event_manager

def use_memory(self) -> 'Builder':
self.check_if_can_modify()
self._storage_type = StorageType.MEMORY
return self

def use_rocksdb(self, path: str) -> 'Builder':
def use_rocksdb(
self,
path: str,
with_index: Optional[bool] = None,
cache_capacity: Optional[int] = None
) -> 'Builder':
self.check_if_can_modify()
self._storage_type = StorageType.ROCKSDB
self._rocksdb_path = path
self._rocksdb_with_index = with_index
self._rocksdb_cache_capacity = cache_capacity
return self

def force_memory_index(self) -> 'Builder':
Expand Down Expand Up @@ -320,11 +383,21 @@ def enable_wallet_index(self) -> 'Builder':
self.enable_tokens_index()
return self

def enable_event_manager(self, *, event_ws_factory: EventWebsocketFactory) -> 'Builder':
self.check_if_can_modify()
self._event_ws_factory = event_ws_factory
return self

def set_tx_storage(self, tx_storage: TransactionStorage) -> 'Builder':
self.check_if_can_modify()
self._tx_storage = tx_storage
return self

def set_event_storage(self, event_storage: EventStorage) -> 'Builder':
self.check_if_can_modify()
self._event_storage = event_storage
return self

def set_reactor(self, reactor: Reactor) -> 'Builder':
self.check_if_can_modify()
self._reactor = reactor
Expand Down
14 changes: 6 additions & 8 deletions hathor/builder/cli_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,15 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa
)

tx_storage: TransactionStorage
event_storage: EventStorage
rocksdb_storage: RocksDBStorage
self.event_storage: Optional[EventStorage] = None
self.event_ws_factory: Optional[EventWebsocketFactory] = None

if args.memory_storage:
self.check_or_raise(not args.data, '--data should not be used with --memory-storage')
# if using MemoryStorage, no need to have cache
tx_storage = TransactionMemoryStorage()
if args.x_enable_event_queue:
self.event_storage = EventMemoryStorage()
event_storage = EventMemoryStorage()
self.check_or_raise(not args.x_rocksdb_indexes, 'RocksDB indexes require RocksDB data')
self.log.info('with storage', storage_class=type(tx_storage).__name__)
else:
Expand All @@ -118,8 +117,7 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa
tx_storage = TransactionRocksDBStorage(rocksdb_storage,
with_index=(not args.cache),
use_memory_indexes=args.memory_indexes)
if args.x_enable_event_queue:
self.event_storage = EventRocksDBStorage(rocksdb_storage)
event_storage = EventRocksDBStorage(rocksdb_storage)

self.log.info('with storage', storage_class=type(tx_storage).__name__, path=args.data)
if args.cache:
Expand Down Expand Up @@ -147,10 +145,9 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa

event_manager: Optional[EventManager] = None
if args.x_enable_event_queue:
assert self.event_storage is not None, 'cannot create EventManager without EventStorage'
self.event_ws_factory = EventWebsocketFactory(reactor, self.event_storage)
self.event_ws_factory = EventWebsocketFactory(reactor, event_storage)
event_manager = EventManager(
event_storage=self.event_storage,
event_storage=event_storage,
event_ws_factory=self.event_ws_factory,
pubsub=pubsub,
reactor=reactor,
Expand Down Expand Up @@ -184,6 +181,7 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa
network=network,
hostname=hostname,
tx_storage=tx_storage,
event_storage=event_storage,
event_manager=event_manager,
wallet=self.wallet,
stratum_port=args.stratum,
Expand Down
5 changes: 5 additions & 0 deletions hathor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from hathor.conf import HathorSettings
from hathor.consensus import ConsensusAlgorithm
from hathor.event.event_manager import EventManager
from hathor.event.storage import EventStorage
from hathor.exception import (
DoubleSpendingError,
HathorError,
Expand Down Expand Up @@ -87,6 +88,7 @@ def __init__(self,
consensus_algorithm: ConsensusAlgorithm,
peer_id: PeerId,
tx_storage: TransactionStorage,
event_storage: EventStorage,
network: str,
hostname: Optional[str] = None,
wallet: Optional[BaseWallet] = None,
Expand Down Expand Up @@ -168,6 +170,9 @@ def __init__(self,

self._event_manager = event_manager

if self._event_manager:
assert self._event_manager.event_storage == event_storage

if enable_sync_v2:
assert self.tx_storage.indexes is not None
self.log.debug('enable sync-v2 indexes')
Expand Down
15 changes: 10 additions & 5 deletions hathor/simulator/simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@
from hathor.builder import Builder
from hathor.conf import HathorSettings
from hathor.daa import TestMode, _set_test_mode
from hathor.event.websocket import EventWebsocketFactory
from hathor.manager import HathorManager
from hathor.p2p.peer_id import PeerId
from hathor.simulator.clock import HeapClock
from hathor.simulator.miner.geometric_miner import GeometricMiner
from hathor.simulator.tx_generator import RandomTransactionGenerator
from hathor.transaction.genesis import _get_genesis_transactions_unsafe
from hathor.transaction.storage.memory_storage import TransactionMemoryStorage
from hathor.util import Random
from hathor.wallet import HDWallet

Expand Down Expand Up @@ -137,26 +137,31 @@ def create_peer(
enable_sync_v1: bool = True,
enable_sync_v2: bool = True,
soft_voided_tx_ids: Optional[Set[bytes]] = None,
full_verification: bool = True
full_verification: bool = True,
event_ws_factory: Optional[EventWebsocketFactory] = None
) -> HathorManager:
assert self._started, 'Simulator is not started.'
assert peer_id is not None # XXX: temporary, for checking that tests are using the peer_id

wallet = HDWallet(gap_limit=2)
wallet._manually_initialize()

artifacts = Builder() \
builder = Builder() \
.set_reactor(self._clock) \
.set_peer_id(peer_id or PeerId()) \
.set_network(network or self._network) \
.set_wallet(wallet) \
.set_rng(Random(self.rng.getrandbits(64))) \
.set_tx_storage(TransactionMemoryStorage()) \
.set_enable_sync_v1(enable_sync_v1) \
.set_enable_sync_v2(enable_sync_v2) \
.set_full_verification(full_verification) \
.set_soft_voided_tx_ids(soft_voided_tx_ids or set()) \
.build()
.use_memory()

if event_ws_factory:
builder.enable_event_manager(event_ws_factory=event_ws_factory)

artifacts = builder.build()

artifacts.manager.start()
self.run_to_completion()
Expand Down
19 changes: 5 additions & 14 deletions tests/event/test_event_manager.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
from unittest.mock import Mock

from hathor.event import EventManager
from hathor.event.model.event_type import EventType
from hathor.event.storage.memory_storage import EventMemoryStorage
from hathor.event.websocket import EventWebsocketFactory
from hathor.pubsub import HathorEvents, PubSubManager
from hathor.pubsub import HathorEvents
from tests import unittest


Expand All @@ -13,21 +12,13 @@ class BaseEventManagerTest(unittest.TestCase):

def setUp(self):
super().setUp()
self.event_storage = EventMemoryStorage()
self.event_ws_factory = Mock(spec_set=EventWebsocketFactory)
self.network = 'testnet'
pubsub = PubSubManager(self.clock)
self.event_manager = EventManager(
event_storage=self.event_storage,
event_ws_factory=self.event_ws_factory,
pubsub=pubsub,
reactor=self.clock
)
self.event_storage = EventMemoryStorage()
self.manager = self.create_peer(
self.network,
event_manager=self.event_manager,
pubsub=pubsub,
full_verification=False
event_ws_factory=Mock(spec_set=EventWebsocketFactory),
full_verification=False,
event_storage=self.event_storage
)

def test_if_event_is_persisted(self):
Expand Down
16 changes: 3 additions & 13 deletions tests/event/test_event_reorg.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
from unittest.mock import Mock

from hathor.conf import HathorSettings
from hathor.event import EventManager
from hathor.event.model.event_type import EventType
from hathor.event.storage import EventMemoryStorage
from hathor.event.websocket import EventWebsocketFactory
from hathor.pubsub import PubSubManager
from tests import unittest
from tests.utils import add_new_blocks, get_genesis_key

Expand All @@ -18,20 +16,12 @@ class BaseEventReorgTest(unittest.TestCase):
def setUp(self):
super().setUp()
self.network = 'testnet'
self.event_ws_factory = Mock(spec_set=EventWebsocketFactory)
self.event_storage = EventMemoryStorage()
pubsub = PubSubManager(self.clock)
self.event_manager = EventManager(
event_storage=self.event_storage,
event_ws_factory=self.event_ws_factory,
pubsub=pubsub,
reactor=self.clock
)
self.manager = self.create_peer(
self.network,
event_manager=self.event_manager,
pubsub=pubsub,
full_verification=False
event_ws_factory=Mock(spec_set=EventWebsocketFactory),
full_verification=False,
event_storage=self.event_storage
)

# read genesis keys
Expand Down
Loading