Skip to content

Commit

Permalink
Feat/generic commands (#100) (#101)
Browse files Browse the repository at this point in the history
* Feat/generic commands (#100)

* generic client

* add generic command client

---------

Co-authored-by: ridomin <[email protected]>

* Feat/generic commands (#102)

* generic client

* add generic command client

* status in user properties

* all green 1 by 1

---------

Co-authored-by: ridomin <[email protected]>

* review generic commands

* start 8

* fix obj serializer

* fix serializer tests

* not trhow birth exceptions

* fix audience for gateway

* fix twin serialization

* reviewing generic ser (#103)

Co-authored-by: ridomin <[email protected]>

* change correlation to byte

* rid wip

* Revert "rid wip"

This reverts commit e3a23ab.

* dont check birth pub

* add generic interfaces for commands

* review object serializer

* adds MqttGatewayHostName

* rm status from command

* fix status

* force 17

---------

Co-authored-by: Rido <[email protected]>
  • Loading branch information
ridomin and rido-min authored Apr 17, 2023
1 parent fdcb43a commit e4b0494
Show file tree
Hide file tree
Showing 40 changed files with 472 additions and 119 deletions.
1 change: 1 addition & 0 deletions docs/ConnectionSettings.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Connection settings can be established using the API or parsing a connection str
- `CaFile` Path to the CA certificate required to stablish the TLS session
- `GatewayHostName` Allows to connect to IoT Hub through a IoTEdge ($edgeHub) gateway (aka Transparent Gateway)
- `MqttVersion` Sets the MqttProtocolVersion, allowed values are 5 and 3 (will use 3.1.1)
- `MqttGatewayHostName` Allows to connect to IoTHub through a mqtt broker. Requires a gateway to implement the transparent gateway pattern.

## Sample Connection Strings

Expand Down
8 changes: 4 additions & 4 deletions samples/iothub-sample/Device.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public Device(ILogger<Device> logger, IConfiguration configuration)

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var connectionSettings = new ConnectionSettings(_configuration.GetConnectionString("cs1"));
var connectionSettings = new ConnectionSettings(_configuration.GetConnectionString("cs"));
_logger.LogWarning("Connecting to: {connectionSettings}", connectionSettings);

var client = new HubMqttClient(await HubDpsFactory.CreateFromConnectionSettingsAsync(connectionSettings, stoppingToken));
Expand All @@ -34,15 +34,15 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
var twin = await client.GetTwinAsync(stoppingToken);
Console.WriteLine(twin);

client.OnCommandReceived = m =>
client.OnCommandReceived = async m =>
{
Console.WriteLine(m.CommandName);
Console.WriteLine(m.CommandPayload);
return new GenericCommandResponse()
return await Task.FromResult(new GenericCommandResponse()
{
Status = 200,
ReponsePayload = JsonSerializer.Serialize(new { myResponse = "whatever" })
};
});
};

client.OnPropertyUpdateReceived = m =>
Expand Down
4 changes: 2 additions & 2 deletions samples/mqtt-commands/Device.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
IMqttClient connection = await BrokerClientFactory.CreateFromConnectionSettingsAsync(_configuration.GetConnectionString("Broker")!, false, stoppingToken);
_logger.LogWarning("Connected to {cs}", BrokerClientFactory.ComputedSettings);
GenericCommand cmd = new GenericCommand(connection);
cmd.OnCmdDelegate += req =>
cmd.OnCmdDelegate += async req =>
{
_logger.LogInformation($"Received command {req.CommandName} with payload {req.CommandPayload}");
return new GenericCommandResponse() { Status = 200, ReponsePayload = $"{req.CommandPayload} {req.CommandPayload}" };
return await Task.FromResult(new GenericCommandResponse() { Status = 200, ReponsePayload = $"{req.CommandPayload} {req.CommandPayload}" });
};

//while (!stoppingToken.IsCancellationRequested)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ public static MqttClientOptionsBuilder WithAzureIoTHubCredentialsSas(this MqttCl
audience = $"{hostName}/devices/{deviceId}/modules/{moduleId}";
}

if (!string.IsNullOrEmpty(gatewayHostName))
{
audience = $"{hostName}/devices/{deviceId}";
}


(string username, string password) = SasAuth.GenerateHubSasCredentials(hostName, target, sasKey, audience, modelId, sasMinutes, gatewayHostName);
builder.WithCredentials(username, password);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public async Task<DpsStatus> ProvisionDeviceIdentity()
{
var putTopic = $"$dps/registrations/PUT/iotdps-register/?$rid={rid++}";
var registrationId = mqttClient.Options.ClientId;
var bytes = new UTF8JsonSerializer().ToBytes(new { registrationId, payload = new { modelId } });
var bytes = new Utf8JsonSerializer().ToBytes(new { registrationId, payload = new { modelId } });
var puback = await mqttClient.PublishBinaryAsync(putTopic, bytes);
if (puback.ReasonCode != MqttClientPublishReasonCode.Success)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud.AzureIoTClient.Untyped;
using MQTTnet.Extensions.MultiCloud.Serializers;
using System.Text.Json;
using System.Text.Json.Nodes;

namespace MQTTnet.Extensions.MultiCloud.AzureIoTClient
Expand All @@ -16,7 +17,7 @@ public class HubMqttClient : IHubMqttClient
private readonly UpdateTwinBinder<object> updateTwinBinder;

private readonly GenericDesiredUpdatePropertyBinder genericDesiredUpdateProperty;
private readonly GenericCommand command;
private readonly IGenericCommand command;

public HubMqttClient(IMqttClient c)
{
Expand All @@ -29,7 +30,7 @@ public HubMqttClient(IMqttClient c)
genericDesiredUpdateProperty = new GenericDesiredUpdatePropertyBinder(c, updateTwinBinder!);
}

public Func<GenericCommandRequest, GenericCommandResponse> OnCommandReceived
public Func<IGenericCommandRequest, Task<IGenericCommandResponse>> OnCommandReceived
{
get => command.OnCmdDelegate!;
set => command.OnCmdDelegate = value;
Expand All @@ -49,7 +50,7 @@ public async Task<string> GetTwinAsync(CancellationToken cancellationToken = def

public async Task<int> UpdateTwinAsync(object payload, CancellationToken cancellationToken = default)
{
var twin = await updateTwinBinder.InvokeAsync(Connection.Options.ClientId, payload, cancellationToken);
var twin = await updateTwinBinder.InvokeAsync(Connection.Options.ClientId, JsonSerializer.Serialize(payload), cancellationToken);
return twin;
}

Expand All @@ -61,7 +62,7 @@ public async Task<MqttClientPublishResult> SendTelemetryAsync(object payload, Ca
clientSegment = clientSegment.Replace("/", "/modules/");
}
return await Connection.PublishBinaryAsync($"devices/{clientSegment}/messages/events/",
new UTF8JsonSerializer().ToBytes(payload),
new Utf8JsonSerializer().ToBytes(payload),
Protocol.MqttQualityOfServiceLevel.AtLeastOnce,
false, t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace MQTTnet.Extensions.MultiCloud.AzureIoTClient
public interface IHubMqttClient
{
IMqttClient Connection { get; set; }
Func<GenericCommandRequest, GenericCommandResponse> OnCommandReceived { get; set; }
Func<IGenericCommandRequest, Task<IGenericCommandResponse>> OnCommandReceived { get; set; }
Func<JsonNode, GenericPropertyAck> OnPropertyUpdateReceived { get; set; }

Task<string> GetTwinAsync(CancellationToken cancellationToken = default);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud.Binders;
using System.Text;

namespace MQTTnet.Extensions.MultiCloud.AzureIoTClient.Untyped
{
public class GenericCommand
public class GenericCommand : IGenericCommand
{
private readonly IMqttClient connection;
public Func<GenericCommandRequest, GenericCommandResponse>? OnCmdDelegate { get; set; }
public Func<IGenericCommandRequest, Task<IGenericCommandResponse>>? OnCmdDelegate { get; set; }

public GenericCommand(IMqttClient c)
{
Expand All @@ -20,7 +19,7 @@ public GenericCommand(IMqttClient c)
{
var segments = topic.Split('/');
var cmdName = segments[3];
string msg = Encoding.UTF8.GetString(m.ApplicationMessage.Payload);
string msg = m.ApplicationMessage.ConvertPayloadToString();
GenericCommandRequest req = new()
{
CommandName = cmdName,
Expand All @@ -29,7 +28,7 @@ public GenericCommand(IMqttClient c)
if (OnCmdDelegate != null && req != null)
{
var tp = TopicParser.ParseTopic(topic);
GenericCommandResponse response = OnCmdDelegate.Invoke(req);
IGenericCommandResponse response = await OnCmdDelegate.Invoke(req);
_ = connection.PublishStringAsync($"$iothub/methods/res/{response.Status}/?$rid={tp.Rid}", response.ReponsePayload);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace MQTTnet.Extensions.MultiCloud.AzureIoTClient.Untyped
{
public class GenericCommandRequest
public class GenericCommandRequest : IGenericCommandRequest
{
public string? CommandName { get; set; }
public string? CommandPayload { get; set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@

namespace MQTTnet.Extensions.MultiCloud.AzureIoTClient.Untyped
{
public class GenericCommandResponse : BaseCommandResponse
public class GenericCommandResponse : IGenericCommandResponse
{
[JsonPropertyName("payload")]
public string? ReponsePayload { get; set; }

[JsonIgnore]
public int Status { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public static async Task<IMqttClient> CreateFromConnectionSettingsAsync(Connecti
}
if (withBirth)
{
var birthPayload = new UTF8JsonSerializer().ToBytes(
var birthPayload = new Utf8JsonSerializer().ToBytes(
new BirthConvention.BirthMessage(BirthConvention.ConnectionStatus.online)
{
ModelId = cs.ModelId
Expand All @@ -36,10 +36,10 @@ public static async Task<IMqttClient> CreateFromConnectionSettingsAsync(Connecti
BirthConvention.BirthTopic(mqtt.Options.ClientId),
birthPayload,
Protocol.MqttQualityOfServiceLevel.AtLeastOnce, true, cancellationToken); //hack to disable retained in registry
if (pubAck.ReasonCode != MqttClientPublishReasonCode.Success)
{
throw new ApplicationException($"Error publishing Birth {cs}");
}
//if (pubAck.ReasonCode != MqttClientPublishReasonCode.Success)
//{
// throw new ApplicationException($"Error publishing Birth {cs}");
//}
}
return mqtt;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud.Binders;

namespace MQTTnet.Extensions.MultiCloud.BrokerIoTClient
namespace MQTTnet.Extensions.MultiCloud.BrokerIoTClient;

public class CommandClient<T, TResp> : RequestResponseBinder<T, TResp>
{
public class CommandClient<T, TResp> : RequestResponseBinder<T, TResp>
public CommandClient(IMqttClient client, string commandName)
: base(client, commandName, false)
{
public CommandClient(IMqttClient client, string commandName)
: base(client, commandName, false)
{
requestTopicPattern = "device/{clientId}/commands/{commandName}";
responseTopicSuccess = "device/{clientId}/commands/{commandName}/resp";
}
requestTopicPattern = "device/{clientId}/commands/{commandName}";
responseTopicSuccess = "device/{clientId}/commands/{commandName}/resp";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class PropertyClient<T>

public Action<string,T>? OnPropertyUpdated { get; set; } = null;

public PropertyClient(IMqttClient client, string name) : this(client, name, new UTF8JsonSerializer()) { }
public PropertyClient(IMqttClient client, string name) : this(client, name, new Utf8JsonSerializer()) { }

public PropertyClient(IMqttClient client, string name, IMessageSerializer messageSerializer)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public ReadOnlyProperty(IMqttClient mqttClient, string name)
{
if (m.ApplicationMessage.Topic == _topic)
{
var ser = new UTF8JsonSerializer();
var ser = new Utf8JsonSerializer();
if (ser.TryReadFromBytes(m.ApplicationMessage.Payload, _name, out T propVal))
{
Value = propVal;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class TelemetryClient<T>
public Action<string,T>? OnTelemetry { get; set; }

public TelemetryClient(IMqttClient client, string name)
: this(client, name, new UTF8JsonSerializer())
: this(client, name, new Utf8JsonSerializer())
{

}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,48 +1,51 @@
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud.Binders;
using System.Text;
using MQTTnet.Extensions.MultiCloud.Serializers;

namespace MQTTnet.Extensions.MultiCloud.BrokerIoTClient.Untyped
namespace MQTTnet.Extensions.MultiCloud.BrokerIoTClient.Untyped;

public class GenericCommand : IGenericCommand
{
public class GenericCommand
{
private readonly IMqttClient connection;
public Func<GenericCommandRequest, GenericCommandResponse>? OnCmdDelegate { get; set; }
private readonly IMqttClient connection;
private readonly IMessageSerializer _serializer;

public GenericCommand(IMqttClient c)
public Func<IGenericCommandRequest, Task<IGenericCommandResponse>>? OnCmdDelegate { get; set; }

public GenericCommand(IMqttClient c)
{
_serializer = new Utf8JsonSerializer();
connection = c;
_ = connection.SubscribeWithReplyAsync($"device/{c.Options.ClientId}/commands/+");
connection.ApplicationMessageReceivedAsync += async m =>
{
connection = c;
_ = connection.SubscribeWithReplyAsync($"device/{c.Options.ClientId}/commands/+");
connection.ApplicationMessageReceivedAsync += async m =>
var topic = m.ApplicationMessage.Topic;
if (topic.StartsWith($"device/{c.Options.ClientId}/commands/"))
{
var topic = m.ApplicationMessage.Topic;
if (topic.StartsWith($"device/{c.Options.ClientId}/commands/"))
{
var segments = topic.Split('/');
var cmdName = segments[3];
string msg = Encoding.UTF8.GetString(m.ApplicationMessage.Payload);
var segments = topic.Split('/');
var cmdName = segments[3];

if (_serializer.TryReadFromBytes(m.ApplicationMessage.Payload, string.Empty, out string reqPayload))
{
var responseTopic = m.ApplicationMessage.ResponseTopic ?? $"{topic}/resp";

GenericCommandRequest req = new()
if (OnCmdDelegate != null)
{
CommandName = cmdName,
CommandPayload = msg
};
if (OnCmdDelegate != null && req != null)
{
var tp = TopicParser.ParseTopic(topic);
GenericCommandResponse response = OnCmdDelegate.Invoke(req);
GenericCommandRequest req = new()
{
CommandName = cmdName,
CommandPayload = reqPayload,
//CorrelationId = m.ApplicationMessage.CorrelationData
};

IGenericCommandResponse response = await OnCmdDelegate.Invoke(req);
await connection.PublishAsync(new MqttApplicationMessageBuilder()
.WithTopic(responseTopic)
.WithPayload(Encoding.UTF8.GetBytes(response.ReponsePayload!))
.WithCorrelationData(m.ApplicationMessage.CorrelationData ?? Guid.Empty.ToByteArray())
.WithPayload(_serializer.ToBytes(response.ReponsePayload))
.WithUserProperty("status", response.Status.ToString())
.WithCorrelationData(m.ApplicationMessage.CorrelationData)
.Build());
}
}

};
}
// public async Task InitSubscriptionsAsync() => await connection.SubscribeWithReplyAsync("$iothub/methods/POST/#");
}
};
}
}
Loading

0 comments on commit e4b0494

Please sign in to comment.