Skip to content

Commit

Permalink
Feat/generic commands (#102)
Browse files Browse the repository at this point in the history
* generic client

* add generic command client

* status in user properties

* all green 1 by 1

---------

Co-authored-by: ridomin <[email protected]>
  • Loading branch information
rido-min and ridomin authored Mar 28, 2023
1 parent 6c4d747 commit 9885dd7
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,27 @@ public GenericCommand(IMqttClient c)
var segments = topic.Split('/');
var cmdName = segments[3];

if (_serializer.TryReadFromBytes(m.ApplicationMessage.Payload, string.Empty, out GenericCommandRequest req))
if (_serializer.TryReadFromBytes(m.ApplicationMessage.Payload, string.Empty, out object reqPayload))
{
var responseTopic = m.ApplicationMessage.ResponseTopic ?? $"{topic}/resp";

if (OnCmdDelegate != null && req != null)
if (OnCmdDelegate != null && reqPayload != null)
{
var tp = TopicParser.ParseTopic(topic);
//var tp = TopicParser.ParseTopic(topic);
GenericCommandRequest req = new()
{
CommandName = cmdName,
CommandPayload = reqPayload
};


GenericCommandResponse response = await OnCmdDelegate.Invoke(req);
await connection.PublishAsync(new MqttApplicationMessageBuilder()
.WithTopic(responseTopic)
.WithPayload(_serializer.ToBytes(response))

.WithPayload(_serializer.ToBytes(response.ReponsePayload))
.WithUserProperty("status", response.Status.ToString())

.WithCorrelationData(m.ApplicationMessage.CorrelationData ?? Guid.Empty.ToByteArray())
.Build());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,89 @@
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud.Binders;

using MQTTnet.Extensions.MultiCloud.Serializers;
using MQTTnet.Server;
using System.Xml.Linq;

namespace MQTTnet.Extensions.MultiCloud.BrokerIoTClient.Untyped;

public class GenericCommandClient : RequestResponseBinder<GenericCommandRequest, GenericCommandResponse>
public class GenericCommandClient //: RequestResponseBinder<GenericCommandRequest, GenericCommandResponse>
{
public GenericCommandClient(IMqttClient client) : base(client, string.Empty, false)

readonly IMqttClient _mqttClient;
readonly IMessageSerializer _serializer;
TaskCompletionSource<GenericCommandResponse>? _tcs;
string? _commandName;
string _remoteClientId;
Guid corr = Guid.NewGuid();


string requestTopicPattern = "device/{clientId}/commands/{commandName}";
string responseTopicSub = "device/{clientId}/commands/{commandName}/+";
string responseTopicSuccess = "device/{clientId}/commands/{commandName}/resp";
//protected string responseTopicFailure = "device/{clientId}/commands/{commandName}/err";


public GenericCommandClient(IMqttClient client) //: base(client, string.Empty, false)
{
_mqttClient = client;
_remoteClientId = string.Empty;
_serializer = new UTF8JsonSerializer();

_mqttClient.ApplicationMessageReceivedAsync += async m =>
{
var topic = m.ApplicationMessage.Topic;

var expectedTopic = responseTopicSuccess.Replace("{clientId}", _remoteClientId).Replace("{commandName}", _commandName);
if (topic.StartsWith(expectedTopic))
{
if (m.ApplicationMessage.CorrelationData != null && corr != new Guid(m.ApplicationMessage.CorrelationData))
{
_tcs!.SetException(new ApplicationException("Invalid correlation data"));
}

//int status = m.ApplicationMessage.UserProperties.Contains(new Packets.MqttUserProperty("status", "200")) ? 200 : 500;
var up = m.ApplicationMessage.UserProperties.FirstOrDefault(p => p.Name.Equals("status"));
int status = up != null ? int.Parse(up.Value) : 500;

if (_serializer.TryReadFromBytes(m.ApplicationMessage.Payload, string.Empty, out string respPayload))
{
GenericCommandResponse resp = new()
{
Status = status,
ReponsePayload = respPayload
};
_tcs!.SetResult(resp);
}
else
{
_tcs!.SetException(new ApplicationException("Cannot deserialize bytes"));
}

}
await Task.Yield();
};
}

public async Task<GenericCommandResponse> InvokeAsync(string clientId, GenericCommandRequest request, CancellationToken ct = default)
{

_tcs = new TaskCompletionSource<GenericCommandResponse>();
_remoteClientId = clientId;
_commandName = request.CommandName;
string commandTopic = requestTopicPattern.Replace("{clientId}", _remoteClientId).Replace("{commandName}", _commandName);
var responseTopic = responseTopicSub.Replace("{clientId}", _remoteClientId).Replace("{commandName}", _commandName);
await _mqttClient.SubscribeAsync(responseTopic, Protocol.MqttQualityOfServiceLevel.AtMostOnce, ct);

var pubAck = await _mqttClient.PublishAsync(
new MqttApplicationMessageBuilder()
.WithTopic(commandTopic)
.WithPayload(_serializer.ToBytes(request.CommandPayload))
.WithResponseTopic(responseTopicSuccess.Replace("{clientId}", _remoteClientId).Replace("{commandName}", _commandName))
.WithCorrelationData(corr.ToByteArray())
.Build());
if (!pubAck.IsSuccess)
{
throw new ApplicationException("Error publishing Request Message");
}
return await _tcs.Task.TimeoutAfter(TimeSpan.FromSeconds(5));

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
public class GenericCommandRequest
{
public string? CommandName { get; set; }
public string? CommandPayload { get; set; }
public object? CommandPayload { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public static T FromString<T>(string s) => JsonSerializer.Deserialize<T>(s,

public byte[] ToBytes<T>(T payload, string name = "")
{
if (payload is null) return new byte[0];

if (string.IsNullOrEmpty(name))
{
if (typeof(T) == typeof(string))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,15 @@ public Consumer(IMqttClient client)
[Fact]
public async Task InvokeCommandWithDefaultTopics()
{
IMqttClient producerClientOne = await BrokerClientFactory.CreateFromConnectionSettingsAsync(TestCS("deviceOne"));
IMqttClient producerClientTwo = await BrokerClientFactory.CreateFromConnectionSettingsAsync(TestCS("deviceTwo"));
IMqttClient producerClientOne = await BrokerClientFactory.CreateFromConnectionSettingsAsync(TestCS("device1"));
IMqttClient producerClientTwo = await BrokerClientFactory.CreateFromConnectionSettingsAsync(TestCS("device2"));
_ = new Producer(producerClientOne);
_ = new Producer(producerClientTwo);

IMqttClient consumerClient = await BrokerClientFactory.CreateFromConnectionSettingsAsync(TestCS("consumer"));
Consumer consumer = new(consumerClient);
var respOne = await consumer.echoCommand.InvokeAsync("deviceOne", "Hello One");
var respTwo = await consumer.echoCommand.InvokeAsync("deviceTwo", "Hello Two Loooonger ");
var respOne = await consumer.echoCommand.InvokeAsync("device1", "Hello One");
var respTwo = await consumer.echoCommand.InvokeAsync("device2", "Hello Two Loooonger ");

Assert.Equal("Hello OneHello One", respOne);
Assert.Equal("Hello Two Loooonger Hello Two Loooonger ", respTwo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ public Consumer(IMqttClient client)
[Fact]
public async Task ReadProperties()
{
IMqttClient producerClientOne = await BrokerClientFactory.CreateFromConnectionSettingsAsync(TestCS("deviceOne"));
IMqttClient producerClientTwo = await BrokerClientFactory.CreateFromConnectionSettingsAsync(TestCS("deviceTwo"));
IMqttClient producerClientOne = await BrokerClientFactory.CreateFromConnectionSettingsAsync(TestCS("device4"));
IMqttClient producerClientTwo = await BrokerClientFactory.CreateFromConnectionSettingsAsync(TestCS("device5"));
var p1 = new Producer(producerClientOne);
var p2 = new Producer(producerClientTwo);

Expand Down Expand Up @@ -86,7 +86,7 @@ public async Task ReadProperties()

DateTime now = DateTime.Now;
await p1.Started.SendMessageAsync(now);
await consumer.Interval.UpdatePropertyAsync("deviceOne", 23);
await consumer.Interval.UpdatePropertyAsync("device4", 23);
await Task.Delay(500);
Assert.Equal(23, intervalNewValue);
Assert.Equal(now, startedRead);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,38 @@ internal class Producer
{
readonly IMqttClient mqttClient;

public GenericCommand EchoCommand;

public GenericCommand genCommand;


public Producer(IMqttClient client)
{
mqttClient = client;

EchoCommand = new GenericCommand(mqttClient)

genCommand = new GenericCommand(mqttClient)
{
OnCmdDelegate = async m =>
OnCmdDelegate = async req =>
{
await Task.Delay(m.CommandPayload!.Length * 100);
await Console.Out.WriteLineAsync("[Producer] Running Generic Command in client: " + client.Options.ClientId);
return await Task.FromResult(
new GenericCommandResponse()
{
Status = 200,
ReponsePayload = m.CommandPayload + m.CommandPayload
});
if (req.CommandName == "echo")
{
await Task.Delay(req.CommandPayload!.ToString()!.Length * 100);
return await Task.FromResult(
new GenericCommandResponse()
{
Status = 200,
ReponsePayload = req.CommandPayload.ToString() + req.CommandPayload.ToString()
});
}
else
{
return await Task.FromResult(
new GenericCommandResponse()
{
Status = 400
});
}
}
};
}
Expand Down Expand Up @@ -69,11 +83,35 @@ public async Task InvokeCommandWithDefaultTopics()

IMqttClient consumerClient = await BrokerClientFactory.CreateFromConnectionSettingsAsync(TestCS("consumer"));
Consumer consumer = new(consumerClient);
var respOne = await consumer.mqttCommand.InvokeAsync("deviceOne", new GenericCommandRequest() { CommandName = "echo", CommandPayload = "Hello One" });
var respTwo = await consumer.mqttCommand.InvokeAsync("deviceTwo", new GenericCommandRequest() { CommandName = "echo", CommandPayload = "Hello Two Loooonger " });


var respOne = await consumer.mqttCommand.InvokeAsync("deviceOne",
new GenericCommandRequest() { CommandName = "echo", CommandPayload = "Hello One" });
Assert.Equal("Hello OneHello One", respOne.ReponsePayload);

var respTwo = await consumer.mqttCommand.InvokeAsync("deviceTwo",
new GenericCommandRequest() { CommandName = "echo", CommandPayload = "Hello Two Loooonger " });
Assert.Equal("Hello Two Loooonger Hello Two Loooonger ", respTwo.ReponsePayload);

await producerClientOne.DisconnectAsync();
await producerClientTwo.DisconnectAsync();
await consumerClient.DisconnectAsync();
}

[Fact]
public async Task NotImplementedReturns400()
{
IMqttClient producerClientOne = await BrokerClientFactory.CreateFromConnectionSettingsAsync(TestCS("deviceThree"));
_ = new Producer(producerClientOne);

IMqttClient consumerClient = await BrokerClientFactory.CreateFromConnectionSettingsAsync(TestCS("consumer2"));
Consumer consumer = new(consumerClient);

var respOne = await consumer.mqttCommand.InvokeAsync("deviceThree",
new GenericCommandRequest() { CommandName = "notimpl", CommandPayload = "Hello One" });
Assert.Equal(400, respOne.Status);

await producerClientOne.DisconnectAsync();
await consumerClient.DisconnectAsync();
}
}

0 comments on commit 9885dd7

Please sign in to comment.