Skip to content

Commit

Permalink
Merge pull request #24 from authorizon/increase_verbosity
Browse files Browse the repository at this point in the history
Increase logs verbosity to make it easier to follow along execution paths
  • Loading branch information
asafc authored Oct 10, 2021
2 parents a621f77 + 680346f commit b37ed79
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 13 deletions.
13 changes: 6 additions & 7 deletions fastapi_websocket_pubsub/event_broadcaster.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,12 @@ async def __aenter__(self):
if self._event_broadcaster._share_count == 1:
# We have our first publisher
# Init the broadcast used for sharing (reading has its own)
self._event_broadcaster._acquire_sharing_broadcast_channel()
logger.info("Subscribing to ALL TOPICS, and sharing messages with broadcast channel")
self._event_broadcaster._acquire_sharing_broadcast_channel()
logger.info("Subscribing to ALL_TOPICS, and sharing messages with broadcast channel")
# Subscribe to internal events form our own event notifier and broadcast them
await self._event_broadcaster._subscribe_to_all_topics()
else:
logger.info(f"Did not subscribe to ALL_TOPICS: share count == {self._event_broadcaster._share_count}")
return self

async def __aexit__(self, exc_type, exc, tb):
Expand Down Expand Up @@ -149,8 +151,7 @@ async def __broadcast_notifications__(self, subscription: Subscription, data):
subscription (Subscription): the subscription that got triggered
data: the event data
"""
logger.info("Broadcasting incoming event",
{'topic': subscription.topic, 'notifier_id': self._id})
logger.info("Broadcasting incoming event: {}".format({'topic': subscription.topic, 'notifier_id': self._id}))
note = BroadcastNotification(notifier_id=self._id, topics=[
subscription.topic], data=data)
# Publish event to broadcast
Expand Down Expand Up @@ -238,9 +239,7 @@ async def __read_notifications__(self):
event.message)
# Avoid re-publishing our own broadcasts
if notification.notifier_id != self._id:
logger.debug("Handling incoming broadcast event",
{'topics': notification.topics,
'src': notification.notifier_id})
logger.info("Handling incoming broadcast event: {}".format({'topics': notification.topics, 'src': notification.notifier_id}))
# Notify subscribers of message received from broadcast
await self._notifier.notify(notification.topics, notification.data, notifier_id=self._id)
except:
Expand Down
8 changes: 5 additions & 3 deletions fastapi_websocket_pubsub/event_notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ async def unsubscribe(self, subscriber_id: SubscriberId, topics: Union[TopicList
async def trigger_callback(self, data, topic: Topic, subscriber_id: SubscriberId, subscription: Subscription):
await subscription.callback(subscription, data)

async def callback_subscribers(self, subscribers: Dict[SubscriberId, Subscription],
async def callback_subscribers(self, subscribers: Dict[SubscriberId, List[Subscription]],
topic: Topic,
data, notifier_id: SubscriberId = None, override_topic=False):
"""
Expand All @@ -144,19 +144,21 @@ async def callback_subscribers(self, subscribers: Dict[SubscriberId, Subscriptio
try:
# Don't notify the notifier
if subscriber_id != notifier_id:
logger.info(f"calling subscription callbacks for sub_id={subscriber_id} with topic={topic}")
for subscription in subscriptions:
if override_topic:
# Report actual topic instead of ALL_TOPICS (or whatever is saved in the subscription)
event = subscription.copy()
event.topic = topic
original_topic = 'ALL_TOPICS' if (subscription.topic == ALL_TOPICS) else subscription.topic
logger.info(f"calling subscription callbacks: topic={topic} ({original_topic}), subscription_id={subscription.id}, subscriber_id={subscriber_id}")
else:
event = subscription
logger.info(f"calling subscription callbacks: topic={topic}, subscription_id={subscription.id}, subscriber_id={subscriber_id}")
# call callback with subscription-info and provided data
await self.trigger_callback(data, topic, subscriber_id, event)
except:
logger.exception(f"Failed to notify subscriber sub_id={subscriber_id} with topic={topic}")


async def notify(self, topics: Union[TopicList, Topic], data=None, notifier_id=None):
"""
Expand Down
4 changes: 4 additions & 0 deletions fastapi_websocket_pubsub/pub_sub_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
from fastapi_websocket_rpc import WebsocketRPCEndpoint
from fastapi_websocket_rpc.rpc_channel import RpcChannel

from .logger import get_logger
from .event_broadcaster import EventBroadcaster
from .event_notifier import ALL_TOPICS, EventCallback, EventNotifier, Subscription, Topic, TopicList
from .rpc_event_methods import RpcEventServerMethods
from .websocket_rpc_event_notifier import WebSocketRpcEventNotifier

logger = get_logger('PubSubEndpoint')

class PubSubEndpoint:
"""
Expand Down Expand Up @@ -65,7 +67,9 @@ async def publish(self, topics: Union[TopicList, Topic], data=None):
"""
# if we have a broadcaster make sure we share with it (no matter where this call comes from)
# sharing here means - the broadcaster listens in to the notifier as well
logger.info(f"Publishing message to topics: {topics}")
if self.broadcaster is not None:
logger.info(f"Acquiring broadcaster sharing context")
async with self.broadcaster.get_context(listen=False, share=True):
await self.notifier.notify(topics, data, notifier_id=self._id)
# otherwise just notify
Expand Down
2 changes: 1 addition & 1 deletion fastapi_websocket_pubsub/rpc_event_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ async def subscribe(self, topics: TopicList = []) -> bool:
async def callback(subscription: Subscription, data):
# remove the actual function
sub = subscription.copy(exclude={"callback"})
self.logger.debug("Notifying other side",
self.logger.info("Notifying other side: {}".format({"subscriber_id": subscription.subscriber_id, "subscription_id": subscription.id, "topic": subscription.topic}),
{"subscription":subscription,
"data":data, "channel_id": self.channel.id})
await self.channel.other.notify(subscription=sub, data=data)
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
fastapi-websocket-rpc>=0.1.18
fastapi-websocket-rpc>=0.1.20
broadcaster==0.2.0
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def get_requirements(env=""):

setup(
name='fastapi_websocket_pubsub',
version='0.1.18',
version='0.1.19',
author='Or Weis',
author_email="[email protected]",
description="A fast and durable PubSub channel over Websockets (using fastapi-websockets-rpc).",
Expand Down

0 comments on commit b37ed79

Please sign in to comment.