Skip to content

Commit

Permalink
rid wip
Browse files Browse the repository at this point in the history
  • Loading branch information
ridomin committed Apr 3, 2023
1 parent 63a0963 commit e3a23ab
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ public static async Task<IMqttClient> CreateFromConnectionSettingsAsync(Connecti
BirthConvention.BirthTopic(mqtt.Options.ClientId),
birthPayload,
Protocol.MqttQualityOfServiceLevel.AtLeastOnce, true, cancellationToken); //hack to disable retained in registry
if (pubAck.ReasonCode != MqttClientPublishReasonCode.Success)
{
throw new ApplicationException($"Error publishing Birth {cs}");
}
//if (pubAck.ReasonCode != MqttClientPublishReasonCode.Success)
//{
// throw new ApplicationException($"Error publishing Birth {cs}");
//}
}
return mqtt;
}
Expand Down
12 changes: 6 additions & 6 deletions src/MQTTnet.Extensions.MultiCloud.BrokerIoTClient/Command.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ public Command(IMqttClient client, string name)
: base(client, name)
{
SubscribeTopicPattern = "device/{clientId}/commands/{name}";
RequestTopicPattern = "device/{clientId}/commands/{name}";
ResponseTopicPattern = "device/{clientId}/commands/{name}/resp";
RequestTopicPattern = "device/{clientId}/commands/{name}?$rid={rid}";
ResponseTopicPattern = "device/{clientId}/commands/{name}/resp?$rid={rid}";
}
}

Expand All @@ -20,8 +20,8 @@ public Command(IMqttClient client, string name)
: base(client, name)
{
SubscribeTopicPattern = "device/{clientId}/commands/{name}";
RequestTopicPattern = "device/{clientId}/commands/{name}";
ResponseTopicPattern = "device/{clientId}/commands/{name}/resp";
RequestTopicPattern = "device/{clientId}/commands/{name}?$rid={rid}";
ResponseTopicPattern = "device/{clientId}/commands/{name}/resp?$rid={rid}";
}
}

Expand All @@ -31,8 +31,8 @@ public Command(IMqttClient client, string name)
: base(client, name)
{
SubscribeTopicPattern = "device/{clientId}/commands/{name}";
RequestTopicPattern = "device/{clientId}/commands/{name}";
ResponseTopicPattern = "device/{clientId}/commands/{name}/resp";
RequestTopicPattern = "device/{clientId}/commands/{name}?$rid={rid}";
ResponseTopicPattern = "device/{clientId}/commands/{name}/resp?$rid={rid}";
}
}

Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud.Serializers;
using MQTTnet.Server;
using System.Text;

namespace MQTTnet.Extensions.MultiCloud.BrokerIoTClient.Untyped;

Expand All @@ -14,9 +15,9 @@ public class GenericCommandClient
byte[]? corr = new byte[] { };


string requestTopicPattern = "device/{clientId}/commands/{commandName}";
string requestTopicPattern = "device/{clientId}/commands/{commandName}?$rid={rid}";
string responseTopicSub = "device/{clientId}/commands/{commandName}/+";
string responseTopicSuccess = "device/{clientId}/commands/{commandName}/resp";
string responseTopicSuccess = "device/{clientId}/commands/{commandName}/resp?$rid={rid}";


public GenericCommandClient(IMqttClient client)
Expand Down Expand Up @@ -47,7 +48,7 @@ public GenericCommandClient(IMqttClient client)
Status = status,
ReponsePayload = respPayload
};
_tcs!.SetResult(resp);
_tcs!.TrySetResult(resp);
}
else
{
Expand All @@ -59,25 +60,31 @@ public GenericCommandClient(IMqttClient client)
};
}

public async Task<GenericCommandResponse> InvokeAsync(string clientId, GenericCommandRequest request, CancellationToken ct = default)
public Task<GenericCommandResponse> InvokeAsync(string clientId, GenericCommandRequest request, CancellationToken ct = default)
{
_tcs = new TaskCompletionSource<GenericCommandResponse>();
_remoteClientId = clientId;
_commandName = request.CommandName;
corr = request.CorrelationId!;
string commandTopic = requestTopicPattern.Replace("{clientId}", _remoteClientId).Replace("{commandName}", _commandName);
var responseTopic = responseTopicSub.Replace("{clientId}", _remoteClientId).Replace("{commandName}", _commandName);
string commandTopic = requestTopicPattern
.Replace("{clientId}", _remoteClientId)
.Replace("{commandName}", _commandName)
.Replace("{rid}", Encoding.UTF8.GetString(corr));
var responseTopic = responseTopicSub
.Replace("{clientId}", _remoteClientId)
.Replace("{commandName}", _commandName)
.Replace("{rid}", Encoding.UTF8.GetString(corr));
_ =_mqttClient.SubscribeAsync(responseTopic, Protocol.MqttQualityOfServiceLevel.AtMostOnce, ct);

_ = _mqttClient.PublishAsync(
new MqttApplicationMessageBuilder()
.WithTopic(commandTopic)
.WithPayload(_serializer.ToBytes(request.RequestPayload))
.WithResponseTopic(responseTopicSuccess.Replace("{clientId}", _remoteClientId).Replace("{commandName}", _commandName))
.WithResponseTopic(responseTopic)
.WithCorrelationData(request.CorrelationId)
.Build());

return await _tcs.Task.TimeoutAfter(TimeSpan.FromSeconds(5));
return _tcs.Task.TimeoutAfter(TimeSpan.FromSeconds(5));

}
}

0 comments on commit e3a23ab

Please sign in to comment.