diff --git a/hathor/builder/resources_builder.py b/hathor/builder/resources_builder.py index 8925c0d5b..0bfb98e19 100644 --- a/hathor/builder/resources_builder.py +++ b/hathor/builder/resources_builder.py @@ -14,6 +14,7 @@ import os from argparse import Namespace +from functools import partial from typing import TYPE_CHECKING, Any, Dict, Optional from autobahn.twisted.resource import WebSocketResource @@ -21,9 +22,11 @@ from twisted.web import server from twisted.web.resource import Resource +from hathor.conf import HathorSettings from hathor.event.resources.event import EventResource from hathor.exception import BuilderError from hathor.prometheus import PrometheusMetricsExporter +from hathor.pubsub import EventArguments, HathorEvents if TYPE_CHECKING: from hathor.event.websocket.factory import EventWebsocketFactory @@ -45,9 +48,27 @@ def __init__(self, manager: 'HathorManager', event_ws_factory: Optional['EventWe def build(self, args: Namespace) -> Optional[server.Site]: if args.prometheus: self.create_prometheus(args) - if args.status: - return self.create_resources(args) - return None + if not args.status: + return None + + root = Resource() + self.build_before_loading(args, root) + + self.manager.pubsub.subscribe(HathorEvents.LOAD_FINISHED, partial(self.on_load_finished, args, root)) + + settings = HathorSettings() + + real_root = Resource() + real_root.putChild(settings.API_VERSION_PREFIX.encode('ascii'), root) + + from hathor.profiler.site import SiteProfiler + status_server = SiteProfiler(real_root) + + return status_server + + def on_load_finished(self, args: Namespace, root: Resource, ev_key: HathorEvents, ev_args: EventArguments) -> None: + assert self._built_status is False + self.build_after_loading(args, root) def create_prometheus(self, args: Namespace) -> PrometheusMetricsExporter: kwargs: Dict[str, Any] = { @@ -66,8 +87,9 @@ def create_prometheus(self, args: Namespace) -> PrometheusMetricsExporter: self._built_prometheus = True return prometheus - def create_resources(self, args: Namespace) -> server.Site: - from hathor.conf import HathorSettings + def build_before_loading(self, args: Namespace, root: Resource) -> None: + """This method builds the resources that are safe to run while the full node is + loading.""" from hathor.debug_resources import ( DebugCrashResource, DebugLogResource, @@ -76,6 +98,43 @@ def create_resources(self, args: Namespace) -> server.Site: DebugRaiseResource, DebugRejectResource, ) + from hathor.profiler import get_cpu_profiler + from hathor.profiler.resources import CPUProfilerResource, ProfilerResource + from hathor.version_resource import VersionResource + + cpu = get_cpu_profiler() + + resources = [] + + if args.enable_debug_api: + debug_resource = Resource() + root.putChild(b'_debug', debug_resource) + resources.extend([ + (b'log', DebugLogResource(), debug_resource), + (b'raise', DebugRaiseResource(), debug_resource), + (b'reject', DebugRejectResource(), debug_resource), + (b'print', DebugPrintResource(), debug_resource), + ]) + + if args.enable_crash_api: + crash_resource = Resource() + root.putChild(b'_crash', crash_resource) + resources.extend([ + (b'exit', DebugCrashResource(), crash_resource), + (b'mess_around', DebugMessAroundResource(self.manager), crash_resource), + ]) + + resources.extend([ + (b'version', VersionResource(self.manager), root), + (b'profiler', ProfilerResource(self.manager), root), + (b'top', CPUProfilerResource(self.manager, cpu), root), + ]) + + for url_path, resource, parent in resources: + parent.putChild(url_path, resource) + + def build_after_loading(self, args: Namespace, root: Resource) -> None: + """This method builds all other resources after the loading is complete.""" from hathor.mining.ws import MiningWebsocketFactory from hathor.p2p.resources import ( AddPeersResource, @@ -85,8 +144,6 @@ def create_resources(self, args: Namespace) -> server.Site: NetfilterRuleResource, StatusResource, ) - from hathor.profiler import get_cpu_profiler - from hathor.profiler.resources import CPUProfilerResource, ProfilerResource from hathor.transaction.resources import ( BlockAtHeightResource, CreateTxResource, @@ -104,7 +161,6 @@ def create_resources(self, args: Namespace) -> server.Site: UtxoSearchResource, ValidateAddressResource, ) - from hathor.version_resource import VersionResource from hathor.wallet.resources import ( AddressResource, BalanceResource, @@ -130,11 +186,7 @@ def create_resources(self, args: Namespace) -> server.Site: ) from hathor.websocket import HathorAdminWebsocketFactory, WebsocketStatsResource - settings = HathorSettings() - cpu = get_cpu_profiler() - # TODO get this from a file. How should we do with the factory? - root = Resource() wallet_resource = Resource() root.putChild(b'wallet', wallet_resource) thin_wallet_resource = Resource() @@ -153,7 +205,6 @@ def create_resources(self, args: Namespace) -> server.Site: resources = [ (b'status', StatusResource(self.manager), root), - (b'version', VersionResource(self.manager), root), (b'create_tx', CreateTxResource(self.manager), root), (b'decode_tx', DecodeTxResource(self.manager), root), (b'validate_address', ValidateAddressResource(self.manager), root), @@ -165,8 +216,6 @@ def create_resources(self, args: Namespace) -> server.Site: (b'block_at_height', BlockAtHeightResource(self.manager), root), (b'transaction_acc_weight', TransactionAccWeightResource(self.manager), root), (b'dashboard_tx', DashboardTransactionResource(self.manager), root), - (b'profiler', ProfilerResource(self.manager), root), - (b'top', CPUProfilerResource(self.manager, cpu), root), (b'mempool', MempoolResource(self.manager), root), # mining (b'mining', MiningResource(self.manager), root), @@ -196,23 +245,6 @@ def create_resources(self, args: Namespace) -> server.Site: (b'utxo_search', UtxoSearchResource(self.manager), root), ]) - if args.enable_debug_api: - debug_resource = Resource() - root.putChild(b'_debug', debug_resource) - resources.extend([ - (b'log', DebugLogResource(), debug_resource), - (b'raise', DebugRaiseResource(), debug_resource), - (b'reject', DebugRejectResource(), debug_resource), - (b'print', DebugPrintResource(), debug_resource), - ]) - if args.enable_crash_api: - crash_resource = Resource() - root.putChild(b'_crash', crash_resource) - resources.extend([ - (b'exit', DebugCrashResource(), crash_resource), - (b'mess_around', DebugMessAroundResource(self.manager), crash_resource), - ]) - for url_path, resource, parent in resources: parent.putChild(url_path, resource) @@ -257,15 +289,9 @@ def create_resources(self, args: Namespace) -> server.Site: # Websocket stats resource root.putChild(b'websocket_stats', WebsocketStatsResource(ws_factory)) - real_root = Resource() - real_root.putChild(settings.API_VERSION_PREFIX.encode('ascii'), root) - - from hathor.profiler.site import SiteProfiler - status_server = SiteProfiler(real_root) self.log.info('with status', listen=args.status, with_wallet_api=with_wallet_api) # Set websocket factory in metrics self.manager.metrics.websocket_factory = ws_factory self._built_status = True - return status_server diff --git a/hathor/cli/run_node.py b/hathor/cli/run_node.py index 4c4169201..b286f37f9 100644 --- a/hathor/cli/run_node.py +++ b/hathor/cli/run_node.py @@ -15,6 +15,7 @@ import os import sys from argparse import SUPPRESS, ArgumentParser, Namespace +from functools import partial from typing import Any, Callable, List, Tuple from pydantic import ValidationError @@ -22,6 +23,7 @@ from hathor.conf import TESTNET_SETTINGS_FILEPATH, HathorSettings from hathor.exception import PreInitializationError +from hathor.pubsub import HathorEvents logger = get_logger() # LOGGING_CAPTURE_STDOUT = True @@ -141,10 +143,6 @@ def prepare(self, args: Namespace, *, register_resources: bool = True) -> None: self.tx_storage = self.manager.tx_storage self.wallet = self.manager.wallet - self.start_manager(args) - - if args.stratum: - self.reactor.listenTCP(args.stratum, self.manager.stratum_factory) if register_resources: resources_builder = ResourcesBuilder(self.manager, builder.event_ws_factory) @@ -171,6 +169,13 @@ def prepare(self, args: Namespace, *, register_resources: bool = True) -> None: rocksdb_storage=getattr(builder, 'rocksdb_storage', None), stratum_factory=self.manager.stratum_factory, ) + if args.sysctl: + self.init_sysctl(args.sysctl) + + def on_load_finished(self, args: Namespace) -> None: + # resources_builder.build_after_loading(args, root) + if args.stratum: + self.reactor.listenTCP(args.stratum, self.manager.stratum_factory) def start_sentry_if_possible(self, args: Namespace) -> None: """Start Sentry integration if possible.""" @@ -194,8 +199,8 @@ def start_sentry_if_possible(self, args: Namespace) -> None: ) def start_manager(self, args: Namespace) -> None: - self.start_sentry_if_possible(args) - self.manager.start() + self.manager.pubsub.subscribe(HathorEvents.LOAD_FINISHED, partial(self.on_load_finished, args)) + self.reactor.callLater(0, self.manager.start) def register_signal_handlers(self, args: Namespace) -> None: """Register signal handlers.""" @@ -334,8 +339,9 @@ def __init__(self, *, argv=None): self.prepare(args) self.register_signal_handlers(args) - if args.sysctl: - self.init_sysctl(args.sysctl) + self.prepare(args) + self.start_sentry_if_possible(args) + self.start_manager(args) def init_sysctl(self, description: str) -> None: """Initialize sysctl and listen for connections. diff --git a/hathor/indexes/base_index.py b/hathor/indexes/base_index.py index 6d452f6d3..c46494381 100644 --- a/hathor/indexes/base_index.py +++ b/hathor/indexes/base_index.py @@ -16,12 +16,15 @@ from typing import TYPE_CHECKING, Optional from structlog import get_logger +from twisted.internet import defer +from twisted.internet.defer import Deferred from hathor.indexes.scope import Scope from hathor.transaction.base_transaction import BaseTransaction if TYPE_CHECKING: # pragma: no cover from hathor.indexes.manager import IndexesManager + from hathor.util import Reactor logger = get_logger() @@ -35,12 +38,12 @@ class BaseIndex(ABC): def __init__(self) -> None: self.log = logger.new() - def init_start(self, indexes_manager: 'IndexesManager') -> None: + def init_start(self, reactor: 'Reactor', indexes_manager: 'IndexesManager') -> Deferred: """ This method will always be called when starting the index manager, regardless of initialization state. It comes with a no-op implementation by default because usually indexes will not need this. """ - pass + return defer.succeed(None) @abstractmethod def get_scope(self) -> Scope: diff --git a/hathor/indexes/manager.py b/hathor/indexes/manager.py index 98aca2440..9379b5f09 100644 --- a/hathor/indexes/manager.py +++ b/hathor/indexes/manager.py @@ -13,11 +13,14 @@ # limitations under the License. import operator +import time from abc import ABC, abstractmethod from functools import reduce -from typing import TYPE_CHECKING, Iterator, List, Optional +from typing import TYPE_CHECKING, Any, Generator, Iterator, List, Optional from structlog import get_logger +from twisted.internet.defer import inlineCallbacks +from twisted.internet.task import deferLater from hathor.indexes.address_index import AddressIndex from hathor.indexes.base_index import BaseIndex @@ -29,6 +32,7 @@ from hathor.indexes.tips_index import ScopeType as TipsScopeType, TipsIndex from hathor.indexes.tokens_index import TokensIndex from hathor.indexes.utxo_index import UtxoIndex +from hathor.profiler import get_cpu_profiler from hathor.transaction import BaseTransaction from hathor.util import progress @@ -37,8 +41,10 @@ from hathor.pubsub import PubSubManager from hathor.transaction.storage import TransactionStorage + from hathor.util import Reactor logger = get_logger() +cpu = get_cpu_profiler() MAX_CACHE_SIZE_DURING_LOAD = 1000 @@ -52,6 +58,8 @@ class IndexesManager(ABC): log = get_logger() + reactor: 'Reactor' + info: InfoIndex all_tips: TipsIndex block_tips: TipsIndex @@ -129,7 +137,8 @@ def force_clear_all(self) -> None: for index in self.iter_all_indexes(): index.force_clear() - def _manually_initialize(self, tx_storage: 'TransactionStorage') -> None: + @inlineCallbacks + def _manually_initialize(self, tx_storage: 'TransactionStorage') -> Generator[Any, Any, None]: """ Initialize the indexes, checking the indexes that need initialization, and the optimal iterator to use. """ from hathor.transaction.storage.transaction_storage import NULL_INDEX_LAST_STARTED_AT @@ -168,7 +177,8 @@ def _manually_initialize(self, tx_storage: 'TransactionStorage') -> None: self.log.debug('indexes pre-init') for index in self.iter_all_indexes(): - index.init_start(self) + self.log.debug('index.init_start', index=index) + yield index.init_start(self.reactor, self) if indexes_to_init: overall_scope = reduce(operator.__or__, map(lambda i: i.get_scope(), indexes_to_init)) @@ -179,11 +189,24 @@ def _manually_initialize(self, tx_storage: 'TransactionStorage') -> None: tx_iter = iter([]) self.log.debug('indexes init') + t0 = time.time() for tx in tx_iter: + t1 = time.time() + if t1 - t0 > 0.1: + yield deferLater(self.reactor, 0, lambda: None) + t0 = t1 # feed each transaction to the indexes that they are interested in + cpu.mark_begin('IndexManager.init_loop_step') for index in indexes_to_init: if index.get_scope().matches(tx): - index.init_loop_step(tx) + db_name = index.get_db_name() or '' + key = f'{index.__class__.__name__}.init_loop_step!{db_name}' + try: + cpu.mark_begin(key) + index.init_loop_step(tx) + finally: + cpu.mark_end(key) + cpu.mark_end('IndexManager.init_loop_step') # Restore cache capacity. if isinstance(tx_storage, TransactionCacheStorage): @@ -279,12 +302,14 @@ def del_tx(self, tx: BaseTransaction, *, remove_all: bool = False, relax_assert: class MemoryIndexesManager(IndexesManager): - def __init__(self) -> None: + def __init__(self, reactor: 'Reactor') -> None: from hathor.indexes.memory_height_index import MemoryHeightIndex from hathor.indexes.memory_info_index import MemoryInfoIndex from hathor.indexes.memory_timestamp_index import MemoryTimestampIndex from hathor.indexes.memory_tips_index import MemoryTipsIndex + self.reactor = reactor + self.info = MemoryInfoIndex() self.all_tips = MemoryTipsIndex(scope_type=TipsScopeType.ALL) self.block_tips = MemoryTipsIndex(scope_type=TipsScopeType.BLOCKS) @@ -331,12 +356,13 @@ def enable_deps_index(self) -> None: class RocksDBIndexesManager(IndexesManager): - def __init__(self, db: 'rocksdb.DB') -> None: + def __init__(self, reactor: 'Reactor', db: 'rocksdb.DB') -> None: from hathor.indexes.partial_rocksdb_tips_index import PartialRocksDBTipsIndex from hathor.indexes.rocksdb_height_index import RocksDBHeightIndex from hathor.indexes.rocksdb_info_index import RocksDBInfoIndex from hathor.indexes.rocksdb_timestamp_index import RocksDBTimestampIndex + self.reactor = reactor self._db = db self.info = RocksDBInfoIndex(self._db) diff --git a/hathor/indexes/memory_info_index.py b/hathor/indexes/memory_info_index.py index cddd73ff6..e2f511168 100644 --- a/hathor/indexes/memory_info_index.py +++ b/hathor/indexes/memory_info_index.py @@ -14,11 +14,15 @@ from typing import TYPE_CHECKING, Optional +from twisted.internet import defer +from twisted.internet.defer import Deferred + from hathor.indexes.info_index import InfoIndex from hathor.transaction import BaseTransaction if TYPE_CHECKING: # pragma: no cover from hathor.indexes.manager import IndexesManager + from hathor.util import Reactor class MemoryInfoIndex(InfoIndex): @@ -28,8 +32,9 @@ def __init__(self): self._first_timestamp = 0 self._latest_timestamp = 0 - def init_start(self, indexes_manager: 'IndexesManager') -> None: + def init_start(self, reactor: 'Reactor', indexes_manager: 'IndexesManager') -> Deferred: self.force_clear() + return defer.succeed(None) def get_db_name(self) -> Optional[str]: return None diff --git a/hathor/indexes/memory_tips_index.py b/hathor/indexes/memory_tips_index.py index b8b8c6310..3487327fb 100644 --- a/hathor/indexes/memory_tips_index.py +++ b/hathor/indexes/memory_tips_index.py @@ -19,9 +19,11 @@ from structlog import get_logger from hathor.indexes.tips_index import ScopeType, TipsIndex +from hathor.profiler import get_cpu_profiler from hathor.transaction import BaseTransaction logger = get_logger() +cpu = get_cpu_profiler() class MemoryTipsIndex(TipsIndex): @@ -66,13 +68,16 @@ def init_loop_step(self, tx: BaseTransaction) -> None: return self.add_tx(tx) + @cpu.profiler('_add_interval') def _add_interval(self, interval: Interval) -> None: self.tree.add(interval) self.tx_last_interval[interval.data] = interval + @cpu.profiler('_del_interval') def _del_interval(self, interval: Interval) -> None: self.tree.remove(interval) + @cpu.profiler(key=lambda _, tx: 'add_tx!{}'.format(tx.hash.hex())) def add_tx(self, tx: BaseTransaction) -> bool: """ Add a new transaction to the index @@ -107,6 +112,7 @@ def add_tx(self, tx: BaseTransaction) -> bool: self._add_interval(interval) return True + @cpu.profiler(key=lambda _, tx: 'del_tx!{}'.format(tx.hash.hex())) def del_tx(self, tx: BaseTransaction, *, relax_assert: bool = False) -> None: """ Remove a transaction from the index. """ diff --git a/hathor/indexes/partial_rocksdb_tips_index.py b/hathor/indexes/partial_rocksdb_tips_index.py index b41252d11..81e71d1db 100644 --- a/hathor/indexes/partial_rocksdb_tips_index.py +++ b/hathor/indexes/partial_rocksdb_tips_index.py @@ -14,16 +14,18 @@ import math import time -from typing import TYPE_CHECKING, Dict, Iterator, Optional, Union +from typing import TYPE_CHECKING, Any, Dict, Generator, Iterator, Optional, Union import structlog from intervaltree import Interval, IntervalTree from structlog import get_logger +from twisted.internet.defer import inlineCallbacks +from twisted.internet.task import deferLater from hathor.indexes.memory_tips_index import MemoryTipsIndex from hathor.indexes.rocksdb_utils import RocksDBIndexUtils from hathor.indexes.tips_index import ScopeType -from hathor.util import LogDuration +from hathor.util import LogDuration, Reactor if TYPE_CHECKING: # pragma: no cover import rocksdb @@ -140,7 +142,8 @@ def _from_key(self, key: bytes) -> Interval: assert len(tx_id) == 32 return Interval(_from_db_value(begin), _from_db_value(end), tx_id) - def init_start(self, indexes_manager: 'IndexesManager') -> None: + @inlineCallbacks + def init_start(self, reactor: 'Reactor', indexes_manager: 'IndexesManager') -> Generator[Any, Any, None]: log = self.log.new(index=f'tips-{self._name}') total: Optional[int] if self is indexes_manager.all_tips: @@ -153,6 +156,7 @@ def init_start(self, indexes_manager: 'IndexesManager') -> None: log.info('index not identified, skipping total count') total = None for iv in progress(self._iter_intervals_db(), log=log, total=total): + yield deferLater(reactor, 0, lambda: None) self.tree.add(iv) self.tx_last_interval[iv.data] = iv diff --git a/hathor/indexes/rocksdb_info_index.py b/hathor/indexes/rocksdb_info_index.py index 6b6025146..310792d6c 100644 --- a/hathor/indexes/rocksdb_info_index.py +++ b/hathor/indexes/rocksdb_info_index.py @@ -15,6 +15,8 @@ from typing import TYPE_CHECKING, Optional from structlog import get_logger +from twisted.internet import defer +from twisted.internet.defer import Deferred from hathor.indexes.memory_info_index import MemoryInfoIndex from hathor.indexes.rocksdb_utils import RocksDBIndexUtils @@ -24,6 +26,7 @@ import rocksdb from hathor.indexes.manager import IndexesManager + from hathor.util import Reactor logger = get_logger() @@ -42,10 +45,11 @@ def __init__(self, db: 'rocksdb.DB', *, cf_name: Optional[bytes] = None) -> None RocksDBIndexUtils.__init__(self, db, cf_name or _CF_NAME_ADDRESS_INDEX) MemoryInfoIndex.__init__(self) - def init_start(self, indexes_manager: 'IndexesManager') -> None: + def init_start(self, reactor: 'Reactor', indexes_manager: 'IndexesManager') -> Deferred: self._load_all_values() self.log.info('loaded info-index', block_count=self._block_count, tx_count=self._tx_count, first_timestamp=self._first_timestamp, latest_timestamp=self._latest_timestamp) + return defer.succeed(None) def get_db_name(self) -> Optional[str]: return _DB_NAME diff --git a/hathor/manager.py b/hathor/manager.py index 52232a4c7..15b9b1dc5 100644 --- a/hathor/manager.py +++ b/hathor/manager.py @@ -16,12 +16,12 @@ import sys import time from enum import Enum -from typing import Any, Iterable, Iterator, List, NamedTuple, Optional, Tuple, Union +from typing import Any, Generator, Iterable, Iterator, List, NamedTuple, Optional, Tuple, Union from hathorlib.base_transaction import tx_or_block_from_bytes as lib_tx_or_block_from_bytes from structlog import get_logger from twisted.internet import defer -from twisted.internet.defer import Deferred +from twisted.internet.defer import Deferred, inlineCallbacks from twisted.internet.task import LoopingCall from twisted.python.threadpool import ThreadPool @@ -217,7 +217,8 @@ def __init__(self, self.lc_check_sync_state.clock = self.reactor self.lc_check_sync_state_interval = self.CHECK_SYNC_STATE_INTERVAL - def start(self) -> None: + @inlineCallbacks + def start(self) -> Generator[Any, Any, None]: """ A factory must be started only once. And it is usually automatically started. """ if self.is_started: @@ -258,8 +259,6 @@ def start(self) -> None: self.state = self.NodeState.INITIALIZING self.pubsub.publish(HathorEvents.MANAGER_ON_START) - self.connections.start() - self.pow_thread_pool.start() # Disable get transaction lock when initializing components self.tx_storage.disable_lock() @@ -271,9 +270,12 @@ def start(self) -> None: # finish it. It's just to know if the full node has stopped a full initialization in the middle self.tx_storage.finish_full_verification() else: - self._initialize_components_new() + yield self._initialize_components_new() self.tx_storage.enable_lock() + self.connections.start() + self.pow_thread_pool.start() + # Metric starts to capture data self.metrics.start() @@ -553,7 +555,8 @@ def _initialize_components(self) -> None: self.log.info('ready', vertex_count=cnt, tx_rate=tx_rate, total_load_time=total_load_time, height=h, blocks=block_count, txs=tx_count, **environment_info) - def _initialize_components_new(self) -> None: + @inlineCallbacks + def _initialize_components_new(self) -> Generator[Any, Any, None]: """You are not supposed to run this method manually. You should run `doStart()` to initialize the manager. @@ -605,7 +608,7 @@ def _initialize_components_new(self) -> None: # TODO: move support for full-verification here, currently we rely on the original _initialize_components # method for full-verification to work, if we implement it here we'll reduce a lot of duplicate and # complex code - self.tx_storage.indexes._manually_initialize(self.tx_storage) + yield self.tx_storage.indexes._manually_initialize(self.tx_storage) # Verify if all checkpoints that exist in the database are correct try: diff --git a/hathor/simulator/simulator.py b/hathor/simulator/simulator.py index 8320d6c78..d1ee4c041 100644 --- a/hathor/simulator/simulator.py +++ b/hathor/simulator/simulator.py @@ -174,7 +174,7 @@ def create_peer( artifacts = builder.build() - artifacts.manager.start() + self._clock.callLater(0, artifacts.manager.start) self.run_to_completion() # Don't use it anywhere else. It is unsafe to generate mnemonic words like this. diff --git a/hathor/transaction/storage/cache_storage.py b/hathor/transaction/storage/cache_storage.py index dfc6b0fec..962a4d3b3 100644 --- a/hathor/transaction/storage/cache_storage.py +++ b/hathor/transaction/storage/cache_storage.py @@ -16,6 +16,7 @@ from typing import Any, Iterator, Optional, Set from twisted.internet import threads +from twisted.internet.interfaces import IDelayedCall from hathor.indexes import IndexesManager from hathor.transaction import BaseTransaction @@ -59,6 +60,7 @@ def __init__(self, store: 'BaseTransactionStorage', reactor: Reactor, interval: self.interval = interval self.capacity = capacity self.flush_deferred = None + self._flush_call_later: Optional[IDelayedCall] = None self._clone_if_needed = _clone_if_needed self.cache = OrderedDict() # dirty_txs has the txs that have been modified but are not persisted yet @@ -94,7 +96,17 @@ def set_migration_state(self, migration_name: str, state: MigrationState) -> Non def pre_init(self) -> None: # XXX: not calling self.store.pre_init() because it would run `BaseTransactionStorage.pre_init` twice. super().pre_init() - self.reactor.callLater(self.interval, self._start_flush_thread) + self._schedule_next_flush() + + def _clean_up(self) -> None: + if self._flush_call_later and self._flush_call_later.active(): + self._flush_call_later.cancel() + self._flush_call_later = None + + def _schedule_next_flush(self) -> None: + if self._flush_call_later and self._flush_call_later.active(): + self._flush_call_later.cancel() + self._flush_call_later = self.reactor.callLater(self.interval, self._start_flush_thread) def _enable_weakref(self) -> None: super()._enable_weakref() @@ -112,12 +124,12 @@ def _start_flush_thread(self) -> None: self.flush_deferred = deferred def _cb_flush_thread(self, flushed_txs: Set[bytes]) -> None: - self.reactor.callLater(self.interval, self._start_flush_thread) + self._schedule_next_flush() self.flush_deferred = None def _err_flush_thread(self, reason: Any) -> None: self.log.error('error flushing transactions', reason=reason) - self.reactor.callLater(self.interval, self._start_flush_thread) + self._schedule_next_flush() self.flush_deferred = None def _flush_to_storage(self, dirty_txs_copy: Set[bytes]) -> None: diff --git a/hathor/transaction/storage/rocksdb_storage.py b/hathor/transaction/storage/rocksdb_storage.py index 5daa51815..d6e4900f0 100644 --- a/hathor/transaction/storage/rocksdb_storage.py +++ b/hathor/transaction/storage/rocksdb_storage.py @@ -65,9 +65,9 @@ def _load_from_bytes(self, tx_data: bytes, meta_data: bytes) -> 'BaseTransaction def _build_indexes_manager(self) -> IndexesManager: if self._use_memory_indexes: - return MemoryIndexesManager() + return MemoryIndexesManager(self.reactor) else: - return RocksDBIndexesManager(self._db) + return RocksDBIndexesManager(self.reactor, self._db) def _tx_to_bytes(self, tx: 'BaseTransaction') -> bytes: return bytes(tx) diff --git a/hathor/transaction/storage/transaction_storage.py b/hathor/transaction/storage/transaction_storage.py index 8970cc8ee..15dd16de8 100644 --- a/hathor/transaction/storage/transaction_storage.py +++ b/hathor/transaction/storage/transaction_storage.py @@ -17,11 +17,12 @@ from collections import defaultdict, deque from contextlib import AbstractContextManager from threading import Lock -from typing import Any, Dict, Iterator, List, NamedTuple, Optional, Set, Tuple, Type, cast +from typing import Any, Dict, Generator, Iterator, List, NamedTuple, Optional, Set, Tuple, Type, cast from weakref import WeakValueDictionary from intervaltree.interval import Interval from structlog import get_logger +from twisted.internet.defer import inlineCallbacks from hathor.conf import HathorSettings from hathor.indexes import IndexesManager, MemoryIndexesManager @@ -86,6 +87,9 @@ class TransactionStorage(ABC): _migrations: List[BaseMigration] def __init__(self) -> None: + from hathor.util import Reactor, reactor as twisted_reactor + self.reactor: Reactor = twisted_reactor + # Weakref is used to guarantee that there is only one instance of each transaction in memory. self._tx_weakref: WeakValueDictionary[bytes, BaseTransaction] = WeakValueDictionary() self._tx_weakref_disabled: bool = False @@ -750,7 +754,8 @@ def get_newer_txs_after(self, timestamp: int, hash_bytes: bytes, count: int) -> raise NotImplementedError @abstractmethod - def _manually_initialize(self) -> None: + @inlineCallbacks + def _manually_initialize(self) -> Generator[Any, Any, None]: # XXX: maybe refactor, this is actually part of the public interface """Caches must be initialized. This function should not be called, because usually the HathorManager will handle all this initialization. @@ -1067,7 +1072,7 @@ def _save_transaction(self, tx: BaseTransaction, *, only_metadata: bool = False) raise NotImplementedError def _build_indexes_manager(self) -> IndexesManager: - return MemoryIndexesManager() + return MemoryIndexesManager(self.reactor) def reset_indexes(self) -> None: """Reset all indexes. This function should not be called unless you know what you are doing.""" @@ -1178,12 +1183,14 @@ def get_newer_txs_after(self, timestamp: int, hash_bytes: bytes, count: int) -> txs = [self.get_transaction(tx_hash) for tx_hash in tx_hashes] return txs, has_more - def _manually_initialize(self) -> None: - self._manually_initialize_indexes() + @inlineCallbacks + def _manually_initialize(self) -> Generator[Any, Any, None]: + yield self._manually_initialize_indexes() - def _manually_initialize_indexes(self) -> None: + @inlineCallbacks + def _manually_initialize_indexes(self) -> Generator[Any, Any, None]: if self.indexes is not None: - self.indexes._manually_initialize(self) + yield self.indexes._manually_initialize(self) def _topological_sort_timestamp_index(self) -> Iterator[BaseTransaction]: assert self.indexes is not None diff --git a/tests/tx/test_cache_storage.py b/tests/tx/test_cache_storage.py index bae11ca40..3a6cd48ac 100644 --- a/tests/tx/test_cache_storage.py +++ b/tests/tx/test_cache_storage.py @@ -1,3 +1,5 @@ +from twisted.internet.defer import inlineCallbacks + from hathor.daa import TestMode, _set_test_mode from hathor.transaction import Transaction, TransactionMetadata from hathor.transaction.storage import TransactionCacheStorage, TransactionMemoryStorage @@ -10,12 +12,13 @@ class BaseCacheStorageTest(unittest.TestCase): __test__ = False + @inlineCallbacks def setUp(self): super().setUp() store = TransactionMemoryStorage(with_index=False) self.cache_storage = TransactionCacheStorage(store, self.clock, capacity=5) - self.cache_storage._manually_initialize() + yield self.cache_storage._manually_initialize() self.cache_storage.pre_init() self.genesis = self.cache_storage.get_all_genesis() @@ -25,10 +28,11 @@ def setUp(self): # Save genesis metadata self.cache_storage.save_transaction(self.genesis_txs[0], only_metadata=True) - self.manager = self.create_peer('testnet', tx_storage=self.cache_storage, unlock_wallet=True) + self.manager = yield self.create_peer('testnet', tx_storage=self.cache_storage, unlock_wallet=True) def tearDown(self): super().tearDown() + self.cache_storage._clean_up() def _get_new_tx(self, nonce): from hathor.transaction.validation_state import ValidationState diff --git a/tests/tx/test_indexes4.py b/tests/tx/test_indexes4.py index 8b31ba4e0..b74f56c4d 100644 --- a/tests/tx/test_indexes4.py +++ b/tests/tx/test_indexes4.py @@ -1,3 +1,5 @@ +from twisted.internet.defer import inlineCallbacks + from hathor.crypto.util import decode_address from hathor.transaction import Transaction from hathor.transaction.storage import TransactionMemoryStorage @@ -53,6 +55,7 @@ def _build_randomized_blockchain(self, *, utxo_index=False): assert manager.propagate_tx(tx) return manager + @inlineCallbacks def test_index_initialization(self): from copy import deepcopy @@ -79,9 +82,9 @@ def test_index_initialization(self): base_utxo_index = deepcopy(tx_storage.indexes.utxo._index) # reset the indexes and force a re-initialization of all indexes - tx_storage._manually_initialize() + yield tx_storage._manually_initialize() tx_storage.indexes.enable_address_index(self.manager.pubsub) - tx_storage._manually_initialize_indexes() + yield tx_storage._manually_initialize_indexes() reinit_all_tips_tree = tx_storage.indexes.all_tips.tree.copy() reinit_block_tips_tree = tx_storage.indexes.block_tips.tree.copy() @@ -96,9 +99,9 @@ def test_index_initialization(self): self.assertEqual(reinit_utxo_index, base_utxo_index) # reset again - tx_storage._manually_initialize() + yield tx_storage._manually_initialize() tx_storage.indexes.enable_address_index(self.manager.pubsub) - tx_storage._manually_initialize_indexes() + yield tx_storage._manually_initialize_indexes() newinit_all_tips_tree = tx_storage.indexes.all_tips.tree.copy() newinit_block_tips_tree = tx_storage.indexes.block_tips.tree.copy() diff --git a/tests/unittest.py b/tests/unittest.py index 476e8bff7..c09205ae7 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -248,8 +248,11 @@ def create_peer(self, network, peer_id=None, wallet=None, tx_storage=None, unloc manager.avg_time_between_blocks = 0.0001 if start_manager: - manager.start() + # We don't need to yield because run_to_completion() will run until + # the node is fully initialized. + deferred = manager.start() self.run_to_completion() + self.assertTrue(deferred.called) return manager def run_to_completion(self):