Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/generic commands (#100) #101

Merged
merged 20 commits into from
Apr 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/ConnectionSettings.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Connection settings can be established using the API or parsing a connection str
- `CaFile` Path to the CA certificate required to stablish the TLS session
- `GatewayHostName` Allows to connect to IoT Hub through a IoTEdge ($edgeHub) gateway (aka Transparent Gateway)
- `MqttVersion` Sets the MqttProtocolVersion, allowed values are 5 and 3 (will use 3.1.1)
- `MqttGatewayHostName` Allows to connect to IoTHub through a mqtt broker. Requires a gateway to implement the transparent gateway pattern.

## Sample Connection Strings

Expand Down
8 changes: 4 additions & 4 deletions samples/iothub-sample/Device.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public Device(ILogger<Device> logger, IConfiguration configuration)

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var connectionSettings = new ConnectionSettings(_configuration.GetConnectionString("cs1"));
var connectionSettings = new ConnectionSettings(_configuration.GetConnectionString("cs"));
_logger.LogWarning("Connecting to: {connectionSettings}", connectionSettings);

var client = new HubMqttClient(await HubDpsFactory.CreateFromConnectionSettingsAsync(connectionSettings, stoppingToken));
Expand All @@ -34,15 +34,15 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
var twin = await client.GetTwinAsync(stoppingToken);
Console.WriteLine(twin);

client.OnCommandReceived = m =>
client.OnCommandReceived = async m =>
{
Console.WriteLine(m.CommandName);
Console.WriteLine(m.CommandPayload);
return new GenericCommandResponse()
return await Task.FromResult(new GenericCommandResponse()
{
Status = 200,
ReponsePayload = JsonSerializer.Serialize(new { myResponse = "whatever" })
};
});
};

client.OnPropertyUpdateReceived = m =>
Expand Down
4 changes: 2 additions & 2 deletions samples/mqtt-commands/Device.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
IMqttClient connection = await BrokerClientFactory.CreateFromConnectionSettingsAsync(_configuration.GetConnectionString("Broker")!, false, stoppingToken);
_logger.LogWarning("Connected to {cs}", BrokerClientFactory.ComputedSettings);
GenericCommand cmd = new GenericCommand(connection);
cmd.OnCmdDelegate += req =>
cmd.OnCmdDelegate += async req =>
{
_logger.LogInformation($"Received command {req.CommandName} with payload {req.CommandPayload}");
return new GenericCommandResponse() { Status = 200, ReponsePayload = $"{req.CommandPayload} {req.CommandPayload}" };
return await Task.FromResult(new GenericCommandResponse() { Status = 200, ReponsePayload = $"{req.CommandPayload} {req.CommandPayload}" });
};

//while (!stoppingToken.IsCancellationRequested)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ public static MqttClientOptionsBuilder WithAzureIoTHubCredentialsSas(this MqttCl
audience = $"{hostName}/devices/{deviceId}/modules/{moduleId}";
}

if (!string.IsNullOrEmpty(gatewayHostName))
{
audience = $"{hostName}/devices/{deviceId}";
}


