Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion hathor/cli/events_simulator/events_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def execute(args: Namespace) -> None:

log.info('Started simulating events', scenario=args.scenario, seed=simulator.seed)

forwarding_ws_factory.start()
forwarding_ws_factory.start(stream_id='simulator')
scenario.simulate(simulator, manager)
reactor.listenTCP(args.port, site)
reactor.run()
Expand Down
10 changes: 8 additions & 2 deletions hathor/event/event_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

from typing import Callable, Iterator, Optional
from uuid import uuid4

from structlog import get_logger

Expand All @@ -23,7 +24,7 @@
from hathor.event.websocket import EventWebsocketFactory
from hathor.pubsub import EventArguments, HathorEvents, PubSubManager
from hathor.transaction import BaseTransaction
from hathor.util import Reactor, progress
from hathor.util import Reactor, not_none, progress
from hathor.utils.iter import batch_iterator

logger = get_logger()
Expand Down Expand Up @@ -55,6 +56,7 @@ class EventManager:
_peer_id: str
_is_running: bool = False
_previous_node_state: Optional[NodeState] = None
_stream_id: Optional[str] = None
_last_event: Optional[BaseEvent] = None
_last_existing_group_id: Optional[int] = None

Expand Down Expand Up @@ -86,16 +88,20 @@ def start(self, peer_id: str) -> None:

if self._should_reload_events():
self._event_storage.reset_events()
self._stream_id = str(uuid4())
self._event_storage.save_stream_id(self._stream_id)
else:
self._last_event = self._event_storage.get_last_event()
self._last_existing_group_id = self._event_storage.get_last_group_id()
self._stream_id = not_none(self._event_storage.get_stream_id())

self._assert_closed_event_group()
self._subscribe_events()

self._peer_id = peer_id
self._event_ws_factory.start()
self._event_ws_factory.start(stream_id=not_none(self._stream_id))
self._is_running = True
self.log.info('Starting Event Manager', stream_id=self._stream_id)

def stop(self) -> None:
"""Stops the EventManager."""
Expand Down
14 changes: 12 additions & 2 deletions hathor/event/storage/event_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ def iter_from_event(self, key: int) -> Iterator[BaseEvent]:
@abstractmethod
def reset_events(self) -> None:
"""
Reset event-related data: events, last_event, and last_group_id.
Reset event-related data: events, last_event, last_group_id, and stream_id.
This should be used to clear old events from the database when reloading events.
"""
raise NotImplementedError

@abstractmethod
def reset_all(self) -> None:
"""
Reset all data and metadata: events, last_event, last_group_id, node_state, and event_queue_enabled.
Reset all data and metadata: events, last_event, last_group_id, stream_id, node_state, and event_queue_enabled.
This should be used for a full wipe out of the event storage.
"""
raise NotImplementedError
Expand All @@ -85,3 +85,13 @@ def save_event_queue_state(self, enabled: bool) -> None:
def get_event_queue_state(self) -> bool:
"""Get whether the event queue feature is enabled from the storage"""
raise NotImplementedError

@abstractmethod
def save_stream_id(self, stream_id: str) -> None:
"""Save the Stream ID."""
raise NotImplementedError

@abstractmethod
def get_stream_id(self) -> Optional[str]:
"""Get the Stream ID."""
raise NotImplementedError
8 changes: 8 additions & 0 deletions hathor/event/storage/memory_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def __init__(self) -> None:
self._events: list[BaseEvent] = []
self._last_event: Optional[BaseEvent] = None
self._last_group_id: Optional[int] = None
self._stream_id: Optional[str] = None
self._node_state: Optional[NodeState] = None
self._event_queue_enabled: bool = False

Expand Down Expand Up @@ -66,6 +67,7 @@ def reset_events(self) -> None:
self._events = []
self._last_event = None
self._last_group_id = None
self._stream_id = None

def reset_all(self) -> None:
self.reset_events()
Expand All @@ -83,3 +85,9 @@ def save_event_queue_state(self, enabled: bool) -> None:

