Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
86 commits
Select commit Hold shift + click to select a range
c7cb2ed
feat(events): add websocket endpoint to stream events
jansegre Oct 27, 2022
4ed1bc8
Merge branch 'dev' into feat/websocket-event
glevco Jan 25, 2023
1214b44
feat(events): add TODO and other minor changes
glevco Jan 25, 2023
37338bf
feat(events): add EventWebsocketFactory to EventManager
glevco Jan 26, 2023
6f2a3ef
feat(events): refactor EventManager
glevco Jan 26, 2023
d6356c5
feat(events): fix EventManager instantiation on HathorManager
glevco Jan 26, 2023
4721c90
feat(events): start protocol and factory refactor (wip)
glevco Jan 26, 2023
69bebcf
feat(events): create request and response classes
glevco Jan 26, 2023
6b0f730
feat(events): finish protocol and factory refactor
glevco Jan 26, 2023
5f9afb6
feat(events): Update EventStorage interface
glevco Jan 27, 2023
0a80138
feat(events): update event WebSocket architecture
glevco Jan 27, 2023
fff146d
feat(pydantic-install): install pydantic
glevco Feb 2, 2023
fff6589
Merge branch 'feat/pydantic-install' into feat/websocket-event
glevco Feb 2, 2023
f6671df
feat(pydantic-install): add missing docstrings
glevco Feb 3, 2023
c8111a1
feat(events): change request to use pydantic and update response schema
glevco Feb 3, 2023
bdad86a
feat(events): revert EventStorage changes
glevco Feb 3, 2023
624b250
feat(events): remove unused settings
glevco Feb 3, 2023
67455ca
feat(events): update factory and protocol logic
glevco Feb 3, 2023
a247dae
feat(events): convert dataclasses to pydantic
glevco Feb 3, 2023
ace89e3
feat(events): lint code
glevco Feb 3, 2023
9148a59
feat(events): update builder and manager
glevco Feb 3, 2023
265cf37
feat(events): change test utils so filepath is not hardcoded
glevco Feb 4, 2023
cf326ec
feat(events): change builder to accept custom settings
glevco Feb 4, 2023
cf4a5d9
feat(events): fix typing circular problem
glevco Feb 4, 2023
829fa02
feat(events): implement --x-enable-event-queue tests
glevco Feb 4, 2023
6850a96
feat(events): fix builder typing
glevco Feb 4, 2023
65ae0b6
feat(events): fix builder typing
glevco Feb 4, 2023
1d1e1b3
feat(events): update EventStorage tests
glevco Feb 4, 2023
4010e38
feat(events): change protocol name
glevco Feb 5, 2023
2223690
feat(events): implement factory and protocol tests
glevco Feb 5, 2023
d0038c3
feat(events): remove incorrect assert
glevco Feb 5, 2023
97963fa
feat(events): update test_event_manager
glevco Feb 5, 2023
fb3bd4d
feat(events): update test_event_reorg
glevco Feb 5, 2023
2980155
feat(events): fix builder condition
glevco Feb 6, 2023
6aa4bfe
feat(events): implement request validation and tests
glevco Feb 6, 2023
767c424
feat(events): lint code
glevco Feb 6, 2023
cead9fd
feat(events): resolve TODOs and improve tests
glevco Feb 6, 2023
c7b7e1c
feat(events): add docstrings
glevco Feb 6, 2023
1fa95bc
feat(events): change test utils so filepath is not hardcoded
glevco Feb 4, 2023
3f555df
feat(events): change builder to accept custom settings
glevco Feb 4, 2023
d757f62
feat(events): fix builder typing
glevco Feb 4, 2023
4545545
feat(events): fix builder typing
glevco Feb 4, 2023
38c7472
feat(pydantic-install): fix mypy plugin
glevco Feb 6, 2023
a43c591
Merge branch 'feat/pydantic-install' into chore/test-improvements
glevco Feb 6, 2023
ffa129a
Merge branch 'chore/test-improvements' into feat/websocket-event
glevco Feb 6, 2023
1dd7031
feat(events): add event ws start/stop on manager
glevco Feb 6, 2023
79a7008
feat(events): fix bug and add missing test
glevco Feb 6, 2023
9d57608
feat(events): fix object bleeding
glevco Feb 6, 2023
a295397
feat(events): improve logs
glevco Feb 6, 2023
8485233
feat(events): fix lint issues
glevco Feb 6, 2023
209013c
feat(events): remove unused event
glevco Feb 6, 2023
b8c0f9c
Merge branch 'dev' into chore/test-improvements
glevco Feb 7, 2023
9ad7bfe
feat(test-improvements): rollback builder changes
glevco Feb 7, 2023
a5b432e
feat(test-improvements): move filepath code into function
glevco Feb 7, 2023
4e58e7b
Merge branch 'chore/test-improvements' into feat/websocket-event
glevco Feb 7, 2023
4fb69fe
feat(events): fix builder tests
glevco Feb 7, 2023
77c5bad
feat(events): change HathorManager to receive EventManager
glevco Feb 7, 2023
8e1538e
feat(events): improve logging
glevco Feb 7, 2023
f6f9cf2
feat(events): change window_size_increment to window_size
glevco Feb 7, 2023
a5f7efb
feat(events): change window_size_increment to window_size
glevco Feb 7, 2023
d8ae406
Merge remote-tracking branch 'origin/feat/websocket-event' into feat/…
glevco Feb 7, 2023
f4fe231
feat(events): change event factory to use callLater
glevco Feb 7, 2023
7f90392
feat(events): change event factory to use callLater
glevco Feb 7, 2023
bfb4204
Merge branch 'dev' into feat/websocket-event
glevco Feb 7, 2023
d733c9d
feat(events): lint code
glevco Feb 8, 2023
bae46cf
feat(events): fix global pubsub on tests
glevco Feb 8, 2023
6d5e29b
feat(events): update request and response models
glevco Feb 8, 2023
739fd12
feat(events): update factory and protocol according to code review
glevco Feb 8, 2023
da5af83
feat(events): implement stream (in)active responses
glevco Feb 9, 2023
01f2982
feat(events): refactor code from factory to protocol
glevco Feb 9, 2023
aa94e53
feat(events): refactor responses
glevco Feb 9, 2023
c5826b9
feat(events): fix ack validation
glevco Feb 9, 2023
976a5e1
feat(events): lint code
glevco Feb 9, 2023
39d32a5
feat(events): update factory tests
glevco Feb 9, 2023
760e212
feat(events): update protocol tests
glevco Feb 9, 2023
f7226c7
feat(events): remove Python 3.10 specific code
glevco Feb 9, 2023
e0fe22b
feat(events): lint code
glevco Feb 9, 2023
671427e
feat(events): fix typing
glevco Feb 10, 2023
69fb2da
feat(events): add more python retrocompatibility changes
glevco Feb 10, 2023
5a46fbd
feat(events): implement sendMessage exception handling
glevco Feb 10, 2023
0c4107d
Merge branch 'dev' into feat/websocket-event
glevco Feb 15, 2023
7f2cdcc
feat(events): implement BaseEvent tests
glevco Feb 15, 2023
a119653
feat(events): remove __future__
glevco Feb 15, 2023
a09d15d
refactor(indexes): Create a generic TxGroupIndex and uses it on Addre…
msbrogli Feb 9, 2023
f4a4b13
feat(events): remove NETWORK_NEW_TX_VOIDED event (#529)
glevco Feb 17, 2023
6ce3aec
Merge branch 'dev' into feat/websocket-event
glevco Feb 17, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 23 additions & 4 deletions hathor/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from twisted.web import server
from twisted.web.resource import Resource

from hathor.event import EventManager
from hathor.exception import BuilderError
from hathor.indexes import IndexesManager
from hathor.manager import HathorManager
Expand Down Expand Up @@ -56,6 +57,7 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa
from hathor.conf.get_settings import get_settings_module
from hathor.daa import TestMode, _set_test_mode
from hathor.event.storage import EventMemoryStorage, EventRocksDBStorage, EventStorage
from hathor.event.websocket.factory import EventWebsocketFactory
from hathor.p2p.netfilter.utils import add_peer_id_blacklist
from hathor.p2p.peer_discovery import BootstrapPeerDiscovery, DNSPeerDiscovery
from hathor.storage import RocksDBStorage
Expand Down Expand Up @@ -90,13 +92,15 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa

tx_storage: TransactionStorage
rocksdb_storage: RocksDBStorage
event_storage: Optional[EventStorage] = None
self.event_storage: Optional[EventStorage] = None
self.event_ws_factory: Optional[EventWebsocketFactory] = None

if args.memory_storage:
self.check_or_raise(not args.data, '--data should not be used with --memory-storage')
# if using MemoryStorage, no need to have cache
tx_storage = TransactionMemoryStorage()
if args.x_enable_event_queue:
event_storage = EventMemoryStorage()
self.event_storage = EventMemoryStorage()
self.check_or_raise(not args.x_rocksdb_indexes, 'RocksDB indexes require RocksDB data')
self.log.info('with storage', storage_class=type(tx_storage).__name__)
else:
Expand All @@ -109,7 +113,7 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa
with_index=(not args.cache),
use_memory_indexes=args.memory_indexes)
if args.x_enable_event_queue:
event_storage = EventRocksDBStorage(rocksdb_storage)
self.event_storage = EventRocksDBStorage(rocksdb_storage)

