Skip to content

Commit

Permalink
Merge pull request #1366 Add TLS support to messaging
Browse files Browse the repository at this point in the history
  • Loading branch information
ukanga authored Apr 16, 2018
2 parents 506741f + 39f4dee commit d2aa659
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 30 deletions.
7 changes: 6 additions & 1 deletion onadata/apps/messaging/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,14 @@ NOTIFICATION_BACKENDS = {
'BACKEND': 'onadata.apps.messaging.backends.mqtt.MQTTBackend',
'OPTIONS': {
'HOST': 'localhost', # the MQTT host
'PORT': 1883, # the MQTT port
'QOS': 0, # the MQTT QoS level
'RETAIN': False, # MQTT retain messages option
'TOPIC_BASE': 'onadata' # the topic root
'TOPIC_BASE': 'onadata', # the topic root
'SECURE': False, # whether to attempt a secure connection
'CA_CERT_FILE': 'path to Certificate Authority certificate files',
'CERT_FILE': 'file path to PEM encoded client certificate',
'KEY_FILE': 'file path to PEM encoded client private key'
}
},
}
Expand Down
32 changes: 22 additions & 10 deletions onadata/apps/messaging/backends/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
from __future__ import unicode_literals

import json
import ssl

import paho.mqtt.client as paho
import paho.mqtt.publish as publish

from onadata.apps.messaging.backends.base import BaseBackend
from onadata.apps.messaging.constants import PROJECT, USER, XFORM
Expand Down Expand Up @@ -62,16 +63,27 @@ def __init__(self, options=None):
if not options:
raise Exception("MQTT Backend expects configuration options.")

host = options.get('HOST')
if not host:
self.host = options.get('HOST')
if not self.host:
raise Exception("An MQTT host is required.")
self.port = options.get('PORT')
self.cert_info = None
secure = options.get('SECURE', False)
if secure:
if options.get('CA_CERT_FILE') is None:
raise Exception("The Certificate Authority certificate file "
"is required.")
self.cert_info = dict(
ca_certs=options.get('CA_CERT_FILE'),
certfile=options.get('CERT_FILE'),
keyfile=options.get('KEY_FILE'),
tls_version=ssl.PROTOCOL_TLSv1_2,
cert_reqs=ssl.CERT_NONE)

self.qos = options.get('QOS', 0)
self.retain = options.get('RETAIN', False)
self.topic_base = options.get('TOPIC_BASE', 'onadata')

self.client = paho.Client()
self.client.connect(host)

def get_topic(self, instance):
"""
Constructs the message topic
Expand All @@ -96,8 +108,8 @@ def send(self, instance):
"""
topic = self.get_topic(instance)
payload = get_payload(instance)
result = self.client.publish(
topic, payload=payload, qos=self.qos, retain=self.retain)
self.client.disconnect()
# send it

return result
return publish.single(topic, payload=payload, hostname=self.host,
port=self.port, tls=self.cert_info, qos=self.qos,
retain=self.retain)
62 changes: 43 additions & 19 deletions onadata/apps/messaging/tests/test_backends_mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
from __future__ import unicode_literals

import json
import ssl

from django.test import TestCase

from mock import MagicMock
from mock import MagicMock, patch

from onadata.apps.messaging.backends.mqtt import (MQTTBackend, get_payload,
get_target_metadata)
Expand Down Expand Up @@ -43,32 +44,33 @@ def test_get_target_metadata(self):

# User objects
user = _create_user('John')
user_metadata = {'id': user.pk,
'name': user.get_full_name()}
self.assertEqual(json.dumps(user_metadata),
json.dumps(get_target_metadata(user)))
user_metadata = {'id': user.pk, 'name': user.get_full_name()}
self.assertEqual(
json.dumps(user_metadata), json.dumps(get_target_metadata(user)))

# XForm objects
xform = MagicMock()
xform.pk = 1337
xform.name = 'Test Form'
xform.id_string = 'Test_Form_ID'
xform._meta.model_name = XFORM
xform_metadata = {'id': 1337,
'name': 'Test Form',
'form_id': 'Test_Form_ID'}
self.assertEqual(json.dumps(xform_metadata),
json.dumps(get_target_metadata(xform)))
xform_metadata = {
'id': 1337,
'name': 'Test Form',
'form_id': 'Test_Form_ID'
}
self.assertEqual(
json.dumps(xform_metadata), json.dumps(get_target_metadata(xform)))

# Project objects
project = MagicMock()
project.pk = 7331
project.name = 'Test Project'
project._meta.model_name = PROJECT
project_metadata = {'id': 7331,
'name': 'Test Project'}
self.assertEqual(json.dumps(project_metadata),
json.dumps(get_target_metadata(project)))
project_metadata = {'id': 7331, 'name': 'Test Project'}
self.assertEqual(
json.dumps(project_metadata),
json.dumps(get_target_metadata(project)))

def test_mqtt_get_payload(self):
"""
Expand All @@ -89,7 +91,7 @@ def test_mqtt_get_payload(self):
'context': {
'type': to_user._meta.model_name,
'metadata': {
'id': to_user.pk,
'id': to_user.pk,
'name': to_user.get_full_name()
}
},
Expand All @@ -98,13 +100,35 @@ def test_mqtt_get_payload(self):
}
self.assertEqual(json.dumps(payload), get_payload(instance))

def test_mqqt_send(self):
@patch('onadata.apps.messaging.backends.mqtt.publish.single')
def test_mqtt_send(self, mocked):
"""
Test MQTT Backend send method
"""
from_user = _create_user('Bob')
to_user = _create_user('Alice')
instance = _create_message(from_user, to_user, 'I love oov')
mqtt = MQTTBackend(options={'HOST': 'localhost'})
result = mqtt.send(instance=instance)
self.assertTrue(result.is_published())
mqtt = MQTTBackend(options={
'HOST': 'localhost',
'PORT': 8883,
'SECURE': True,
'CA_CERT_FILE': 'cacert.pem',
'CERT_FILE': 'emq.pem',
'KEY_FILE': 'emq.key'
})
mqtt.send(instance=instance)
self.assertTrue(mocked.called)
args, kwargs = mocked.call_args_list[0]
self.assertEquals(mqtt.get_topic(instance), args[0])
self.assertEquals(get_payload(instance), kwargs['payload'])
self.assertEquals('localhost', kwargs['hostname'])
self.assertEquals(8883, kwargs['port'])
self.assertEquals(0, kwargs['qos'])
self.assertEquals(False, kwargs['retain'])
self.assertDictEqual(
dict(ca_certs='cacert.pem',
certfile='emq.pem',
keyfile='emq.key',
tls_version=ssl.PROTOCOL_TLSv1_2,
cert_reqs=ssl.CERT_NONE),
kwargs['tls'])

0 comments on commit d2aa659

Please sign in to comment.