Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 64 additions & 38 deletions hathor/builder/resources_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,19 @@

import os
from argparse import Namespace
from functools import partial
from typing import TYPE_CHECKING, Any, Dict, Optional

from autobahn.twisted.resource import WebSocketResource
from structlog import get_logger
from twisted.web import server
from twisted.web.resource import Resource

from hathor.conf import HathorSettings
from hathor.event.resources.event import EventResource
from hathor.exception import BuilderError
from hathor.prometheus import PrometheusMetricsExporter
from hathor.pubsub import EventArguments, HathorEvents

if TYPE_CHECKING:
from hathor.event.websocket.factory import EventWebsocketFactory
Expand All @@ -45,9 +48,27 @@ def __init__(self, manager: 'HathorManager', event_ws_factory: Optional['EventWe
def build(self, args: Namespace) -> Optional[server.Site]:
if args.prometheus:
self.create_prometheus(args)
if args.status:
return self.create_resources(args)
return None
if not args.status:
return None

root = Resource()
self.build_before_loading(args, root)

self.manager.pubsub.subscribe(HathorEvents.LOAD_FINISHED, partial(self.on_load_finished, args, root))

settings = HathorSettings()

real_root = Resource()
real_root.putChild(settings.API_VERSION_PREFIX.encode('ascii'), root)

from hathor.profiler.site import SiteProfiler
status_server = SiteProfiler(real_root)

return status_server

def on_load_finished(self, args: Namespace, root: Resource, ev_key: HathorEvents, ev_args: EventArguments) -> None:
assert self._built_status is False
self.build_after_loading(args, root)

def create_prometheus(self, args: Namespace) -> PrometheusMetricsExporter:
kwargs: Dict[str, Any] = {
Expand All @@ -66,8 +87,9 @@ def create_prometheus(self, args: Namespace) -> PrometheusMetricsExporter:
self._built_prometheus = True
return prometheus

def create_resources(self, args: Namespace) -> server.Site:
from hathor.conf import HathorSettings
def build_before_loading(self, args: Namespace, root: Resource) -> None:
"""This method builds the resources that are safe to run while the full node is
loading."""
from hathor.debug_resources import (
DebugCrashResource,
DebugLogResource,
Expand All @@ -76,6 +98,43 @@ def create_resources(self, args: Namespace) -> server.Site:
DebugRaiseResource,
DebugRejectResource,
)
from hathor.profiler import get_cpu_profiler
from hathor.profiler.resources import CPUProfilerResource, ProfilerResource
from hathor.version_resource import VersionResource

cpu = get_cpu_profiler()

resources = []

if args.enable_debug_api:
debug_resource = Resource()
root.putChild(b'_debug', debug_resource)
resources.extend([
(b'log', DebugLogResource(), debug_resource),
(b'raise', DebugRaiseResource(), debug_resource),
(b'reject', DebugRejectResource(), debug_resource),
(b'print', DebugPrintResource(), debug_resource),
])

if args.enable_crash_api:
crash_resource = Resource()
root.putChild(b'_crash', crash_resource)
resources.extend([
(b'exit', DebugCrashResource(), crash_resource),
(b'mess_around', DebugMessAroundResource(self.manager), crash_resource),
])

resources.extend([
(b'version', VersionResource(self.manager), root),
(b'profiler', ProfilerResource(self.manager), root),
(b'top', CPUProfilerResource(self.manager, cpu), root),
])

for url_path, resource, parent in resources:
parent.putChild(url_path, resource)

def build_after_loading(self, args: Namespace, root: Resource) -> None:
"""This method builds all other resources after the loading is complete."""
from hathor.mining.ws import MiningWebsocketFactory
from hathor.p2p.resources import (
AddPeersResource,
Expand All @@ -85,8 +144,6 @@ def create_resources(self, args: Namespace) -> server.Site:
NetfilterRuleResource,
StatusResource,
)
from hathor.profiler import get_cpu_profiler
from hathor.profiler.resources import CPUProfilerResource, ProfilerResource
from hathor.transaction.resources import (
BlockAtHeightResource,
CreateTxResource,
Expand All @@ -104,7 +161,6 @@ def create_resources(self, args: Namespace) -> server.Site:
UtxoSearchResource,
ValidateAddressResource,
)
from hathor.version_resource import VersionResource
from hathor.wallet.resources import (
AddressResource,
BalanceResource,
Expand All @@ -130,11 +186,7 @@ def create_resources(self, args: Namespace) -> server.Site:
)
from hathor.websocket import HathorAdminWebsocketFactory, WebsocketStatsResource

settings = HathorSettings()
cpu = get_cpu_profiler()

# TODO get this from a file. How should we do with the factory?
root = Resource()
wallet_resource = Resource()
root.putChild(b'wallet', wallet_resource)
thin_wallet_resource = Resource()
Expand All @@ -153,7 +205,6 @@ def create_resources(self, args: Namespace) -> server.Site:

resources = [
(b'status', StatusResource(self.manager), root),
(b'version', VersionResource(self.manager), root),
(b'create_tx', CreateTxResource(self.manager), root),
(b'decode_tx', DecodeTxResource(self.manager), root),
(b'validate_address', ValidateAddressResource(self.manager), root),
Expand All @@ -165,8 +216,6 @@ def create_resources(self, args: Namespace) -> server.Site:
(b'block_at_height', BlockAtHeightResource(self.manager), root),
(b'transaction_acc_weight', TransactionAccWeightResource(self.manager), root),
(b'dashboard_tx', DashboardTransactionResource(self.manager), root),
(b'profiler', ProfilerResource(self.manager), root),
(b'top', CPUProfilerResource(self.manager, cpu), root),
(b'mempool', MempoolResource(self.manager), root),
# mining
(b'mining', MiningResource(self.manager), root),
Expand Down Expand Up @@ -196,23 +245,6 @@ def create_resources(self, args: Namespace) -> server.Site:
(b'utxo_search', UtxoSearchResource(self.manager), root),
])

if args.enable_debug_api:
debug_resource = Resource()
root.putChild(b'_debug', debug_resource)
resources.extend([
(b'log', DebugLogResource(), debug_resource),
(b'raise', DebugRaiseResource(), debug_resource),
(b'reject', DebugRejectResource(), debug_resource),
(b'print', DebugPrintResource(), debug_resource),
])
if args.enable_crash_api:
crash_resource = Resource()
root.putChild(b'_crash', crash_resource)
resources.extend([
(b'exit', DebugCrashResource(), crash_resource),
(b'mess_around', DebugMessAroundResource(self.manager), crash_resource),
])

for url_path, resource, parent in resources:
parent.putChild(url_path, resource)

Expand Down Expand Up @@ -257,15 +289,9 @@ def create_resources(self, args: Namespace) -> server.Site:
# Websocket stats resource
root.putChild(b'websocket_stats', WebsocketStatsResource(ws_factory))

real_root = Resource()
real_root.putChild(settings.API_VERSION_PREFIX.encode('ascii'), root)

from hathor.profiler.site import SiteProfiler
status_server = SiteProfiler(real_root)
self.log.info('with status', listen=args.status, with_wallet_api=with_wallet_api)

# Set websocket factory in metrics
self.manager.metrics.websocket_factory = ws_factory

self._built_status = True
return status_server
22 changes: 14 additions & 8 deletions hathor/cli/run_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
import os
import sys
from argparse import SUPPRESS, ArgumentParser, Namespace
from functools import partial
from typing import Any, Callable, List, Tuple

from pydantic import ValidationError
from structlog import get_logger

from hathor.conf import TESTNET_SETTINGS_FILEPATH, HathorSettings
from hathor.exception import PreInitializationError
from hathor.pubsub import HathorEvents

logger = get_logger()
# LOGGING_CAPTURE_STDOUT = True
Expand Down Expand Up @@ -141,10 +143,6 @@ def prepare(self, args: Namespace, *, register_resources: bool = True) -> None:

self.tx_storage = self.manager.tx_storage
self.wallet = self.manager.wallet
self.start_manager(args)

if args.stratum:
self.reactor.listenTCP(args.stratum, self.manager.stratum_factory)

if register_resources:
resources_builder = ResourcesBuilder(self.manager, builder.event_ws_factory)
Expand All @@ -171,6 +169,13 @@ def prepare(self, args: Namespace, *, register_resources: bool = True) -> None:
rocksdb_storage=getattr(builder, 'rocksdb_storage', None),
stratum_factory=self.manager.stratum_factory,
)
if args.sysctl:
self.init_sysctl(args.sysctl)

def on_load_finished(self, args: Namespace) -> None:
# resources_builder.build_after_loading(args, root)
if args.stratum:
self.reactor.listenTCP(args.stratum, self.manager.stratum_factory)

def start_sentry_if_possible(self, args: Namespace) -> None:
"""Start Sentry integration if possible."""
Expand All @@ -194,8 +199,8 @@ def start_sentry_if_possible(self, args: Namespace) -> None:
)

