From 0d77da86bbe8ef04ef7f40e94278b8793a3af9b6 Mon Sep 17 00:00:00 2001 From: Zhihui Xia Date: Wed, 9 Aug 2023 16:06:41 -0700 Subject: [PATCH] add mqtt5to3 adapter --- awscrt/mqtt.py | 21 ++++-- awscrt/mqtt5.py | 74 +++++++++++++++++++ crt/aws-c-mqtt | 2 +- source/module.c | 1 + source/mqtt_client_connection.c | 102 ++++++++++++++++++++++++++ source/mqtt_client_connection.h | 1 + test/test_mqtt5.py | 126 ++++++++++++++++++++++++++++++++ 7 files changed, 320 insertions(+), 7 deletions(-) diff --git a/awscrt/mqtt.py b/awscrt/mqtt.py index 6990aa34f..97f918429 100644 --- a/awscrt/mqtt.py +++ b/awscrt/mqtt.py @@ -15,6 +15,7 @@ from awscrt.http import HttpProxyOptions, HttpRequest from awscrt.io import ClientBootstrap, ClientTlsContext, SocketOptions from dataclasses import dataclass +import mqtt5 class QoS(IntEnum): @@ -350,7 +351,7 @@ def __init__(self, on_connection_closed=None ): - assert isinstance(client, Client) + assert isinstance(client, Client) or isinstance(client, mqtt5.Client) assert callable(on_connection_interrupted) or on_connection_interrupted is None assert callable(on_connection_resumed) or on_connection_resumed is None assert isinstance(will, Will) or will is None @@ -376,6 +377,7 @@ def __init__(self, # init-only self.client = client + self._client_version = 5 if isinstance(client, mqtt5.Client) else 3 self._on_connection_interrupted_cb = on_connection_interrupted self._on_connection_resumed_cb = on_connection_resumed self._use_websockets = use_websockets @@ -400,11 +402,18 @@ def __init__(self, self.socket_options = socket_options if socket_options else SocketOptions() self.proxy_options = proxy_options if proxy_options else websocket_proxy_options - self._binding = _awscrt.mqtt_client_connection_new( - self, - client, - use_websockets, - ) + if self._client_version == 3: + self._binding = _awscrt.mqtt_client_connection_new( + self, + client, + use_websockets, + ) + elif self._client_version == 5: + self._binding = _awscrt.mqtt_client_connection_new_from_mqtt5_client( + self, + client, + use_websockets, + ) def _check_uses_old_message_callback_signature(self, callback): # The callback used to have fewer args. Passing only those args, if it diff --git a/awscrt/mqtt5.py b/awscrt/mqtt5.py index 1de9d2ef7..d932b3060 100644 --- a/awscrt/mqtt5.py +++ b/awscrt/mqtt5.py @@ -15,6 +15,7 @@ from dataclasses import dataclass from collections.abc import Sequence from inspect import signature +from mqtt import Connection class QoS(IntEnum): @@ -1629,6 +1630,31 @@ def _on_lifecycle_disconnection( disconnect_packet=None, exception=exceptions.from_code(error_code))) +@dataclass +class _Mqtt5to3AdapterOptions: + """This internal class stores the options that required for creating a new Mqtt3 connection from the mqtt5 client + Args: + host_name (str): Host name of the MQTT server to connect to. + port (int): Network port of the MQTT server to connect to. + client_id (str): A unique string identifying the client to the server. Used to restore session state between connections. If left empty, the broker will auto-assign a unique client id. When reconnecting, the mqtt5 client will always use the auto-assigned client id. + socket_options (SocketOptions): The socket properties of the underlying MQTT connections made by the client or None if defaults are used. + min_reconnect_delay_ms (int): The minimum amount of time to wait to reconnect after a disconnect. Exponential backoff is performed with jitter after each connection failure. + max_reconnect_delay_ms (int): The maximum amount of time to wait to reconnect after a disconnect. Exponential backoff is performed with jitter after each connection failure. + ping_timeout_ms (int): The time interval to wait after sending a PINGREQ for a PINGRESP to arrive. If one does not arrive, the client will close the current connection. + keep_alive_secs (int): The keep alive value, in seconds, A PING will automatically be sent at this interval. + ack_timeout_secs (int): The time interval to wait for an ack after sending a QoS 1+ PUBLISH, SUBSCRIBE, or UNSUBSCRIBE before failing the operation. + clean_session (bool): Whether or not to start a clean session with each reconnect. + """ + host_name: str + port: int + client_id: str + socket_options: SocketOptions + min_reconnect_delay_ms: int + max_reconnect_delay_ms: int + ping_timeout_ms : int + keep_alive_secs : int + ack_timeout_secs: int + clean_session: bool class Client(NativeResource): """This class wraps the aws-c-mqtt MQTT5 client to provide the basic MQTT5 pub/sub functionalities via the AWS Common Runtime @@ -1646,6 +1672,8 @@ def __init__(self, client_options: ClientOptions): core = _ClientCore(client_options) + self.client_options = client_options + bootstrap = client_options.bootstrap if not bootstrap: @@ -1710,6 +1738,19 @@ def __init__(self, client_options: ClientOptions): websocket_is_none, core) + # Store the options for mqtt3 adapter + self.adapter_options = _Mqtt5to3AdapterOptions(hostname = client_options.host_name, + port = client_options.port, + client_id=connect_options.client_id + socket_options = socket_options, + min_reconnect_delay_ms=client_options.min_reconnect_delay_ms, + max_reconnect_delay_ms=client_options.max_reconnect_delay_ms, + ping_timeout_ms=client_options.ping_timeout_ms, + keep_alive_secs=connect_options.keep_alive_interval_sec, + ack_timeout_secs=client_options.ack_timeout_sec, + clean_session=client_options.session_behavior < ClientSessionBehaviorType.REJOIN_ALWAYS) + + def start(self): """Notifies the MQTT5 client that you want it maintain connectivity to the configured endpoint. The client will attempt to stay connected using the properties of the reconnect-related parameters in the mqtt5 client configuration. @@ -1853,3 +1894,36 @@ def get_stats(self): result = _awscrt.mqtt5_client_get_stats(self._binding) return OperationStatisticsData(result[0], result[1], result[2], result[3]) + + + def new_connection(self, on_connection_interrupted = None, on_connection_resumed=None, + on_connection_success=None, on_connection_failure=None, on_connection_closed=None): + return Connection( + self, + self.adapter_options.host_name, + self.adapter_options.port, + self.adapter_options.client_id, + clean_session=self.adapter_options.clean_session, + on_connection_interrupted=on_connection_interrupted, + on_connection_resumed=on_connection_resumed, + on_connection_success=on_connection_success, + on_connection_failure=on_connection_failure, + on_connection_closed=on_connection_closed, + reconnect_min_timeout_secs=self.adapter_options.min_reconnect_delay_ms, + reconnect_max_timeout_secs=self.adapter_options.max_reconnect_delay_ms, + keep_alive_secs=self.adapter_options.keep_alive_secs, + ping_timeout_ms=self.adapter_options.ping_timeout_ms, + protocol_operation_timeout_ms=self.adapter_options.ack_timeout_sec * 1000, + socket_options = self.adapter_options.socket_options, + + # For the arugments below, set it to `None` will directly use the options from mqtt5 client underlying. + will=None, + will=None, + username=None, + password=None, + # Similar to previous options, set it False will use mqtt5 setup for websockets. It is not necessary means the websocket is disabled. + use_websockets=False, + websocket_proxy_options=None, + websocket_handshake_transform=None, + proxy_options=None + ) diff --git a/crt/aws-c-mqtt b/crt/aws-c-mqtt index d5c268f70..b777be4aa 160000 --- a/crt/aws-c-mqtt +++ b/crt/aws-c-mqtt @@ -1 +1 @@ -Subproject commit d5c268f70aeccf38e75d3e74ce4eb9629df02e2a +Subproject commit b777be4aa52d233391781256d40006d353e848b9 diff --git a/source/module.c b/source/module.c index a971bb6d0..0ba82f057 100644 --- a/source/module.c +++ b/source/module.c @@ -688,6 +688,7 @@ static PyMethodDef s_module_methods[] = { /* MQTT Client Connection */ AWS_PY_METHOD_DEF(mqtt_client_connection_new, METH_VARARGS), + AWS_PY_METHOD_DEF(mqtt_client_connection_new_from_mqtt5_client, METH_VARARGS), AWS_PY_METHOD_DEF(mqtt_client_connection_connect, METH_VARARGS), AWS_PY_METHOD_DEF(mqtt_client_connection_reconnect, METH_VARARGS), AWS_PY_METHOD_DEF(mqtt_client_connection_publish, METH_VARARGS), diff --git a/source/mqtt_client_connection.c b/source/mqtt_client_connection.c index 5cf5e6f96..db992a8d3 100644 --- a/source/mqtt_client_connection.c +++ b/source/mqtt_client_connection.c @@ -3,6 +3,7 @@ * SPDX-License-Identifier: Apache-2.0. */ #include "mqtt_client_connection.h" +#include "mqtt5_client.h" #include "http.h" #include "io.h" @@ -367,6 +368,107 @@ PyObject *aws_py_mqtt_client_connection_new(PyObject *self, PyObject *args) { return NULL; } +PyObject *aws_py_mqtt_client_connection_new_from_mqtt5_client(PyObject *self, PyObject *args) { + (void)self; + + struct aws_allocator *allocator = aws_py_get_allocator(); + + PyObject *self_py; + PyObject *client_py; + PyObject *use_websocket_py; + if (!PyArg_ParseTuple(args, "OOO", &self_py, &client_py, &use_websocket_py)) { + return NULL; + } + + struct aws_mqtt5_client *client = aws_py_get_mqtt5_client(client_py); + if (!client) { + return NULL; + } + + struct mqtt_connection_binding *py_connection = + aws_mem_calloc(allocator, 1, sizeof(struct mqtt_connection_binding)); + if (!py_connection) { + PyErr_SetAwsLastError(); + return NULL; + } + + /* From hereon, we need to clean up if errors occur */ + + py_connection->native = aws_mqtt_client_connection_new_from_mqtt5_client(client); + if (!py_connection->native) { + PyErr_SetAwsLastError(); + goto connection_new_failed; + } + + if (aws_mqtt_client_connection_set_connection_result_handlers( + py_connection->native, s_on_connection_success, py_connection, s_on_connection_failure, py_connection)) { + PyErr_SetAwsLastError(); + goto set_connection_handlers_failed; + } + + if (aws_mqtt_client_connection_set_connection_interruption_handlers( + py_connection->native, + s_on_connection_interrupted, + py_connection, + s_on_connection_resumed, + py_connection)) { + + PyErr_SetAwsLastError(); + goto set_interruption_failed; + } + + if (aws_mqtt_client_connection_set_connection_closed_handler( + py_connection->native, s_on_connection_closed, py_connection)) { + PyErr_SetAwsLastError(); + goto set_interruption_failed; + } + + if (PyObject_IsTrue(use_websocket_py)) { + if (aws_mqtt_client_connection_use_websockets( + py_connection->native, + s_ws_handshake_transform, + py_connection /*transform userdata*/, + NULL /*validator*/, + NULL /*validator userdata*/)) { + + PyErr_SetAwsLastError(); + goto use_websockets_failed; + } + } + + PyObject *self_proxy = PyWeakref_NewProxy(self_py, NULL); + if (!self_proxy) { + goto proxy_new_failed; + } + + PyObject *capsule = + PyCapsule_New(py_connection, s_capsule_name_mqtt_client_connection, s_mqtt_python_connection_destructor); + if (!capsule) { + goto capsule_new_failed; + } + + /* From hereon, nothing will fail */ + + py_connection->self_capsule = capsule; + py_connection->self_proxy = self_proxy; + + py_connection->client = client_py; + Py_INCREF(py_connection->client); + + return capsule; + +capsule_new_failed: + Py_DECREF(self_proxy); +proxy_new_failed: +use_websockets_failed: +set_interruption_failed: +set_connection_handlers_failed: + aws_mqtt_client_connection_release(py_connection->native); +connection_new_failed: + aws_mem_release(allocator, py_connection); + return NULL; +} + struct aws_mqtt_client_connection *aws_py_get_mqtt_client_connection(PyObject *mqtt_connection) { AWS_PY_RETURN_NATIVE_FROM_BINDING( mqtt_connection, s_capsule_name_mqtt_client_connection, "Connection", mqtt_connection_binding); diff --git a/source/mqtt_client_connection.h b/source/mqtt_client_connection.h index 6bdc05017..6dfcbd90c 100644 --- a/source/mqtt_client_connection.h +++ b/source/mqtt_client_connection.h @@ -12,6 +12,7 @@ #include "module.h" PyObject *aws_py_mqtt_client_connection_new(PyObject *self, PyObject *args); +PyObject *aws_py_mqtt_client_connection_new_from_mqtt5_client(PyObject *self, PyObject *args); PyObject *aws_py_mqtt_client_connection_connect(PyObject *self, PyObject *args); PyObject *aws_py_mqtt_client_connection_reconnect(PyObject *self, PyObject *args); PyObject *aws_py_mqtt_client_connection_publish(PyObject *self, PyObject *args); diff --git a/test/test_mqtt5.py b/test/test_mqtt5.py index d225534cc..33a52099e 100644 --- a/test/test_mqtt5.py +++ b/test/test_mqtt5.py @@ -1549,6 +1549,132 @@ def test_operation_statistics_uc1(self): client.stop() callbacks.future_stopped.result(TIMEOUT) + # ============================================================== + # 5to3 ADAPTER TEST CASES + # ============================================================== + def test_5to3Adapter_connection_creation_minimum(self): + client5 = self._create_client() + connection = client5.new_connection() + + def test_5to3Adapter_connection_creation_maximum(self): + input_host_name = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST") + + user_properties = [] + user_properties.append(mqtt5.UserProperty(name="name1", value="value1")) + user_properties.append(mqtt5.UserProperty(name="name2", value="value2")) + + publish_packet = mqtt5.PublishPacket( + payload="TEST_PAYLOAD", + qos=mqtt5.QoS.AT_LEAST_ONCE, + retain=False, + topic="TEST_TOPIC", + payload_format_indicator=mqtt5.PayloadFormatIndicator.AWS_MQTT5_PFI_UTF8, + message_expiry_interval_sec=10, + topic_alias=1, + response_topic="TEST_RESPONSE_TOPIC", + correlation_data="TEST_CORRELATION_DATA", + content_type="TEST_CONTENT_TYPE", + user_properties=user_properties + ) + + connect_options = mqtt5.ConnectPacket( + keep_alive_interval_sec=10, + client_id="TEST_CLIENT", + username="USERNAME", + password="PASSWORD", + session_expiry_interval_sec=100, + request_response_information=1, + request_problem_information=1, + receive_maximum=1000, + maximum_packet_size=10000, + will_delay_interval_sec=1000, + will=publish_packet, + user_properties=user_properties + ) + client_options = mqtt5.ClientOptions( + host_name=input_host_name, + port=8883, + connect_options=connect_options, + session_behavior=mqtt5.ClientSessionBehaviorType.CLEAN, + extended_validation_and_flow_control_options=mqtt5.ExtendedValidationAndFlowControlOptions.AWS_IOT_CORE_DEFAULTS, + offline_queue_behavior=mqtt5.ClientOperationQueueBehaviorType.FAIL_ALL_ON_DISCONNECT, + retry_jitter_mode=mqtt5.ExponentialBackoffJitterMode.DECORRELATED, + min_reconnect_delay_ms=100, + max_reconnect_delay_ms=50000, + min_connected_time_to_reset_reconnect_delay_ms=1000, + ping_timeout_ms=1000, + connack_timeout_ms=1000, + ack_timeout_sec=100) + client = self._create_client(client_options=client_options) + connection = client.new_connection() + + def test_5to3Adapter_direct_connect_minimum(self): + input_host_name = _get_env_variable("AWS_TEST_MQTT5_DIRECT_MQTT_HOST") + input_port = int(_get_env_variable("AWS_TEST_MQTT5_DIRECT_MQTT_PORT")) + + client_options = mqtt5.ClientOptions( + host_name=input_host_name, + port=input_port + ) + callbacks = Mqtt5TestCallbacks() + client = self._create_client(client_options=client_options, callbacks=callbacks) + + connection = client.new_connection() + connection.connect().result(TIMEOUT) + connection.disconnect().result(TIMEOUT) + + def test_5to3Adapter_direct_connect_minimum(self): + input_host_name = _get_env_variable("AWS_TEST_MQTT5_DIRECT_MQTT_HOST") + input_port = int(_get_env_variable("AWS_TEST_MQTT5_DIRECT_MQTT_PORT")) + + client_options = mqtt5.ClientOptions( + host_name=input_host_name, + port=input_port + ) + callbacks = Mqtt5TestCallbacks() + client = self._create_client(client_options=client_options, callbacks=callbacks) + + connection = client.new_connection() + connection.connect().result(TIMEOUT) + connection.disconnect().result(TIMEOUT) + + def test_5to3Adapter_websocket_connect_minimum(self): + input_host_name = _get_env_variable("AWS_TEST_MQTT5_WS_MQTT_HOST") + input_port = int(_get_env_variable("AWS_TEST_MQTT5_WS_MQTT_PORT")) + + client_options = mqtt5.ClientOptions( + host_name=input_host_name, + port=input_port + ) + callbacks = Mqtt5TestCallbacks() + client_options.websocket_handshake_transform = callbacks.ws_handshake_transform + + client = self._create_client(client_options=client_options, callbacks=callbacks) + connection = client.new_connection() + connection.connect(TIMEOUT) + callbacks.future_connection_success.result(TIMEOUT) + connection.disconnect(TIMEOUT) + + def test_5to3Adapter_direct_connect_mutual_tls(self): + input_host_name = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST") + input_cert = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_CERT") + input_key = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_KEY") + + client_options = mqtt5.ClientOptions( + host_name=input_host_name, + port=8883 + ) + tls_ctx_options = io.TlsContextOptions.create_client_with_mtls_from_path( + input_cert, + input_key + ) + client_options.tls_ctx = io.ClientTlsContext(tls_ctx_options) + + client = self._create_client(client_options=client_options, callbacks=callbacks) + connection = client.new_connection() + connection.connect().result(TIMEOUT) + connection.disconnect().result(TIMEOUT) + if __name__ == 'main': unittest.main()