Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 27 additions & 9 deletions hathor/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@

from collections import defaultdict, deque
from enum import Enum
from typing import TYPE_CHECKING, Any, Callable
from typing import TYPE_CHECKING, Any, Callable, Optional

from twisted.internet.interfaces import IReactorFromThreads
from structlog import get_logger
from twisted.internet.interfaces import IDelayedCall, IReactorFromThreads
from twisted.python.threadable import isInIOThread

from hathor.util import Reactor
Expand All @@ -25,6 +26,8 @@
if TYPE_CHECKING:
from hathor.transaction import BaseTransaction, Block

logger = get_logger()


class HathorEvents(Enum):
"""
Expand Down Expand Up @@ -170,6 +173,9 @@ def __init__(self, reactor: Reactor) -> None:
self._subscribers = defaultdict(list)
self.queue: deque[tuple[PubSubCallable, HathorEvents, EventArguments]] = deque()
self.reactor = reactor
self.log = logger.new()

self._call_later_id: Optional[IDelayedCall] = None

def subscribe(self, key: HathorEvents, fn: PubSubCallable) -> None:
"""Subscribe to a specific event.
Expand All @@ -193,22 +199,36 @@ def _call_next(self) -> None:
"""Execute next call if it exists."""
if not self.queue:
return
fn, key, args = self.queue.popleft()
fn(key, args)
if self.queue:

self.log.debug('running pubsub call_next', len=len(self.queue))

try:
while self.queue:
fn, key, args = self.queue.popleft()
fn(key, args)
except Exception:
self.log.error('event processing failed', key=key, args=args)
raise
finally:
self._schedule_call_next()

def _schedule_call_next(self) -> None:
"""Schedule next call's execution."""
assert self.reactor.running

if not self.queue:
return

if not isInIOThread() and (threaded_reactor := verified_cast(IReactorFromThreads, self.reactor)):
# We're taking a conservative approach, since not all functions might need to run
# on the main thread [yan 2019-02-20]
threaded_reactor.callFromThread(self._call_next)
return

self.reactor.callLater(0, self._call_next)
if self._call_later_id and self._call_later_id.active():
return

self._call_later_id = self.reactor.callLater(0, self._call_next)

def publish(self, key: HathorEvents, **kwargs: Any) -> None:
"""Publish a new event.
Expand All @@ -224,7 +244,5 @@ def publish(self, key: HathorEvents, **kwargs: Any) -> None:
if not self.reactor.running:
fn(key, args)
else:
is_empty = bool(not self.queue)
self.queue.append((fn, key, args))
if is_empty:
self._schedule_call_next()
self._schedule_call_next()
43 changes: 2 additions & 41 deletions tests/pubsub/test_pubsub.py
Original file line number Diff line number Diff line change
@@ -1,47 +1,8 @@
import threading
import time

from twisted.internet import threads
from twisted.python import threadable

from hathor.pubsub import HathorEvents, PubSubManager
from hathor.util import reactor
from tests import unittest


class PubSubTestCase(unittest.TestCase):
def _waitForThread(self):
"""
The reactor's threadpool is only available when the reactor is running,
so to have a sane behavior during the tests we make a dummy
L{threads.deferToThread} call.
"""
# copied from twisted/test/test_threads.py [yan]
return threads.deferToThread(time.sleep, 0)

def test_pubsub_thread(self):
""" Test pubsub function is always called in reactor thread.
"""
def _on_new_event(*args):
self.assertTrue(threadable.isInIOThread())

pubsub = PubSubManager(reactor)
pubsub.subscribe(HathorEvents.NETWORK_NEW_TX_ACCEPTED, _on_new_event)

def cb(_ignore):
waiter = threading.Event()

def threadedFunc():
self.assertFalse(threadable.isInIOThread())
pubsub.publish(HathorEvents.NETWORK_NEW_TX_ACCEPTED)
waiter.set()

reactor.callInThread(threadedFunc)
waiter.wait(20)
self.assertTrue(waiter.isSet())
from tests.unittest import TestCase

return self._waitForThread().addCallback(cb)

class PubSubTestCase(TestCase):
def test_duplicate_subscribe(self):
def noop():
pass
Expand Down