def start_manager(self, args: Namespace) -> None:
self.start_sentry_if_possible(args)
self.manager.start()
self.manager.pubsub.subscribe(HathorEvents.LOAD_FINISHED, partial(self.on_load_finished, args))
self.reactor.callLater(0, self.manager.start)

def register_signal_handlers(self, args: Namespace) -> None:
"""Register signal handlers."""
Expand Down Expand Up @@ -334,8 +339,9 @@ def __init__(self, *, argv=None):

self.prepare(args)
self.register_signal_handlers(args)
if args.sysctl:
self.init_sysctl(args.sysctl)
self.prepare(args)
self.start_sentry_if_possible(args)
self.start_manager(args)

def init_sysctl(self, description: str) -> None:
"""Initialize sysctl and listen for connections.
Expand Down
7 changes: 5 additions & 2 deletions hathor/indexes/base_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@
from typing import TYPE_CHECKING, Optional

from structlog import get_logger
from twisted.internet import defer
from twisted.internet.defer import Deferred

from hathor.indexes.scope import Scope
from hathor.transaction.base_transaction import BaseTransaction

if TYPE_CHECKING: # pragma: no cover
from hathor.indexes.manager import IndexesManager
from hathor.util import Reactor

logger = get_logger()

Expand All @@ -35,12 +38,12 @@ class BaseIndex(ABC):
def __init__(self) -> None:
self.log = logger.new()

def init_start(self, indexes_manager: 'IndexesManager') -> None:
def init_start(self, reactor: 'Reactor', indexes_manager: 'IndexesManager') -> Deferred:
""" This method will always be called when starting the index manager, regardless of initialization state.

It comes with a no-op implementation by default because usually indexes will not need this.
"""
pass
return defer.succeed(None)

@abstractmethod
def get_scope(self) -> Scope:
Expand Down
Loading