diff --git a/hathor/pubsub.py b/hathor/pubsub.py index b9c5506c3..f598f4998 100644 --- a/hathor/pubsub.py +++ b/hathor/pubsub.py @@ -14,9 +14,10 @@ from collections import defaultdict, deque from enum import Enum -from typing import TYPE_CHECKING, Any, Callable +from typing import TYPE_CHECKING, Any, Callable, Optional -from twisted.internet.interfaces import IReactorFromThreads +from structlog import get_logger +from twisted.internet.interfaces import IDelayedCall, IReactorFromThreads from twisted.python.threadable import isInIOThread from hathor.util import Reactor @@ -25,6 +26,8 @@ if TYPE_CHECKING: from hathor.transaction import BaseTransaction, Block +logger = get_logger() + class HathorEvents(Enum): """ @@ -170,6 +173,9 @@ def __init__(self, reactor: Reactor) -> None: self._subscribers = defaultdict(list) self.queue: deque[tuple[PubSubCallable, HathorEvents, EventArguments]] = deque() self.reactor = reactor + self.log = logger.new() + + self._call_later_id: Optional[IDelayedCall] = None def subscribe(self, key: HathorEvents, fn: PubSubCallable) -> None: """Subscribe to a specific event. @@ -193,22 +199,36 @@ def _call_next(self) -> None: """Execute next call if it exists.""" if not self.queue: return - fn, key, args = self.queue.popleft() - fn(key, args) - if self.queue: + + self.log.debug('running pubsub call_next', len=len(self.queue)) + + try: + while self.queue: + fn, key, args = self.queue.popleft() + fn(key, args) + except Exception: + self.log.error('event processing failed', key=key, args=args) + raise + finally: self._schedule_call_next() def _schedule_call_next(self) -> None: """Schedule next call's execution.""" assert self.reactor.running + if not self.queue: + return + if not isInIOThread() and (threaded_reactor := verified_cast(IReactorFromThreads, self.reactor)): # We're taking a conservative approach, since not all functions might need to run # on the main thread [yan 2019-02-20] threaded_reactor.callFromThread(self._call_next) return - self.reactor.callLater(0, self._call_next) + if self._call_later_id and self._call_later_id.active(): + return + + self._call_later_id = self.reactor.callLater(0, self._call_next) def publish(self, key: HathorEvents, **kwargs: Any) -> None: """Publish a new event. @@ -224,7 +244,5 @@ def publish(self, key: HathorEvents, **kwargs: Any) -> None: if not self.reactor.running: fn(key, args) else: - is_empty = bool(not self.queue) self.queue.append((fn, key, args)) - if is_empty: - self._schedule_call_next() + self._schedule_call_next() diff --git a/tests/pubsub/test_pubsub.py b/tests/pubsub/test_pubsub.py index fc41fca44..2d3d1ef62 100644 --- a/tests/pubsub/test_pubsub.py +++ b/tests/pubsub/test_pubsub.py @@ -1,47 +1,8 @@ -import threading -import time - -from twisted.internet import threads -from twisted.python import threadable - from hathor.pubsub import HathorEvents, PubSubManager -from hathor.util import reactor -from tests import unittest - - -class PubSubTestCase(unittest.TestCase): - def _waitForThread(self): - """ - The reactor's threadpool is only available when the reactor is running, - so to have a sane behavior during the tests we make a dummy - L{threads.deferToThread} call. - """ - # copied from twisted/test/test_threads.py [yan] - return threads.deferToThread(time.sleep, 0) - - def test_pubsub_thread(self): - """ Test pubsub function is always called in reactor thread. - """ - def _on_new_event(*args): - self.assertTrue(threadable.isInIOThread()) - - pubsub = PubSubManager(reactor) - pubsub.subscribe(HathorEvents.NETWORK_NEW_TX_ACCEPTED, _on_new_event) - - def cb(_ignore): - waiter = threading.Event() - - def threadedFunc(): - self.assertFalse(threadable.isInIOThread()) - pubsub.publish(HathorEvents.NETWORK_NEW_TX_ACCEPTED) - waiter.set() - - reactor.callInThread(threadedFunc) - waiter.wait(20) - self.assertTrue(waiter.isSet()) +from tests.unittest import TestCase - return self._waitForThread().addCallback(cb) +class PubSubTestCase(TestCase): def test_duplicate_subscribe(self): def noop(): pass