Skip to content

Commit

Permalink
Feat/generic commands (#100)
Browse files Browse the repository at this point in the history
* generic client

* add generic command client

---------

Co-authored-by: ridomin <[email protected]>
  • Loading branch information
rido-min and ridomin authored Mar 28, 2023
1 parent fdcb43a commit 6c4d747
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 28 deletions.
4 changes: 2 additions & 2 deletions samples/mqtt-commands/Device.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
IMqttClient connection = await BrokerClientFactory.CreateFromConnectionSettingsAsync(_configuration.GetConnectionString("Broker")!, false, stoppingToken);
_logger.LogWarning("Connected to {cs}", BrokerClientFactory.ComputedSettings);
GenericCommand cmd = new GenericCommand(connection);
cmd.OnCmdDelegate += req =>
cmd.OnCmdDelegate += async req =>
{
_logger.LogInformation($"Received command {req.CommandName} with payload {req.CommandPayload}");
return new GenericCommandResponse() { Status = 200, ReponsePayload = $"{req.CommandPayload} {req.CommandPayload}" };
return await Task.FromResult(new GenericCommandResponse() { Status = 200, ReponsePayload = $"{req.CommandPayload} {req.CommandPayload}" });
};

//while (!stoppingToken.IsCancellationRequested)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud.Binders;

namespace MQTTnet.Extensions.MultiCloud.BrokerIoTClient
namespace MQTTnet.Extensions.MultiCloud.BrokerIoTClient;

public class CommandClient<T, TResp> : RequestResponseBinder<T, TResp>
{
public class CommandClient<T, TResp> : RequestResponseBinder<T, TResp>
public CommandClient(IMqttClient client, string commandName)
: base(client, commandName, false)
{
public CommandClient(IMqttClient client, string commandName)
: base(client, commandName, false)
{
requestTopicPattern = "device/{clientId}/commands/{commandName}";
responseTopicSuccess = "device/{clientId}/commands/{commandName}/resp";
}
requestTopicPattern = "device/{clientId}/commands/{commandName}";
responseTopicSuccess = "device/{clientId}/commands/{commandName}/resp";
}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud.Binders;
using MQTTnet.Extensions.MultiCloud.Serializers;
using System.Text;
using System.Text.Json;

namespace MQTTnet.Extensions.MultiCloud.BrokerIoTClient.Untyped
{
public class GenericCommand
{
private readonly IMqttClient connection;
public Func<GenericCommandRequest, GenericCommandResponse>? OnCmdDelegate { get; set; }
private readonly IMessageSerializer _serializer;

public Func<GenericCommandRequest, Task<GenericCommandResponse>>? OnCmdDelegate { get; set; }

public GenericCommand(IMqttClient c)
{
_serializer = new UTF8JsonSerializer();
connection = c;
_ = connection.SubscribeWithReplyAsync($"device/{c.Options.ClientId}/commands/+");
connection.ApplicationMessageReceivedAsync += async m =>
Expand All @@ -20,24 +25,22 @@ public GenericCommand(IMqttClient c)
{
var segments = topic.Split('/');
var cmdName = segments[3];
string msg = Encoding.UTF8.GetString(m.ApplicationMessage.Payload);

var responseTopic = m.ApplicationMessage.ResponseTopic ?? $"{topic}/resp";

GenericCommandRequest req = new()
{
CommandName = cmdName,
CommandPayload = msg
};
if (OnCmdDelegate != null && req != null)
if (_serializer.TryReadFromBytes(m.ApplicationMessage.Payload, string.Empty, out GenericCommandRequest req))
{
var tp = TopicParser.ParseTopic(topic);
GenericCommandResponse response = OnCmdDelegate.Invoke(req);
await connection.PublishAsync(new MqttApplicationMessageBuilder()
.WithTopic(responseTopic)
.WithPayload(Encoding.UTF8.GetBytes(response.ReponsePayload!))
.WithCorrelationData(m.ApplicationMessage.CorrelationData ?? Guid.Empty.ToByteArray())
.Build());
var responseTopic = m.ApplicationMessage.ResponseTopic ?? $"{topic}/resp";

if (OnCmdDelegate != null && req != null)
{
var tp = TopicParser.ParseTopic(topic);

GenericCommandResponse response = await OnCmdDelegate.Invoke(req);
await connection.PublishAsync(new MqttApplicationMessageBuilder()
.WithTopic(responseTopic)
.WithPayload(_serializer.ToBytes(response))
.WithCorrelationData(m.ApplicationMessage.CorrelationData ?? Guid.Empty.ToByteArray())
.Build());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud.Binders;
using MQTTnet.Extensions.MultiCloud.Serializers;
using MQTTnet.Server;
using System.Xml.Linq;

namespace MQTTnet.Extensions.MultiCloud.BrokerIoTClient.Untyped;

public class GenericCommandClient : RequestResponseBinder<GenericCommandRequest, GenericCommandResponse>
{
public GenericCommandClient(IMqttClient client) : base(client, string.Empty, false)
{

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ public class RequestResponseBinder<T, TResp>
protected string responseTopicSuccess = "device/{clientId}/commands/{commandName}/resp";
protected string responseTopicFailure = "device/{clientId}/commands/{commandName}/err";
protected bool requireNotEmptyPayload = true;
string remoteClientId = string.Empty;
readonly bool _unwrap = true;
Guid corr = Guid.NewGuid();

protected Func<string, TResp>? VersionExtractor { get; set; }

string remoteClientId = string.Empty;
readonly IMessageSerializer _serializer;

public RequestResponseBinder(IMqttClient client, string name, bool unwrap)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud.BrokerIoTClient;
using MQTTnet.Extensions.MultiCloud.BrokerIoTClient.Untyped;
using MQTTnet.Extensions.MultiCloud.Connections;

namespace MQTTnet.Extensions.MultiCloud.IntegrationTests.e2e;

public class GenericCommandE2EFixture
{
private static ConnectionSettings TestCS(string clientId)
{
return new ConnectionSettings
{
HostName = "localhost",
UseTls = false,
TcpPort = 1883,
UserName = "user",
Password = "password",
ClientId = clientId
};
}

internal class Producer
{
readonly IMqttClient mqttClient;

public GenericCommand EchoCommand;

public Producer(IMqttClient client)
{
mqttClient = client;

EchoCommand = new GenericCommand(mqttClient)
{
OnCmdDelegate = async m =>
{
await Task.Delay(m.CommandPayload!.Length * 100);
await Console.Out.WriteLineAsync("[Producer] Running Generic Command in client: " + client.Options.ClientId);
return await Task.FromResult(
new GenericCommandResponse()
{
Status = 200,
ReponsePayload = m.CommandPayload + m.CommandPayload
});
}
};
}
}

internal class Consumer
{
readonly IMqttClient mqttClient;
public GenericCommandClient mqttCommand;

public Consumer(IMqttClient client)
{
mqttClient = client;
mqttCommand = new GenericCommandClient(mqttClient);
}
}

[Fact]
public async Task InvokeCommandWithDefaultTopics()
{
IMqttClient producerClientOne = await BrokerClientFactory.CreateFromConnectionSettingsAsync(TestCS("deviceOne"));
IMqttClient producerClientTwo = await BrokerClientFactory.CreateFromConnectionSettingsAsync(TestCS("deviceTwo"));
_ = new Producer(producerClientOne);
_ = new Producer(producerClientTwo);

IMqttClient consumerClient = await BrokerClientFactory.CreateFromConnectionSettingsAsync(TestCS("consumer"));
Consumer consumer = new(consumerClient);
var respOne = await consumer.mqttCommand.InvokeAsync("deviceOne", new GenericCommandRequest() { CommandName = "echo", CommandPayload = "Hello One" });
var respTwo = await consumer.mqttCommand.InvokeAsync("deviceTwo", new GenericCommandRequest() { CommandName = "echo", CommandPayload = "Hello Two Loooonger " });

Assert.Equal("Hello OneHello One", respOne.ReponsePayload);
Assert.Equal("Hello Two Loooonger Hello Two Loooonger ", respTwo.ReponsePayload);

}
}

0 comments on commit 6c4d747

Please sign in to comment.