self.log.info('with storage', storage_class=type(tx_storage).__name__, path=args.data)
if args.cache:
Expand All @@ -135,6 +139,17 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa

pubsub = PubSubManager(reactor)

event_manager: Optional[EventManager] = None
if args.x_enable_event_queue:
assert self.event_storage is not None, 'cannot create EventManager without EventStorage'
self.event_ws_factory = EventWebsocketFactory(reactor, self.event_storage)
event_manager = EventManager(
event_storage=self.event_storage,
event_ws_factory=self.event_ws_factory,
pubsub=pubsub,
reactor=reactor
)

if args.wallet_index and tx_storage.indexes is not None:
self.log.debug('enable wallet indexes')
self.enable_wallet_index(tx_storage.indexes, pubsub)
Expand All @@ -150,7 +165,7 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa
network=network,
hostname=hostname,
tx_storage=tx_storage,
event_storage=event_storage,
event_manager=event_manager,
wallet=self.wallet,
stratum_port=args.stratum,
ssl=True,
Expand Down Expand Up @@ -481,6 +496,10 @@ def create_resources(self, args: Namespace) -> server.Site:

ws_factory.subscribe(self.manager.pubsub)

# Event websocket resource
if args.x_enable_event_queue and self.event_ws_factory is not None:
root.putChild(b'event_ws', WebSocketResource(self.event_ws_factory))

