Skip to content

Commit

Permalink
Add test for on_connection_success fired on resume
Browse files Browse the repository at this point in the history
  • Loading branch information
sfodagain committed Jul 13, 2023
1 parent 1ca790c commit c1241e4
Showing 1 changed file with 74 additions and 3 deletions.
77 changes: 74 additions & 3 deletions test/test_mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ def _create_connection(
tls_context,
port=8883,
use_static_singletons=False,
client_id=None,
on_connection_success_callback=None,
on_connection_failure_callback=None,
on_connection_closed_callback=None):
on_connection_closed_callback=None,
on_connection_resumed_callback=None):
if use_static_singletons:
client = Client(tls_ctx=tls_context)
else:
Expand All @@ -48,12 +50,13 @@ def _create_connection(

connection = Connection(
client=client,
client_id=create_client_id(),
client_id=client_id if client_id else create_client_id(),
host_name=endpoint,
port=port,
on_connection_closed=on_connection_closed_callback,
on_connection_failure=on_connection_failure_callback,
on_connection_success=on_connection_success_callback)
on_connection_success=on_connection_success_callback,
on_connection_resumed=on_connection_resumed_callback)
return connection

def test_connect_disconnect(self):
Expand Down Expand Up @@ -462,6 +465,74 @@ def on_connection_closed_callback(connection, callback_data: OnConnectionClosedD
failureData = onConnectionFailureFuture.result(TIMEOUT)
self.assertTrue(failureData['error'] is not None)

def test_connect_disconnect_with_callbacks_happy_on_resume(self):
# Check that an on_connection_success callback fires on a resumed connection.

# NOTE Since there is no mocked server available on this level, the only sensible approach is to interrupt
# a connection, and wait for it to be resumed automatically. For that, another connection with the same
# client_id connects to the server and then immediately disconnects.

test_input_endpoint = _get_env_variable("AWS_TEST_MQTT311_IOT_CORE_HOST")
test_input_cert = _get_env_variable("AWS_TEST_MQTT311_IOT_CORE_RSA_CERT")
test_input_key = _get_env_variable("AWS_TEST_MQTT311_IOT_CORE_RSA_KEY")
test_tls_opts = TlsContextOptions.create_client_with_mtls_from_path(test_input_cert, test_input_key)
test_tls = ClientTlsContext(test_tls_opts)

on_connection_success_future = Future()
on_connection_closed_future = Future()
on_connection_resumed_future = Future()

def on_connection_success_callback(connection, callback_data: OnConnectionSuccessData):
on_connection_success_future.set_result(
{'return_code': callback_data.return_code, "session_present": callback_data.session_present})

def on_connection_closed_callback(connection, callback_data: OnConnectionClosedData):
on_connection_closed_future.set_result({})

def on_connection_resumed_callback(connection, return_code: ConnectReturnCode, session_present):
on_connection_resumed_future.set_result(
{'return_code': return_code, "session_present": session_present})

connection = self._create_connection(
endpoint=test_input_endpoint,
tls_context=test_tls,
on_connection_success_callback=on_connection_success_callback,
on_connection_closed_callback=on_connection_closed_callback,
on_connection_resumed_callback=on_connection_resumed_callback)
connection.connect().result(TIMEOUT)
success_data = on_connection_success_future.result(TIMEOUT)
self.assertEqual(success_data['return_code'], ConnectReturnCode.ACCEPTED)
self.assertEqual(success_data['session_present'], False)

on_connection_success_future = Future()

on_connection_success_future_dup = Future()

def on_connection_success_callback_dup(connection, callback_data: OnConnectionSuccessData):
on_connection_success_future_dup.set_result({})

# Reuse the same client_id to displace the first connection.
connection_dup = self._create_connection(
endpoint=test_input_endpoint,
tls_context=test_tls,
client_id=connection.client_id,
on_connection_success_callback=on_connection_success_callback_dup)

connection_dup.connect().result(TIMEOUT)
on_connection_success_future_dup.result(TIMEOUT)
connection_dup.disconnect().result(TIMEOUT)

# After the second client disconnects, the first one should reconnect,
# and on_connection_success callback should be fired once again.
on_connection_resumed_future.result(TIMEOUT)
success_data = on_connection_success_future.result(TIMEOUT)

self.assertEqual(success_data['return_code'], ConnectReturnCode.ACCEPTED)
self.assertEqual(success_data['session_present'], False)

connection.disconnect().result(TIMEOUT)
on_connection_closed_future.result(TIMEOUT)

# ==============================================================
# MOSQUITTO CONNECTION TESTS
# ==============================================================
Expand Down

0 comments on commit c1241e4

Please sign in to comment.