Skip to content

Commit

Permalink
Merge pull request #107 from brentru/fix-subscribe-callbacks
Browse files Browse the repository at this point in the history
MQTT_Client: Set QoS Level, Handle Subscription Callbacks
  • Loading branch information
brentru authored Dec 23, 2019
2 parents dccf731 + 611823b commit 7f7d23f
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 20 deletions.
2 changes: 1 addition & 1 deletion Adafruit_IO/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "2.4.1"
__version__ = "2.4.0"
56 changes: 37 additions & 19 deletions Adafruit_IO/mqtt_client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Copyright (c) 2014 Adafruit Industries
# Author: Tony DiCola
# Copyright (c) 2020 Adafruit Industries
# Author: Tony DiCola, Brent Rubell

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -43,9 +43,10 @@ class MQTTClient(object):
def __init__(self, username, key, service_host='io.adafruit.com', secure=True):
"""Create instance of MQTT client.
:param username: Adafruit.IO Username for your account.
:param key: Adafruit IO access key (AIO Key) for your account.
:param secure: (optional, boolean) Switches secure/insecure connections
:param username: Adafruit.IO Username for your account.
:param key: Adafruit IO access key (AIO Key) for your account.
:param secure: (optional, boolean) Switches secure/insecure connections
"""
self._username = username
self._service_host = service_host
Expand All @@ -70,6 +71,7 @@ def __init__(self, username, key, service_host='io.adafruit.com', secure=True):
self._client.on_connect = self._mqtt_connect
self._client.on_disconnect = self._mqtt_disconnect
self._client.on_message = self._mqtt_message
self._client.on_subscribe = self._mqtt_subscribe
self._connected = False


Expand All @@ -95,7 +97,7 @@ def _mqtt_disconnect(self, client, userdata, rc):
# log the RC as an error. Continue on to call any disconnect handler
# so clients can potentially recover gracefully.
if rc != 0:
print("Unexpected disconnection.")
print('Unexpected disconnection.')
raise MQTTError(rc)
print('Disconnected from Adafruit IO!')
# Call the on_disconnect callback if available.
Expand All @@ -105,6 +107,7 @@ def _mqtt_disconnect(self, client, userdata, rc):
def _mqtt_message(self, client, userdata, msg):
"""Parse out the topic and call on_message callback
assume topic looks like `username/topic/id`
"""
logger.debug('Client on_message called.')
parsed_topic = msg.topic.split('/')
Expand All @@ -124,15 +127,19 @@ def _mqtt_message(self, client, userdata, msg):
else:
raise ValueError('on_message not defined')
self.on_message(self, topic, payload)
def _mqtt_subscribe(client, userdata, mid, granted_qos):

def _mqtt_subscribe(self, client, userdata, mid, granted_qos):
"""Called when broker responds to a subscribe request."""
logger.debug('Client called on_subscribe')
if self.on_subscribe is not None:
self.on_subscribe(self, userdata, mid, granted_qos)

def connect(self, **kwargs):
"""Connect to the Adafruit.IO service. Must be called before any loop
or publish operations are called. Will raise an exception if a
connection cannot be made. Optional keyword arguments will be passed
to paho-mqtt client connect function.
"""
# Skip calling connect if already connected.
if self._connected:
Expand All @@ -145,6 +152,7 @@ def connect(self, **kwargs):

def is_connected(self):
"""Returns True if connected to Adafruit.IO and False if not connected.
"""
return self._connected

Expand All @@ -157,9 +165,9 @@ def loop_background(self, stop=None):
"""Starts a background thread to listen for messages from Adafruit.IO
and call the appropriate callbacks when feed events occur. Will return
immediately and will not block execution. Should only be called once.
Params:
- stop: boolean, stops the execution of the background loop.
:param bool stop: Stops the execution of the background loop.
"""
if stop:
self._client.loop_stop()
Expand All @@ -174,6 +182,7 @@ def loop_blocking(self):
listen and respond to Adafruit.IO feed events. If you need to do other
processing, consider using the loop_background function to run a loop
in the background.
"""
self._client.loop_forever()

Expand All @@ -185,28 +194,36 @@ def loop(self, timeout_sec=1.0):
The optional timeout_sec parameter specifies at most how long to block
execution waiting for messages when this function is called. The default
is one second.
"""
self._client.loop(timeout=timeout_sec)

def subscribe(self, feed_id, feed_user=None):
def subscribe(self, feed_id, feed_user=None, qos=0):
"""Subscribe to changes on the specified feed. When the feed is updated
the on_message function will be called with the feed_id and new value.
Params:
- feed_id: The id of the feed to subscribe to.
- feed_user (optional): The user id of the feed. Used for feed sharing functionality.
:param str feed_id: The key of the feed to subscribe to.
:param str feed_user: Optional, identifies feed owner. Used for feed sharing.
:param int qos: The QoS to use when subscribing. Defaults to 0.
"""
if qos > 1:
raise MQTTError("Adafruit IO only supports a QoS level of 0 or 1.")
if feed_user is not None:
(res, mid) = self._client.subscribe('{0}/feeds/{1}'.format(feed_user, feed_id))
(res, mid) = self._client.subscribe('{0}/feeds/{1}'.format(feed_user, feed_id, qos=qos))
else:
(res, mid) = self._client.subscribe('{0}/feeds/{1}'.format(self._username, feed_id))
(res, mid) = self._client.subscribe('{0}/feeds/{1}'.format(self._username, feed_id), qos=qos)
return res, mid

def subscribe_group(self, group_id):
def subscribe_group(self, group_id, qos=0):
"""Subscribe to changes on the specified group. When the group is updated
the on_message function will be called with the group_id and the new value.
:param str feed_id: The key of the feed to subscribe to.
:param int qos: The QoS to use when subscribing. Defaults to 0.
"""
self._client.subscribe('{0}/groups/{1}'.format(self._username, group_id))
self._client.subscribe('{0}/groups/{1}'.format(self._username, group_id), qos=qos)

def subscribe_randomizer(self, randomizer_id):
"""Subscribe to changes on a specified random data stream from
Expand All @@ -216,6 +233,7 @@ def subscribe_randomizer(self, randomizer_id):
every client that is subscribed to the same topic.
:param int randomizer_id: ID of the random word record you want data for.
"""
self._client.subscribe('{0}/integration/words/{1}'.format(self._username, randomizer_id))

Expand Down
5 changes: 5 additions & 0 deletions examples/mqtt/mqtt_subscribe.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ def connected(client):
# Subscribe to changes on a feed named DemoFeed.
client.subscribe(FEED_ID)

def subscribe(client, userdata, mid, granted_qos):
# This method is called when the client subscribes to a new feed.
print('Subscribed to {0} with QoS {1}'.format(FEED_ID, granted_qos[0]))

def disconnected(client):
# Disconnected function will be called when the client disconnects.
print('Disconnected from Adafruit IO!')
Expand All @@ -50,6 +54,7 @@ def message(client, feed_id, payload):
client.on_connect = connected
client.on_disconnect = disconnected
client.on_message = message
client.on_subscribe = subscribe

# Connect to the Adafruit IO server.
client.connect()
Expand Down

0 comments on commit 7f7d23f

Please sign in to comment.