# Websocket stats resource
root.putChild(b'websocket_stats', WebsocketStatsResource(ws_factory))

Expand Down
1 change: 1 addition & 0 deletions hathor/conf/unittests.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@
GENESIS_TX2_HASH=bytes.fromhex('33e14cb555a96967841dcbe0f95e9eab5810481d01de8f4f73afb8cce365e869'),
REWARD_SPEND_MIN_BLOCKS=10,
SLOW_ASSERTS=True,
ENABLE_EVENT_QUEUE_FEATURE=True,
)
14 changes: 8 additions & 6 deletions hathor/event/base_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,26 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from dataclasses import dataclass
from typing import Dict, Optional

from pydantic import NonNegativeInt

@dataclass
class BaseEvent:
from hathor.utils.pydantic import BaseModel


class BaseEvent(BaseModel):
# Full node id, because different full nodes can have different sequences of events
peer_id: str
# Event unique id, determines event order
id: int
id: NonNegativeInt
# Timestamp in which the event was emitted, this follows the unix_timestamp format, it's only informative, events
# aren't guaranteed to always have sequential timestamps, for example, if the system clock changes between two
# events it's possible that timestamps will temporarily decrease.
timestamp: float
# One of the event types
type: str
type: str # TODO: Convert type and data to enum and classes
# Variable for event type
data: Dict
# Used to link events, for example, many TX_METADATA_CHANGED will have the same group_id when they belong to the
# same reorg process
group_id: Optional[int] = None
group_id: Optional[NonNegativeInt] = None
149 changes: 111 additions & 38 deletions hathor/event/event_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from hathor.event.base_event import BaseEvent
from hathor.event.storage import EventStorage
from hathor.event.websocket import EventWebsocketFactory
from hathor.pubsub import EventArguments, HathorEvents, PubSubManager
from hathor.util import Reactor

