Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How to share same client instance? #12

Closed
MinChanSike opened this issue Oct 11, 2023 · 3 comments
Closed

How to share same client instance? #12

MinChanSike opened this issue Oct 11, 2023 · 3 comments

Comments

@MinChanSike
Copy link

MinChanSike commented Oct 11, 2023

Hi, Thanks for creating a great libraries.
I have question regarding MQTTnet.Rx.Client package to share same client connection for all message subscription and publishing.
I see multiple client connections connected to the server. How can be use to share same client instance?

 internal class Program {
	static async Task Main(string[] args) {
		var serverPort = 2883;
		var mqttServerOptions = new MqttServerOptionsBuilder()
								.WithDefaultEndpointPort(serverPort)
								.WithDefaultEndpoint().Build();

		var mqttFactory = new MqttFactory();
		var mqttServer = mqttFactory.CreateMqttServer(mqttServerOptions);
		mqttServer.ClientConnectedAsync += (args) => {
			Console.WriteLine($"SERVER: ClientConnectedAsync => clientId:{args.ClientId}");
			return Task.CompletedTask;
		};
		mqttServer.ClientDisconnectedAsync += (args) => {
			Console.WriteLine($"SERVER: ClientDisconnectedAsync => clientId:{args.ClientId}");
			return Task.CompletedTask;
		};
		Console.WriteLine("Starting MQTT server...");
		await mqttServer.StartAsync();
		Console.WriteLine("MQTT server is started.");
		Console.WriteLine("------------------------------------\n");


		var obsClient = Create.ManagedMqttClient()
							  .WithManagedClientOptions(options => {
								  options.WithClientOptions(c => {
									  c.WithTcpServer("localhost", serverPort);
									  //c.WithClientId("Client01");
								  });
								  options.WithAutoReconnectDelay(TimeSpan.FromSeconds(2));
							  });

		obsClient.Subscribe(i => {
			i.Connected().Subscribe((args) => {
				Console.WriteLine($"{DateTime.Now.Dump()}\t CLIENT: Connected with server.");
			});
			i.Disconnected().Subscribe((args) => {
				Console.WriteLine($"{DateTime.Now.Dump()}\t CLIENT: Disconnected with server.");
			});
		});

		var s1 = obsClient.SubscribeToTopic("FromMilliseconds")
				 .Subscribe(r => {
					 Console.WriteLine($"\tCLIENT S1: {r.ReasonCode} [{r.ApplicationMessage.Topic}] value : {r.ApplicationMessage.ConvertPayloadToString()}");
				 });

		var s2 = obsClient.SubscribeToTopic("FromMilliseconds")
				 .Subscribe(r => {
					 Console.WriteLine($"\tCLIENT S2: {r.ReasonCode} [{r.ApplicationMessage.Topic}] value : {r.ApplicationMessage.ConvertPayloadToString()}");
				 });

		Subject<(string topic, string payload)> _message = new();

		Observable.Interval(TimeSpan.FromMilliseconds(1000)).Subscribe(i => _message.OnNext(("FromMilliseconds", "{" + $"payload: {i}" + "}")));

		obsClient.PublishMessage(_message).Subscribe(r => {
			//Console.WriteLine($"Publish result [{r.Exception == null}], Id:{r.ApplicationMessage.Id}, Topic:{r.ApplicationMessage.ApplicationMessage.Topic}");
		});



		await Task.Delay(3000);
		s2.Dispose();
		Console.WriteLine("Dispose S2 ---------------");

		Console.Read();
	}
}

Console Output1

Starting MQTT server...
MQTT server is started.
------------------------------------

SERVER: ClientConnectedAsync => clientId:96e2cb5dce544559b823f12b17285864
SERVER: ClientConnectedAsync => clientId:936ca9900f3c4f599d4e7a63b32890ab
SERVER: ClientConnectedAsync => clientId:c3a72d8bfba64a5cbb523bb34fcc3094
SERVER: ClientConnectedAsync => clientId:77782d2f2f124550bee68adff8d80940
03:29:20.4536    CLIENT: Connected with server.
        CLIENT S2: Success [FromMilliseconds] value : {payload: 0}
        CLIENT S1: Success [FromMilliseconds] value : {payload: 0}
        CLIENT S1: Success [FromMilliseconds] value : {payload: 1}
        CLIENT S2: Success [FromMilliseconds] value : {payload: 1}
        CLIENT S1: Success [FromMilliseconds] value : {payload: 2}
        CLIENT S2: Success [FromMilliseconds] value : {payload: 2}
Dispose S2 ---------------
SERVER: ClientDisconnectedAsync => clientId:c3a72d8bfba64a5cbb523bb34fcc3094
        CLIENT S1: Success [FromMilliseconds] value : {payload: 3}
        CLIENT S1: Success [FromMilliseconds] value : {payload: 4}
        CLIENT S1: Success [FromMilliseconds] value : {payload: 5}