def get_event_queue_state(self) -> bool:
return self._event_queue_enabled

def save_stream_id(self, stream_id: str) -> None:
self._stream_id = stream_id

def get_stream_id(self) -> Optional[str]:
return self._stream_id
16 changes: 16 additions & 0 deletions hathor/event/storage/rocksdb_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
_KEY_LAST_GROUP_ID = b'last-group-id'
_KEY_NODE_STATE = b'node-state'
_KEY_EVENT_QUEUE_ENABLED = b'event-queue-enabled'
_KEY_STREAM_ID = b'stream-id'


class EventRocksDBStorage(EventStorage):
Expand Down Expand Up @@ -112,6 +113,7 @@ def reset_events(self) -> None:
self._last_group_id = None

self._db.delete((self._cf_meta, _KEY_LAST_GROUP_ID))
self._db.delete((self._cf_meta, _KEY_STREAM_ID))
self._db.drop_column_family(self._cf_event)

self._cf_event = self._rocksdb_storage.get_or_create_column_family(_CF_NAME_EVENT)
Expand Down Expand Up @@ -147,3 +149,17 @@ def get_event_queue_state(self) -> bool:
return False

return bool.from_bytes(enabled_bytes, byteorder='big')

def save_stream_id(self, stream_id: str) -> None:
self._db.put(
(self._cf_meta, _KEY_STREAM_ID),
stream_id.encode('utf8')
)

def get_stream_id(self) -> Optional[str]:
stream_id_bytes: bytes = self._db.get((self._cf_meta, _KEY_STREAM_ID))

if stream_id_bytes is None:
return None

return stream_id_bytes.decode('utf8')
16 changes: 12 additions & 4 deletions hathor/event/websocket/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from hathor.event.storage import EventStorage
from hathor.event.websocket.protocol import EventWebsocketProtocol
from hathor.event.websocket.response import EventResponse, InvalidRequestType
from hathor.util import Reactor
from hathor.util import Reactor, not_none

logger = get_logger()

Expand All @@ -37,6 +37,9 @@ class EventWebsocketFactory(WebSocketServerFactory):
# The last event id broadcast by this factory.
_latest_event_id: Optional[int] = None

# The unique stream ID
_stream_id: Optional[str] = None

def __init__(self, reactor: Reactor, event_storage: EventStorage):
super().__init__()
self.log = logger.new()
Expand All @@ -49,13 +52,14 @@ def __init__(self, reactor: Reactor, event_storage: EventStorage):
if latest_event is not None:
self._latest_event_id = latest_event.id

def start(self):
def start(self, *, stream_id: str) -> None:
"""Start the WebSocket server. Required to be able to send events."""
assert self._is_running is False, 'Cannot start, EventWebsocketFactory is already running'

self._is_running = True
self._stream_id = stream_id

def stop(self):
def stop(self) -> None:
"""Stop the WebSocket server. No events can be sent."""
assert self._is_running is True, 'Cannot stop, EventWebsocketFactory is not running'

Expand Down Expand Up @@ -108,6 +112,10 @@ def _send_event_to_connection(self, connection: EventWebsocketProtocol, event: B

assert self._latest_event_id is not None, '_latest_event_id must be set.'

response = EventResponse(event=event, latest_event_id=self._latest_event_id)
response = EventResponse(
event=event,
latest_event_id=self._latest_event_id,
stream_id=not_none(self._stream_id)
)

connection.send_event_response(response)
2 changes: 2 additions & 0 deletions hathor/event/websocket/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@ class EventResponse(Response):
type: The type of the response.
event: The event.
latest_event_id: The ID of the latest event known by the server.
stream_id: The ID of the current stream.
"""

type: str = Field(default='EVENT', const=True)
event: BaseEvent
latest_event_id: NonNegativeInt
stream_id: str


class InvalidRequestType(Enum):
Expand Down
Loading