Expand Down Expand Up @@ -85,61 +86,133 @@ def _extract_reorg(args: EventArguments) -> Dict[str, Any]:
}


def build_event_data(event: HathorEvents, event_args: EventArguments) -> Dict[str, Any]:
def _build_event_data(event_type: HathorEvents, event_args: EventArguments) -> Dict[str, Any]:
"""Extract and build event data from event_args for a given event type."""
event_extract_fn = _EVENT_EXTRACT_MAP.get(event)
event_extract_fn = _EVENT_EXTRACT_MAP.get(event_type)
if event_extract_fn is None:
raise ValueError(f'The given event type ({event}) is not a supported event')
raise ValueError(f'The given event type ({event_type}) is not a supported event')
return event_extract_fn(event_args)


class EventManager:
def __init__(self, event_storage: EventStorage, reactor: Reactor, peer_id: str):
"""Class that manages integration events.

Events are received from PubSub, persisted on the storage and sent to WebSocket clients.
"""

_peer_id: str

def __init__(
self,
event_storage: EventStorage,
event_ws_factory: EventWebsocketFactory,
pubsub: PubSubManager,
reactor: Reactor
):
self.log = logger.new()
self.clock = reactor
self.event_storage = event_storage
last_event = event_storage.get_last_event()
last_event_type = HathorEvents(last_event.type) if last_event is not None else None

self._clock = reactor
self._event_storage = event_storage
self._event_ws_factory = event_ws_factory
self._pubsub = pubsub

self._last_event = self._event_storage.get_last_event()
self._last_existing_group_id = self._event_storage.get_last_group_id()

self._assert_closed_event_group()
self._subscribe_events()

def start(self, peer_id: str) -> None:
self._peer_id = peer_id
self._event_ws_factory.start()

def stop(self):
self._event_ws_factory.stop()

def _assert_closed_event_group(self):
# XXX: we must check that the last event either does not belong to an event group or that it just closed an
# event group, because we cannot resume an open group of events that wasn't properly closed before exit
assert (
last_event is None or
last_event.group_id is None or
last_event_type in _GROUP_END_EVENTS
self._event_group_is_closed()
), 'an unclosed event group was detected, which indicates the node crashed, cannot resume'
self._next_event_id = 0 if last_event is None else last_event.id + 1
last_group_id = event_storage.get_last_group_id()
self._next_group_id = 0 if last_group_id is None else last_group_id + 1
self._current_group_id: Optional[int] = None
self._peer_id = peer_id

def subscribe(self, pubsub: PubSubManager) -> None:
def _event_group_is_closed(self):
return (
self._last_event is None or
self._last_event.group_id is None or
HathorEvents(self._last_event.type) in _GROUP_END_EVENTS
)

def _subscribe_events(self):
""" Subscribe to defined events for the pubsub received
"""
for event in _SUBSCRIBE_EVENTS:
pubsub.subscribe(event, self._persist_event)
self._pubsub.subscribe(event, self._handle_event)

def _handle_event(self, event_type: HathorEvents, event_args: EventArguments) -> None:
create_event_fn: Callable[[HathorEvents, EventArguments], BaseEvent]