(string username, string password) = SasAuth.GenerateHubSasCredentials(hostName, target, sasKey, audience, modelId, sasMinutes, gatewayHostName);
builder.WithCredentials(username, password);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public async Task<DpsStatus> ProvisionDeviceIdentity()
{
var putTopic = $"$dps/registrations/PUT/iotdps-register/?$rid={rid++}";
var registrationId = mqttClient.Options.ClientId;
var bytes = new UTF8JsonSerializer().ToBytes(new { registrationId, payload = new { modelId } });
var bytes = new Utf8JsonSerializer().ToBytes(new { registrationId, payload = new { modelId } });
var puback = await mqttClient.PublishBinaryAsync(putTopic, bytes);
if (puback.ReasonCode != MqttClientPublishReasonCode.Success)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud.AzureIoTClient.Untyped;
using MQTTnet.Extensions.MultiCloud.Serializers;
using System.Text.Json;
using System.Text.Json.Nodes;

namespace MQTTnet.Extensions.MultiCloud.AzureIoTClient
Expand All @@ -16,7 +17,7 @@ public class HubMqttClient : IHubMqttClient
private readonly UpdateTwinBinder<object> updateTwinBinder;

private readonly GenericDesiredUpdatePropertyBinder genericDesiredUpdateProperty;
private readonly GenericCommand command;
private readonly IGenericCommand command;

public HubMqttClient(IMqttClient c)
{
Expand All @@ -29,7 +30,7 @@ public HubMqttClient(IMqttClient c)
genericDesiredUpdateProperty = new GenericDesiredUpdatePropertyBinder(c, updateTwinBinder!);
}

public Func<GenericCommandRequest, GenericCommandResponse> OnCommandReceived
public Func<IGenericCommandRequest, Task<IGenericCommandResponse>> OnCommandReceived
{
get => command.OnCmdDelegate!;
set => command.OnCmdDelegate = value;
Expand All @@ -49,7 +50,7 @@ public async Task<string> GetTwinAsync(CancellationToken cancellationToken = def

public async Task<int> UpdateTwinAsync(object payload, CancellationToken cancellationToken = default)
{
var twin = await updateTwinBinder.InvokeAsync(Connection.Options.ClientId, payload, cancellationToken);
var twin = await updateTwinBinder.InvokeAsync(Connection.Options.ClientId, JsonSerializer.Serialize(payload), cancellationToken);
return twin;
}

Expand All @@ -61,7 +62,7 @@ public async Task<MqttClientPublishResult> SendTelemetryAsync(object payload, Ca
clientSegment = clientSegment.Replace("/", "/modules/");
}
return await Connection.PublishBinaryAsync($"devices/{clientSegment}/messages/events/",
new UTF8JsonSerializer().ToBytes(payload),
new Utf8JsonSerializer().ToBytes(payload),
Protocol.MqttQualityOfServiceLevel.AtLeastOnce,
false, t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace MQTTnet.Extensions.MultiCloud.AzureIoTClient
public interface IHubMqttClient
{
IMqttClient Connection { get; set; }
Func<GenericCommandRequest, GenericCommandResponse> OnCommandReceived { get; set; }
Func<IGenericCommandRequest, Task<IGenericCommandResponse>> OnCommandReceived { get; set; }
Func<JsonNode, GenericPropertyAck> OnPropertyUpdateReceived { get; set; }

Task<string> GetTwinAsync(CancellationToken cancellationToken = default);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud.Binders;
using System.Text;

namespace MQTTnet.Extensions.MultiCloud.AzureIoTClient.Untyped
{
public class GenericCommand
public class GenericCommand : IGenericCommand
{
private readonly IMqttClient connection;
public Func<GenericCommandRequest, GenericCommandResponse>? OnCmdDelegate { get; set; }
public Func<IGenericCommandRequest, Task<IGenericCommandResponse>>? OnCmdDelegate { get; set; }

public GenericCommand(IMqttClient c)
{
Expand All @@ -20,7 +19,7 @@ public GenericCommand(IMqttClient c)
{
var segments = topic.Split('/');
var cmdName = segments[3];
string msg = Encoding.UTF8.GetString(m.ApplicationMessage.Payload);
string msg = m.ApplicationMessage.ConvertPayloadToString();
GenericCommandRequest req = new()
{
CommandName = cmdName,
Expand All @@ -29,7 +28,7 @@ public GenericCommand(IMqttClient c)
if (OnCmdDelegate != null && req != null)
{
var tp = TopicParser.ParseTopic(topic);
GenericCommandResponse response = OnCmdDelegate.Invoke(req);
IGenericCommandResponse response = await OnCmdDelegate.Invoke(req);
_ = connection.PublishStringAsync($"$iothub/methods/res/{response.Status}/?$rid={tp.Rid}", response.ReponsePayload);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace MQTTnet.Extensions.MultiCloud.AzureIoTClient.Untyped
{
public class GenericCommandRequest
public class GenericCommandRequest : IGenericCommandRequest
{
public string? CommandName { get; set; }
public string? CommandPayload { get; set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@

namespace MQTTnet.Extensions.MultiCloud.AzureIoTClient.Untyped
{
public class GenericCommandResponse : BaseCommandResponse
public class GenericCommandResponse : IGenericCommandResponse
{
[JsonPropertyName("payload")]
public string? ReponsePayload { get; set; }

[JsonIgnore]
public int Status { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public static async Task<IMqttClient> CreateFromConnectionSettingsAsync(Connecti
}
if (withBirth)
{
var birthPayload = new UTF8JsonSerializer().ToBytes(
var birthPayload = new Utf8JsonSerializer().ToBytes(
new BirthConvention.BirthMessage(BirthConvention.ConnectionStatus.online)
{
ModelId = cs.ModelId
Expand All @@ -36,10 +36,10 @@ public static async Task<IMqttClient> CreateFromConnectionSettingsAsync(Connecti
BirthConvention.BirthTopic(mqtt.Options.ClientId),
birthPayload,
Protocol.MqttQualityOfServiceLevel.AtLeastOnce, true, cancellationToken); //hack to disable retained in registry
if (pubAck.ReasonCode != MqttClientPublishReasonCode.Success)
{
throw new ApplicationException($"Error publishing Birth {cs}");
}
//if (pubAck.ReasonCode != MqttClientPublishReasonCode.Success)
//{
// throw new ApplicationException($"Error publishing Birth {cs}");
//}
}
return mqtt;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud.Binders;

namespace MQTTnet.Extensions.MultiCloud.BrokerIoTClient
namespace MQTTnet.Extensions.MultiCloud.BrokerIoTClient;

public class CommandClient<T, TResp> : RequestResponseBinder<T, TResp>
{
public class CommandClient<T, TResp> : RequestResponseBinder<T, TResp>
public CommandClient(IMqttClient client, string commandName)
: base(client, commandName, false)
{
public CommandClient(IMqttClient client, string commandName)
: base(client, commandName, false)
{
requestTopicPattern = "device/{clientId}/commands/{commandName}";
responseTopicSuccess = "device/{clientId}/commands/{commandName}/resp";
}
requestTopicPattern = "device/{clientId}/commands/{commandName}";
responseTopicSuccess = "device/{clientId}/commands/{commandName}/resp";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class PropertyClient<T>

public Action<string,T>? OnPropertyUpdated { get; set; } = null;

public PropertyClient(IMqttClient client, string name) : this(client, name, new UTF8JsonSerializer()) { }
public PropertyClient(IMqttClient client, string name) : this(client, name, new Utf8JsonSerializer()) { }

public PropertyClient(IMqttClient client, string name, IMessageSerializer messageSerializer)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public ReadOnlyProperty(IMqttClient mqttClient, string name)
{
if (m.ApplicationMessage.Topic == _topic)
{
var ser = new UTF8JsonSerializer();
var ser = new Utf8JsonSerializer();
if (ser.TryReadFromBytes(m.ApplicationMessage.Payload, _name, out T propVal))
{
Value = propVal;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class TelemetryClient<T>
public Action<string,T>? OnTelemetry { get; set; }

public TelemetryClient(IMqttClient client, string name)
: this(client, name, new UTF8JsonSerializer())
: this(client, name, new Utf8JsonSerializer())
{

}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,48 +1,51 @@
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud.Binders;
using System.Text;
using MQTTnet.Extensions.MultiCloud.Serializers;

namespace MQTTnet.Extensions.MultiCloud.BrokerIoTClient.Untyped
namespace MQTTnet.Extensions.MultiCloud.BrokerIoTClient.Untyped;

public class GenericCommand : IGenericCommand
{
public class GenericCommand
{
private readonly IMqttClient connection;
public Func<GenericCommandRequest, GenericCommandResponse>? OnCmdDelegate { get; set; }
private readonly IMqttClient connection;
private readonly IMessageSerializer _serializer;

public GenericCommand(IMqttClient c)
public Func<IGenericCommandRequest, Task<IGenericCommandResponse>>? OnCmdDelegate { get; set; }

public GenericCommand(IMqttClient c)
{
_serializer = new Utf8JsonSerializer();
connection = c;
_ = connection.SubscribeWithReplyAsync($"device/{c.Options.ClientId}/commands/+");
connection.ApplicationMessageReceivedAsync += async m =>
{
connection = c;
_ = connection.SubscribeWithReplyAsync($"device/{c.Options.ClientId}/commands/+");
connection.ApplicationMessageReceivedAsync += async m =>
var topic = m.ApplicationMessage.Topic;
if (topic.StartsWith($"device/{c.Options.ClientId}/commands/"))
{
var topic = m.ApplicationMessage.Topic;
if (topic.StartsWith($"device/{c.Options.ClientId}/commands/"))
{
var segments = topic.Split('/');
var cmdName = segments[3];
string msg = Encoding.UTF8.GetString(m.ApplicationMessage.Payload);
var segments = topic.Split('/');
var cmdName = segments[3];

if (_serializer.TryReadFromBytes(m.ApplicationMessage.Payload, string.Empty, out string reqPayload))
{
var responseTopic = m.ApplicationMessage.ResponseTopic ?? $"{topic}/resp";

GenericCommandRequest req = new()
if (OnCmdDelegate != null)
{
CommandName = cmdName,
CommandPayload = msg
};
if (OnCmdDelegate != null && req != null)
{
var tp = TopicParser.ParseTopic(topic);
GenericCommandResponse response = OnCmdDelegate.Invoke(req);
GenericCommandRequest req = new()
{
CommandName = cmdName,
CommandPayload = reqPayload,
//CorrelationId = m.ApplicationMessage.CorrelationData
};

IGenericCommandResponse response = await OnCmdDelegate.Invoke(req);
await connection.PublishAsync(new MqttApplicationMessageBuilder()
.WithTopic(responseTopic)
.WithPayload(Encoding.UTF8.GetBytes(response.ReponsePayload!))
.WithCorrelationData(m.ApplicationMessage.CorrelationData ?? Guid.Empty.ToByteArray())
.WithPayload(_serializer.ToBytes(response.ReponsePayload))
.WithUserProperty("status", response.Status.ToString())
.WithCorrelationData(m.ApplicationMessage.CorrelationData)
.Build());
}
}

};
}
// public async Task InitSubscriptionsAsync() => await connection.SubscribeWithReplyAsync("$iothub/methods/POST/#");
}
};
}
}
Loading