From e3a23ab72faba2351540f6f8af50c5f476d8c939 Mon Sep 17 00:00:00 2001 From: ridomin Date: Mon, 3 Apr 2023 12:20:35 -0700 Subject: [PATCH] rid wip --- .../BrokerClientFactory.cs | 8 +++---- .../Command.cs | 12 +++++----- .../Untyped/GenericCommandClient.cs | 23 ++++++++++++------- 3 files changed, 25 insertions(+), 18 deletions(-) diff --git a/src/MQTTnet.Extensions.MultiCloud.BrokerIoTClient/BrokerClientFactory.cs b/src/MQTTnet.Extensions.MultiCloud.BrokerIoTClient/BrokerClientFactory.cs index 3800211..d1e30ee 100644 --- a/src/MQTTnet.Extensions.MultiCloud.BrokerIoTClient/BrokerClientFactory.cs +++ b/src/MQTTnet.Extensions.MultiCloud.BrokerIoTClient/BrokerClientFactory.cs @@ -36,10 +36,10 @@ public static async Task CreateFromConnectionSettingsAsync(Connecti BirthConvention.BirthTopic(mqtt.Options.ClientId), birthPayload, Protocol.MqttQualityOfServiceLevel.AtLeastOnce, true, cancellationToken); //hack to disable retained in registry - if (pubAck.ReasonCode != MqttClientPublishReasonCode.Success) - { - throw new ApplicationException($"Error publishing Birth {cs}"); - } + //if (pubAck.ReasonCode != MqttClientPublishReasonCode.Success) + //{ + // throw new ApplicationException($"Error publishing Birth {cs}"); + //} } return mqtt; } diff --git a/src/MQTTnet.Extensions.MultiCloud.BrokerIoTClient/Command.cs b/src/MQTTnet.Extensions.MultiCloud.BrokerIoTClient/Command.cs index 357363d..f8ad777 100644 --- a/src/MQTTnet.Extensions.MultiCloud.BrokerIoTClient/Command.cs +++ b/src/MQTTnet.Extensions.MultiCloud.BrokerIoTClient/Command.cs @@ -9,8 +9,8 @@ public Command(IMqttClient client, string name) : base(client, name) { SubscribeTopicPattern = "device/{clientId}/commands/{name}"; - RequestTopicPattern = "device/{clientId}/commands/{name}"; - ResponseTopicPattern = "device/{clientId}/commands/{name}/resp"; + RequestTopicPattern = "device/{clientId}/commands/{name}?$rid={rid}"; + ResponseTopicPattern = "device/{clientId}/commands/{name}/resp?$rid={rid}"; } } @@ -20,8 +20,8 @@ public Command(IMqttClient client, string name) : base(client, name) { SubscribeTopicPattern = "device/{clientId}/commands/{name}"; - RequestTopicPattern = "device/{clientId}/commands/{name}"; - ResponseTopicPattern = "device/{clientId}/commands/{name}/resp"; + RequestTopicPattern = "device/{clientId}/commands/{name}?$rid={rid}"; + ResponseTopicPattern = "device/{clientId}/commands/{name}/resp?$rid={rid}"; } } @@ -31,8 +31,8 @@ public Command(IMqttClient client, string name) : base(client, name) { SubscribeTopicPattern = "device/{clientId}/commands/{name}"; - RequestTopicPattern = "device/{clientId}/commands/{name}"; - ResponseTopicPattern = "device/{clientId}/commands/{name}/resp"; + RequestTopicPattern = "device/{clientId}/commands/{name}?$rid={rid}"; + ResponseTopicPattern = "device/{clientId}/commands/{name}/resp?$rid={rid}"; } } diff --git a/src/MQTTnet.Extensions.MultiCloud.BrokerIoTClient/Untyped/GenericCommandClient.cs b/src/MQTTnet.Extensions.MultiCloud.BrokerIoTClient/Untyped/GenericCommandClient.cs index 252b5c8..cf6ec65 100644 --- a/src/MQTTnet.Extensions.MultiCloud.BrokerIoTClient/Untyped/GenericCommandClient.cs +++ b/src/MQTTnet.Extensions.MultiCloud.BrokerIoTClient/Untyped/GenericCommandClient.cs @@ -1,6 +1,7 @@ using MQTTnet.Client; using MQTTnet.Extensions.MultiCloud.Serializers; using MQTTnet.Server; +using System.Text; namespace MQTTnet.Extensions.MultiCloud.BrokerIoTClient.Untyped; @@ -14,9 +15,9 @@ public class GenericCommandClient byte[]? corr = new byte[] { }; - string requestTopicPattern = "device/{clientId}/commands/{commandName}"; + string requestTopicPattern = "device/{clientId}/commands/{commandName}?$rid={rid}"; string responseTopicSub = "device/{clientId}/commands/{commandName}/+"; - string responseTopicSuccess = "device/{clientId}/commands/{commandName}/resp"; + string responseTopicSuccess = "device/{clientId}/commands/{commandName}/resp?$rid={rid}"; public GenericCommandClient(IMqttClient client) @@ -47,7 +48,7 @@ public GenericCommandClient(IMqttClient client) Status = status, ReponsePayload = respPayload }; - _tcs!.SetResult(resp); + _tcs!.TrySetResult(resp); } else { @@ -59,25 +60,31 @@ public GenericCommandClient(IMqttClient client) }; } - public async Task InvokeAsync(string clientId, GenericCommandRequest request, CancellationToken ct = default) + public Task InvokeAsync(string clientId, GenericCommandRequest request, CancellationToken ct = default) { _tcs = new TaskCompletionSource(); _remoteClientId = clientId; _commandName = request.CommandName; corr = request.CorrelationId!; - string commandTopic = requestTopicPattern.Replace("{clientId}", _remoteClientId).Replace("{commandName}", _commandName); - var responseTopic = responseTopicSub.Replace("{clientId}", _remoteClientId).Replace("{commandName}", _commandName); + string commandTopic = requestTopicPattern + .Replace("{clientId}", _remoteClientId) + .Replace("{commandName}", _commandName) + .Replace("{rid}", Encoding.UTF8.GetString(corr)); + var responseTopic = responseTopicSub + .Replace("{clientId}", _remoteClientId) + .Replace("{commandName}", _commandName) + .Replace("{rid}", Encoding.UTF8.GetString(corr)); _ =_mqttClient.SubscribeAsync(responseTopic, Protocol.MqttQualityOfServiceLevel.AtMostOnce, ct); _ = _mqttClient.PublishAsync( new MqttApplicationMessageBuilder() .WithTopic(commandTopic) .WithPayload(_serializer.ToBytes(request.RequestPayload)) - .WithResponseTopic(responseTopicSuccess.Replace("{clientId}", _remoteClientId).Replace("{commandName}", _commandName)) + .WithResponseTopic(responseTopic) .WithCorrelationData(request.CorrelationId) .Build()); - return await _tcs.Task.TimeoutAfter(TimeSpan.FromSeconds(5)); + return _tcs.Task.TimeoutAfter(TimeSpan.FromSeconds(5)); } }