def _persist_event(self, event: HathorEvents, event_args: EventArguments) -> None:
group_id: Optional[int]
if event in _GROUP_START_EVENTS:
assert self._current_group_id is None, 'cannot start an event group before the last one is ended'
group_id = self._next_group_id
if event_type in _GROUP_START_EVENTS:
create_event_fn = self._create_group_start_event
elif event_type in _GROUP_END_EVENTS:
create_event_fn = self._create_group_end_event
else:
group_id = self._current_group_id
if event in _GROUP_END_EVENTS:
assert self._current_group_id is not None, 'cannot end group twice'
event_to_store = BaseEvent(
id=self._next_event_id,
create_event_fn = self._create_non_group_edge_event

event = create_event_fn(event_type, event_args)

self._event_storage.save_event(event)
self._event_ws_factory.broadcast_event(event)

self._last_event = event

def _create_group_start_event(self, event_type: HathorEvents, event_args: EventArguments) -> BaseEvent:
assert self._event_group_is_closed(), 'A new event group cannot be started as one is already in progress.'

new_group_id = 0 if self._last_existing_group_id is None else self._last_existing_group_id + 1

self._last_existing_group_id = new_group_id

return self._create_event(
event_type=event_type,
event_args=event_args,
group_id=new_group_id,
)

def _create_group_end_event(self, event_type: HathorEvents, event_args: EventArguments) -> BaseEvent:
assert self._last_event is not None, 'Cannot end event group if there are no events.'
assert not self._event_group_is_closed(), 'Cannot end event group as none is in progress.'

return self._create_event(
event_type=event_type,
event_args=event_args,
group_id=self._last_event.group_id,
)

def _create_non_group_edge_event(self, event_type: HathorEvents, event_args: EventArguments) -> BaseEvent:
group_id = None

if not self._event_group_is_closed():
assert self._last_event is not None, 'Cannot continue event group if there are no events.'
group_id = self._last_event.group_id

return self._create_event(
event_type=event_type,
event_args=event_args,
group_id=group_id,
)

def _create_event(
self,
event_type: HathorEvents,
event_args: EventArguments,
group_id: Optional[int],
) -> BaseEvent:
return BaseEvent(
id=0 if self._last_event is None else self._last_event.id + 1,
peer_id=self._peer_id,
timestamp=self.clock.seconds(),
type=event.value,
data=build_event_data(event, event_args),
timestamp=self._clock.seconds(),
type=event_type.value,
data=_build_event_data(event_type, event_args),
group_id=group_id,
)
self.event_storage.save_event(event_to_store)
self._next_event_id += 1
if event in _GROUP_START_EVENTS:
self._current_group_id = self._next_group_id
self._next_group_id += 1
if event in _GROUP_END_EVENTS:
self._current_group_id = None
7 changes: 6 additions & 1 deletion hathor/event/storage/event_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

from abc import ABC, abstractmethod
from typing import Optional
from typing import Iterator, Optional

from hathor.event.base_event import BaseEvent

Expand All @@ -38,3 +38,8 @@ def get_last_event(self) -> Optional[BaseEvent]:
def get_last_group_id(self) -> Optional[int]:
""" Get the last group-id that was emitted, this is used to help resume when restarting."""
raise NotImplementedError

@abstractmethod
def iter_from_event(self, key: int) -> Iterator[BaseEvent]:
""" Iterate through events starting from the event with the given key"""
raise NotImplementedError
14 changes: 10 additions & 4 deletions hathor/event/storage/memory_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List, Optional
from typing import Iterator, List, Optional

from hathor.event.base_event import BaseEvent
from hathor.event.storage.event_storage import EventStorage
Expand All @@ -25,8 +25,6 @@ def __init__(self):
self._last_group_id: Optional[int] = None

def save_event(self, event: BaseEvent) -> None:
if event.id < 0:
raise ValueError('event.id must be non-negative')
if event.id != len(self._events):
raise ValueError('invalid event.id, ids must be sequential and leave no gaps')
self._last_event = event
Expand All @@ -36,7 +34,7 @@ def save_event(self, event: BaseEvent) -> None:

def get_event(self, key: int) -> Optional[BaseEvent]:
if key < 0:
raise ValueError('key must be non-negative')
raise ValueError(f'event.id \'{key}\' must be non-negative')
if key >= len(self._events):
return None
event = self._events[key]
Expand All @@ -48,3 +46,11 @@ def get_last_event(self) -> Optional[BaseEvent]:

def get_last_group_id(self) -> Optional[int]:
return self._last_group_id

def iter_from_event(self, key: int) -> Iterator[BaseEvent]:
if key < 0:
raise ValueError(f'event.id \'{key}\' must be non-negative')

while key < len(self._events):
yield self._events[key]
key += 1
Loading