Skip to content

Commit

Permalink
change correlation to byte
Browse files Browse the repository at this point in the history
  • Loading branch information
ridomin committed Apr 3, 2023
1 parent e1f97d8 commit 63a0963
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,16 @@ public GenericCommand(IMqttClient c)
GenericCommandRequest req = new()
{
CommandName = cmdName,
RequestPayload = reqPayload
RequestPayload = reqPayload,
CorrelationId = m.ApplicationMessage.CorrelationData
};

GenericCommandResponse response = await OnCmdDelegate.Invoke(req);
await connection.PublishAsync(new MqttApplicationMessageBuilder()
.WithTopic(responseTopic)
.WithPayload(_serializer.ToBytes(response.ReponsePayload))
.WithUserProperty("status", response.Status.ToString())
.WithCorrelationData(m.ApplicationMessage.CorrelationData ?? Guid.Empty.ToByteArray())
.WithCorrelationData(m.ApplicationMessage.CorrelationData)
.Build());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class GenericCommandClient
TaskCompletionSource<GenericCommandResponse>? _tcs;
string? _commandName;
string _remoteClientId;
Guid corr = Guid.NewGuid();
byte[]? corr = new byte[] { };


string requestTopicPattern = "device/{clientId}/commands/{commandName}";
Expand All @@ -32,7 +32,7 @@ public GenericCommandClient(IMqttClient client)
var expectedTopic = responseTopicSuccess.Replace("{clientId}", _remoteClientId).Replace("{commandName}", _commandName);
if (topic.StartsWith(expectedTopic))
{
if (m.ApplicationMessage.CorrelationData != null && corr != new Guid(m.ApplicationMessage.CorrelationData))
if (m.ApplicationMessage.CorrelationData != null && !corr.SequenceEqual(m.ApplicationMessage.CorrelationData))
{
_tcs!.SetException(new ApplicationException("Invalid correlation data"));
}
Expand Down Expand Up @@ -64,7 +64,7 @@ public async Task<GenericCommandResponse> InvokeAsync(string clientId, GenericCo
_tcs = new TaskCompletionSource<GenericCommandResponse>();
_remoteClientId = clientId;
_commandName = request.CommandName;

corr = request.CorrelationId!;
string commandTopic = requestTopicPattern.Replace("{clientId}", _remoteClientId).Replace("{commandName}", _commandName);
var responseTopic = responseTopicSub.Replace("{clientId}", _remoteClientId).Replace("{commandName}", _commandName);
_ =_mqttClient.SubscribeAsync(responseTopic, Protocol.MqttQualityOfServiceLevel.AtMostOnce, ct);
Expand All @@ -74,7 +74,7 @@ public async Task<GenericCommandResponse> InvokeAsync(string clientId, GenericCo
.WithTopic(commandTopic)
.WithPayload(_serializer.ToBytes(request.RequestPayload))
.WithResponseTopic(responseTopicSuccess.Replace("{clientId}", _remoteClientId).Replace("{commandName}", _commandName))
.WithCorrelationData(corr.ToByteArray())
.WithCorrelationData(request.CorrelationId)
.Build());

return await _tcs.Task.TimeoutAfter(TimeSpan.FromSeconds(5));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
namespace MQTTnet.Extensions.MultiCloud.BrokerIoTClient.Untyped
using System.Text.Json.Serialization;

namespace MQTTnet.Extensions.MultiCloud.BrokerIoTClient.Untyped
{
public class GenericCommandRequest
{
public string? CommandName { get; set; }
public object? RequestPayload { get; set; }
[JsonIgnore]
public byte[]? CorrelationId { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,12 @@ public async Task InvokeCommandWithDefaultTopics()
Consumer consumer = new(consumerClient);

var respOne = await consumer.mqttCommand.InvokeAsync("deviceOne",
new GenericCommandRequest() { CommandName = "echo", RequestPayload = "Hello One" });
new GenericCommandRequest() { CommandName = "echo", RequestPayload = "Hello One", CorrelationId = new byte[] { 1 } });

Assert.Equal("Hello OneHello One", respOne.ReponsePayload!.ToString());

var respTwo = await consumer.mqttCommand.InvokeAsync("deviceTwo",
new GenericCommandRequest() { CommandName = "echo", RequestPayload = "Hello Two Loooonger " });
new GenericCommandRequest() { CommandName = "echo", RequestPayload = "Hello Two Loooonger ", CorrelationId = new byte[] { 2 } });
Assert.Equal("Hello Two Loooonger Hello Two Loooonger ", respTwo.ReponsePayload!.ToString());

await producerClientOne.DisconnectAsync();
Expand Down

0 comments on commit 63a0963

Please sign in to comment.