Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/generic commands #100

Merged
merged 2 commits into from
Mar 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions samples/mqtt-commands/Device.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
IMqttClient connection = await BrokerClientFactory.CreateFromConnectionSettingsAsync(_configuration.GetConnectionString("Broker")!, false, stoppingToken);
_logger.LogWarning("Connected to {cs}", BrokerClientFactory.ComputedSettings);
GenericCommand cmd = new GenericCommand(connection);
cmd.OnCmdDelegate += req =>
cmd.OnCmdDelegate += async req =>
{
_logger.LogInformation($"Received command {req.CommandName} with payload {req.CommandPayload}");
return new GenericCommandResponse() { Status = 200, ReponsePayload = $"{req.CommandPayload} {req.CommandPayload}" };
return await Task.FromResult(new GenericCommandResponse() { Status = 200, ReponsePayload = $"{req.CommandPayload} {req.CommandPayload}" });
};

//while (!stoppingToken.IsCancellationRequested)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud.Binders;

namespace MQTTnet.Extensions.MultiCloud.BrokerIoTClient
namespace MQTTnet.Extensions.MultiCloud.BrokerIoTClient;

public class CommandClient<T, TResp> : RequestResponseBinder<T, TResp>
{
public class CommandClient<T, TResp> : RequestResponseBinder<T, TResp>
public CommandClient(IMqttClient client, string commandName)
: base(client, commandName, false)
{
public CommandClient(IMqttClient client, string commandName)
: base(client, commandName, false)
{
requestTopicPattern = "device/{clientId}/commands/{commandName}";
responseTopicSuccess = "device/{clientId}/commands/{commandName}/resp";
}
requestTopicPattern = "device/{clientId}/commands/{commandName}";
responseTopicSuccess = "device/{clientId}/commands/{commandName}/resp";
}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud.Binders;
using MQTTnet.Extensions.MultiCloud.Serializers;
using System.Text;
using System.Text.Json;

namespace MQTTnet.Extensions.MultiCloud.BrokerIoTClient.Untyped
{
public class GenericCommand
{
private readonly IMqttClient connection;
public Func<GenericCommandRequest, GenericCommandResponse>? OnCmdDelegate { get; set; }
private readonly IMessageSerializer _serializer;

public Func<GenericCommandRequest, Task<GenericCommandResponse>>? OnCmdDelegate { get; set; }

public GenericCommand(IMqttClient c)
{
_serializer = new UTF8JsonSerializer();
connection = c;
_ = connection.SubscribeWithReplyAsync($"device/{c.Options.ClientId}/commands/+");
connection.ApplicationMessageReceivedAsync += async m =>
Expand All @@ -20,24 +25,22 @@ public GenericCommand(IMqttClient c)
{
var segments = topic.Split('/');
var cmdName = segments[3];
string msg = Encoding.UTF8.GetString(m.ApplicationMessage.Payload);

var responseTopic = m.ApplicationMessage.ResponseTopic ?? $"{topic}/resp";

GenericCommandRequest req = new()
{
CommandName = cmdName,
CommandPayload = msg
};
if (OnCmdDelegate != null && req != null)
if (_serializer.TryReadFromBytes(m.ApplicationMessage.Payload, string.Empty, out GenericCommandRequest req))
{
var tp = TopicParser.ParseTopic(topic);
GenericCommandResponse response = OnCmdDelegate.Invoke(req);
await connection.PublishAsync(new MqttApplicationMessageBuilder()
.WithTopic(responseTopic)
.WithPayload(Encoding.UTF8.GetBytes(response.ReponsePayload!))
.WithCorrelationData(m.ApplicationMessage.CorrelationData ?? Guid.Empty.ToByteArray())
.Build());
var responseTopic = m.ApplicationMessage.ResponseTopic ?? $"{topic}/resp";

if (OnCmdDelegate != null && req != null)
{
var tp = TopicParser.ParseTopic(topic);

GenericCommandResponse response = await OnCmdDelegate.Invoke(req);
await connection.PublishAsync(new MqttApplicationMessageBuilder()
.WithTopic(responseTopic)
.WithPayload(_serializer.ToBytes(response))
.WithCorrelationData(m.ApplicationMessage.CorrelationData ?? Guid.Empty.ToByteArray())
.Build());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud.Binders;
using MQTTnet.Extensions.MultiCloud.Serializers;
using MQTTnet.Server;
using System.Xml.Linq;

namespace MQTTnet.Extensions.MultiCloud.BrokerIoTClient.Untyped;

public class GenericCommandClient : RequestResponseBinder<GenericCommandRequest, GenericCommandResponse>
{
public GenericCommandClient(IMqttClient client) : base(client, string.Empty, false)
{

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ public class RequestResponseBinder<T, TResp>
protected string responseTopicSuccess = "device/{clientId}/commands/{commandName}/resp";
protected string responseTopicFailure = "device/{clientId}/commands/{commandName}/err";
protected bool requireNotEmptyPayload = true;
string remoteClientId = string.Empty;
readonly bool _unwrap = true;
Guid corr = Guid.NewGuid();

protected Func<string, TResp>? VersionExtractor { get; set; }

string remoteClientId = string.Empty;
readonly IMessageSerializer _serializer;

public RequestResponseBinder(IMqttClient client, string name, bool unwrap)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud.BrokerIoTClient;
using MQTTnet.Extensions.MultiCloud.BrokerIoTClient.Untyped;
using MQTTnet.Extensions.MultiCloud.Connections;

namespace MQTTnet.Extensions.MultiCloud.IntegrationTests.e2e;

public class GenericCommandE2EFixture
{
private static ConnectionSettings TestCS(string clientId)
{
return new ConnectionSettings
{
HostName = "localhost",
UseTls = false,
TcpPort = 1883,
UserName = "user",
Password = "password",
ClientId = clientId
};
}

internal class Producer
{
readonly IMqttClient mqttClient;

public GenericCommand EchoCommand;

public Producer(IMqttClient client)
{
mqttClient = client;

EchoCommand = new GenericCommand(mqttClient)
{
OnCmdDelegate = async m =>
{
await Task.Delay(m.CommandPayload!.Length * 100);
await Console.Out.WriteLineAsync("[Producer] Running Generic Command in client: " + client.Options.ClientId);
return await Task.FromResult(
new GenericCommandResponse()
{
Status = 200,
ReponsePayload = m.CommandPayload + m.CommandPayload
});
}
};
}
}

internal class Consumer
{
readonly IMqttClient mqttClient;
public GenericCommandClient mqttCommand;

public Consumer(IMqttClient client)
{
mqttClient = client;
mqttCommand = new GenericCommandClient(mqttClient);
}
}

[Fact]
public async Task InvokeCommandWithDefaultTopics()
{
IMqttClient producerClientOne = await BrokerClientFactory.CreateFromConnectionSettingsAsync(TestCS("deviceOne"));
IMqttClient producerClientTwo = await BrokerClientFactory.CreateFromConnectionSettingsAsync(TestCS("deviceTwo"));
_ = new Producer(producerClientOne);
_ = new Producer(producerClientTwo);

IMqttClient consumerClient = await BrokerClientFactory.CreateFromConnectionSettingsAsync(TestCS("consumer"));
Consumer consumer = new(consumerClient);
var respOne = await consumer.mqttCommand.InvokeAsync("deviceOne", new GenericCommandRequest() { CommandName = "echo", CommandPayload = "Hello One" });
var respTwo = await consumer.mqttCommand.InvokeAsync("deviceTwo", new GenericCommandRequest() { CommandName = "echo", CommandPayload = "Hello Two Loooonger " });

Assert.Equal("Hello OneHello One", respOne.ReponsePayload);
Assert.Equal("Hello Two Loooonger Hello Two Loooonger ", respTwo.ReponsePayload);

}
}