Skip to content

Commit

Permalink
add mqtt5to3 adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
xiazhvera committed Aug 9, 2023
1 parent 348f79d commit 0d77da8
Show file tree
Hide file tree
Showing 7 changed files with 320 additions and 7 deletions.
21 changes: 15 additions & 6 deletions awscrt/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from awscrt.http import HttpProxyOptions, HttpRequest
from awscrt.io import ClientBootstrap, ClientTlsContext, SocketOptions
from dataclasses import dataclass
import mqtt5


class QoS(IntEnum):
Expand Down Expand Up @@ -350,7 +351,7 @@ def __init__(self,
on_connection_closed=None
):

assert isinstance(client, Client)
assert isinstance(client, Client) or isinstance(client, mqtt5.Client)
assert callable(on_connection_interrupted) or on_connection_interrupted is None
assert callable(on_connection_resumed) or on_connection_resumed is None
assert isinstance(will, Will) or will is None
Expand All @@ -376,6 +377,7 @@ def __init__(self,

# init-only
self.client = client
self._client_version = 5 if isinstance(client, mqtt5.Client) else 3
self._on_connection_interrupted_cb = on_connection_interrupted
self._on_connection_resumed_cb = on_connection_resumed
self._use_websockets = use_websockets
Expand All @@ -400,11 +402,18 @@ def __init__(self,
self.socket_options = socket_options if socket_options else SocketOptions()
self.proxy_options = proxy_options if proxy_options else websocket_proxy_options

self._binding = _awscrt.mqtt_client_connection_new(
self,
client,
use_websockets,
)
if self._client_version == 3:
self._binding = _awscrt.mqtt_client_connection_new(
self,
client,
use_websockets,
)
elif self._client_version == 5:
self._binding = _awscrt.mqtt_client_connection_new_from_mqtt5_client(
self,
client,
use_websockets,
)

def _check_uses_old_message_callback_signature(self, callback):
# The callback used to have fewer args. Passing only those args, if it
Expand Down
74 changes: 74 additions & 0 deletions awscrt/mqtt5.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from dataclasses import dataclass
from collections.abc import Sequence
from inspect import signature
from mqtt import Connection


class QoS(IntEnum):
Expand Down Expand Up @@ -1629,6 +1630,31 @@ def _on_lifecycle_disconnection(
disconnect_packet=None,
exception=exceptions.from_code(error_code)))

@dataclass
class _Mqtt5to3AdapterOptions:
"""This internal class stores the options that required for creating a new Mqtt3 connection from the mqtt5 client
Args:
host_name (str): Host name of the MQTT server to connect to.
port (int): Network port of the MQTT server to connect to.
client_id (str): A unique string identifying the client to the server. Used to restore session state between connections. If left empty, the broker will auto-assign a unique client id. When reconnecting, the mqtt5 client will always use the auto-assigned client id.
socket_options (SocketOptions): The socket properties of the underlying MQTT connections made by the client or None if defaults are used.
min_reconnect_delay_ms (int): The minimum amount of time to wait to reconnect after a disconnect. Exponential backoff is performed with jitter after each connection failure.
max_reconnect_delay_ms (int): The maximum amount of time to wait to reconnect after a disconnect. Exponential backoff is performed with jitter after each connection failure.
ping_timeout_ms (int): The time interval to wait after sending a PINGREQ for a PINGRESP to arrive. If one does not arrive, the client will close the current connection.
keep_alive_secs (int): The keep alive value, in seconds, A PING will automatically be sent at this interval.
ack_timeout_secs (int): The time interval to wait for an ack after sending a QoS 1+ PUBLISH, SUBSCRIBE, or UNSUBSCRIBE before failing the operation.
clean_session (bool): Whether or not to start a clean session with each reconnect.
"""
host_name: str
port: int
client_id: str
socket_options: SocketOptions
min_reconnect_delay_ms: int
max_reconnect_delay_ms: int
ping_timeout_ms : int
keep_alive_secs : int
ack_timeout_secs: int
clean_session: bool

class Client(NativeResource):
"""This class wraps the aws-c-mqtt MQTT5 client to provide the basic MQTT5 pub/sub functionalities via the AWS Common Runtime
Expand All @@ -1646,6 +1672,8 @@ def __init__(self, client_options: ClientOptions):

core = _ClientCore(client_options)

self.client_options = client_options

bootstrap = client_options.bootstrap

if not bootstrap:
Expand Down Expand Up @@ -1710,6 +1738,19 @@ def __init__(self, client_options: ClientOptions):
websocket_is_none,
core)

# Store the options for mqtt3 adapter
self.adapter_options = _Mqtt5to3AdapterOptions(hostname = client_options.host_name,
port = client_options.port,
client_id=connect_options.client_id
socket_options = socket_options,
min_reconnect_delay_ms=client_options.min_reconnect_delay_ms,
max_reconnect_delay_ms=client_options.max_reconnect_delay_ms,
ping_timeout_ms=client_options.ping_timeout_ms,
keep_alive_secs=connect_options.keep_alive_interval_sec,
ack_timeout_secs=client_options.ack_timeout_sec,
clean_session=client_options.session_behavior < ClientSessionBehaviorType.REJOIN_ALWAYS)


def start(self):
"""Notifies the MQTT5 client that you want it maintain connectivity to the configured endpoint.
The client will attempt to stay connected using the properties of the reconnect-related parameters in the mqtt5 client configuration.
Expand Down Expand Up @@ -1853,3 +1894,36 @@ def get_stats(self):

result = _awscrt.mqtt5_client_get_stats(self._binding)
return OperationStatisticsData(result[0], result[1], result[2], result[3])


def new_connection(self, on_connection_interrupted = None, on_connection_resumed=None,
on_connection_success=None, on_connection_failure=None, on_connection_closed=None):
return Connection(
self,
self.adapter_options.host_name,
self.adapter_options.port,
self.adapter_options.client_id,
clean_session=self.adapter_options.clean_session,
on_connection_interrupted=on_connection_interrupted,
on_connection_resumed=on_connection_resumed,
on_connection_success=on_connection_success,
on_connection_failure=on_connection_failure,
on_connection_closed=on_connection_closed,
reconnect_min_timeout_secs=self.adapter_options.min_reconnect_delay_ms,
reconnect_max_timeout_secs=self.adapter_options.max_reconnect_delay_ms,
keep_alive_secs=self.adapter_options.keep_alive_secs,
ping_timeout_ms=self.adapter_options.ping_timeout_ms,
protocol_operation_timeout_ms=self.adapter_options.ack_timeout_sec * 1000,
socket_options = self.adapter_options.socket_options,

# For the arugments below, set it to `None` will directly use the options from mqtt5 client underlying.
will=None,
will=None,
username=None,
password=None,
# Similar to previous options, set it False will use mqtt5 setup for websockets. It is not necessary means the websocket is disabled.
use_websockets=False,
websocket_proxy_options=None,
websocket_handshake_transform=None,
proxy_options=None
)
1 change: 1 addition & 0 deletions source/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,7 @@ static PyMethodDef s_module_methods[] = {

/* MQTT Client Connection */
AWS_PY_METHOD_DEF(mqtt_client_connection_new, METH_VARARGS),
AWS_PY_METHOD_DEF(mqtt_client_connection_new_from_mqtt5_client, METH_VARARGS),
AWS_PY_METHOD_DEF(mqtt_client_connection_connect, METH_VARARGS),
AWS_PY_METHOD_DEF(mqtt_client_connection_reconnect, METH_VARARGS),
AWS_PY_METHOD_DEF(mqtt_client_connection_publish, METH_VARARGS),
Expand Down
102 changes: 102 additions & 0 deletions source/mqtt_client_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* SPDX-License-Identifier: Apache-2.0.
*/
#include "mqtt_client_connection.h"
#include "mqtt5_client.h"

#include "http.h"
#include "io.h"
Expand Down Expand Up @@ -367,6 +368,107 @@ PyObject *aws_py_mqtt_client_connection_new(PyObject *self, PyObject *args) {
return NULL;
}

PyObject *aws_py_mqtt_client_connection_new_from_mqtt5_client(PyObject *self, PyObject *args) {
(void)self;

struct aws_allocator *allocator = aws_py_get_allocator();

PyObject *self_py;
PyObject *client_py;
PyObject *use_websocket_py;
if (!PyArg_ParseTuple(args, "OOO", &self_py, &client_py, &use_websocket_py)) {
return NULL;
}

struct aws_mqtt5_client *client = aws_py_get_mqtt5_client(client_py);
if (!client) {
return NULL;
}

struct mqtt_connection_binding *py_connection =
aws_mem_calloc(allocator, 1, sizeof(struct mqtt_connection_binding));
if (!py_connection) {
PyErr_SetAwsLastError();
return NULL;
}

/* From hereon, we need to clean up if errors occur */

py_connection->native = aws_mqtt_client_connection_new_from_mqtt5_client(client);
if (!py_connection->native) {
PyErr_SetAwsLastError();
goto connection_new_failed;
}

if (aws_mqtt_client_connection_set_connection_result_handlers(
py_connection->native, s_on_connection_success, py_connection, s_on_connection_failure, py_connection)) {
PyErr_SetAwsLastError();
goto set_connection_handlers_failed;
}

if (aws_mqtt_client_connection_set_connection_interruption_handlers(
py_connection->native,
s_on_connection_interrupted,
py_connection,
s_on_connection_resumed,
py_connection)) {

PyErr_SetAwsLastError();
goto set_interruption_failed;
}

if (aws_mqtt_client_connection_set_connection_closed_handler(
py_connection->native, s_on_connection_closed, py_connection)) {
PyErr_SetAwsLastError();
goto set_interruption_failed;
}

if (PyObject_IsTrue(use_websocket_py)) {
if (aws_mqtt_client_connection_use_websockets(
py_connection->native,
s_ws_handshake_transform,
py_connection /*transform userdata*/,
NULL /*validator*/,
NULL /*validator userdata*/)) {

PyErr_SetAwsLastError();
goto use_websockets_failed;
}
}

PyObject *self_proxy = PyWeakref_NewProxy(self_py, NULL);
if (!self_proxy) {
goto proxy_new_failed;
}

PyObject *capsule =
PyCapsule_New(py_connection, s_capsule_name_mqtt_client_connection, s_mqtt_python_connection_destructor);
if (!capsule) {
goto capsule_new_failed;
}

/* From hereon, nothing will fail */

py_connection->self_capsule = capsule;
py_connection->self_proxy = self_proxy;

py_connection->client = client_py;
Py_INCREF(py_connection->client);

return capsule;

capsule_new_failed:
Py_DECREF(self_proxy);
proxy_new_failed:
use_websockets_failed:
set_interruption_failed:
set_connection_handlers_failed:
aws_mqtt_client_connection_release(py_connection->native);
connection_new_failed:
aws_mem_release(allocator, py_connection);
return NULL;
}

struct aws_mqtt_client_connection *aws_py_get_mqtt_client_connection(PyObject *mqtt_connection) {
AWS_PY_RETURN_NATIVE_FROM_BINDING(
mqtt_connection, s_capsule_name_mqtt_client_connection, "Connection", mqtt_connection_binding);
Expand Down
1 change: 1 addition & 0 deletions source/mqtt_client_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "module.h"

PyObject *aws_py_mqtt_client_connection_new(PyObject *self, PyObject *args);
PyObject *aws_py_mqtt_client_connection_new_from_mqtt5_client(PyObject *self, PyObject *args);
PyObject *aws_py_mqtt_client_connection_connect(PyObject *self, PyObject *args);
PyObject *aws_py_mqtt_client_connection_reconnect(PyObject *self, PyObject *args);
PyObject *aws_py_mqtt_client_connection_publish(PyObject *self, PyObject *args);
Expand Down
Loading

0 comments on commit 0d77da8

Please sign in to comment.