If I try to use constant clientId in MqttClientOptionsBuilder c.WithClientId("Client01");

Console Output2

Starting MQTT server...
MQTT server is started.
------------------------------------

SERVER: ClientDisconnectedAsync => clientId:Client01
SERVER: ClientDisconnectedAsync => clientId:Client01
SERVER: ClientDisconnectedAsync => clientId:Client01
SERVER: ClientConnectedAsync => clientId:Client01
SERVER: ClientConnectedAsync => clientId:Client01
SERVER: ClientConnectedAsync => clientId:Client01
SERVER: ClientConnectedAsync => clientId:Client01
03:30:48.6046    CLIENT: Connected with server.
03:30:48.6194    CLIENT: Disconnected with server.
SERVER: ClientDisconnectedAsync => clientId:Client01
SERVER: ClientDisconnectedAsync => clientId:Client01
SERVER: ClientConnectedAsync => clientId:Client01
SERVER: ClientConnectedAsync => clientId:Client01
SERVER: ClientDisconnectedAsync => clientId:Client01
SERVER: ClientConnectedAsync => clientId:Client01
03:30:50.6232    CLIENT: Connected with server.
SERVER: ClientDisconnectedAsync => clientId:Client01
SERVER: ClientConnectedAsync => clientId:Client01
SERVER: ClientDisconnectedAsync => clientId:Client01
SERVER: ClientConnectedAsync => clientId:Client01
03:30:50.6594    CLIENT: Disconnected with server.
SERVER: ClientDisconnectedAsync => clientId:Client01
SERVER: ClientConnectedAsync => clientId:Client01
Dispose S2 ---------------
SERVER: ClientDisconnectedAsync => clientId:Client01
SERVER: ClientConnectedAsync => clientId:Client01
03:30:51.6368    CLIENT: Connected with server.
ChrisPulman added a commit that referenced this issue Oct 15, 2023
@ChrisPulman
Copy link
Owner

@MinChanSike Thank you for demonstrating this, I have now updated the functionality to correct this with V1.2.1.
Please give feedback to confirm that this has resolved the query.

@ChrisPulman
Copy link
Owner

This is the output from my testing having added a further Subscription.

Starting MQTT server...
MQTT server is started.
------------------------------------

SERVER: ClientConnectedAsync => clientId:000f2f3e00d54cfb9377bc4d11342475
[DateTime]:
"2023-10-15T03:15:28.2584982+01:00"
15/10/2023 03:15:28       CLIENT: Connected with server.
        CLIENT S1: Success [FromMilliseconds] value : {payload: 0}
        CLIENT S2: Success [FromMilliseconds] value : {payload: 0}
        CLIENT S3: Success [FromMilliseconds] value : {payload: 0}
        CLIENT S1: Success [FromMilliseconds] value : {payload: 1}
        CLIENT S2: Success [FromMilliseconds] value : {payload: 1}
        CLIENT S3: Success [FromMilliseconds] value : {payload: 1}
        CLIENT S1: Success [FromMilliseconds] value : {payload: 2}
        CLIENT S2: Success [FromMilliseconds] value : {payload: 2}
        CLIENT S3: Success [FromMilliseconds] value : {payload: 2}
Dispose S2 ---------------
        CLIENT S1: Success [FromMilliseconds] value : {payload: 3}
        CLIENT S3: Success [FromMilliseconds] value : {payload: 3}
        CLIENT S1: Success [FromMilliseconds] value : {payload: 4}
        CLIENT S3: Success [FromMilliseconds] value : {payload: 4}
        CLIENT S1: Success [FromMilliseconds] value : {payload: 5}
        CLIENT S3: Success [FromMilliseconds] value : {payload: 5}
        CLIENT S1: Success [FromMilliseconds] value : {payload: 6}
        CLIENT S3: Success [FromMilliseconds] value : {payload: 6}
        CLIENT S1: Success [FromMilliseconds] value : {payload: 7}
        CLIENT S3: Success [FromMilliseconds] value : {payload: 7}
Dispose S1 ---------------
        CLIENT S3: Success [FromMilliseconds] value : {payload: 8}
        CLIENT S3: Success [FromMilliseconds] value : {payload: 9}
        CLIENT S3: Success [FromMilliseconds] value : {payload: 10}
        CLIENT S3: Success [FromMilliseconds] value : {payload: 11}
        CLIENT S3: Success [FromMilliseconds] value : {payload: 12}
        CLIENT S3: Success [FromMilliseconds] value : {payload: 13}
        CLIENT S3: Success [FromMilliseconds] value : {payload: 14}

@ChrisPulman
Copy link
Owner

I believe that this is now resolved, please feel free to open this issue if there's anything I missed.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants