Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reviewing generic ser #103

Merged
merged 1 commit into from
Apr 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions samples/iothub-sample/Device.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
var twin = await client.GetTwinAsync(stoppingToken);
Console.WriteLine(twin);

client.OnCommandReceived = m =>
client.OnCommandReceived = async m =>
{
Console.WriteLine(m.CommandName);
Console.WriteLine(m.CommandPayload);
return new GenericCommandResponse()
return await Task.FromResult(new GenericCommandResponse()
{
Status = 200,
ReponsePayload = JsonSerializer.Serialize(new { myResponse = "whatever" })
};
});
};

client.OnPropertyUpdateReceived = m =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public async Task<DpsStatus> ProvisionDeviceIdentity()
{
var putTopic = $"$dps/registrations/PUT/iotdps-register/?$rid={rid++}";
var registrationId = mqttClient.Options.ClientId;
var bytes = new UTF8JsonSerializer().ToBytes(new { registrationId, payload = new { modelId } });
var bytes = new Utf8JsonSerializer().ToBytes(new { registrationId, payload = new { modelId } });
var puback = await mqttClient.PublishBinaryAsync(putTopic, bytes);
if (puback.ReasonCode != MqttClientPublishReasonCode.Success)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public HubMqttClient(IMqttClient c)
genericDesiredUpdateProperty = new GenericDesiredUpdatePropertyBinder(c, updateTwinBinder!);
}

public Func<GenericCommandRequest, GenericCommandResponse> OnCommandReceived
public Func<GenericCommandRequest, Task<GenericCommandResponse>> OnCommandReceived
{
get => command.OnCmdDelegate!;
set => command.OnCmdDelegate = value;
Expand Down Expand Up @@ -62,7 +62,7 @@ public async Task<MqttClientPublishResult> SendTelemetryAsync(object payload, Ca
clientSegment = clientSegment.Replace("/", "/modules/");
}
return await Connection.PublishBinaryAsync($"devices/{clientSegment}/messages/events/",
new UTF8JsonSerializer().ToBytes(payload),
new Utf8JsonSerializer().ToBytes(payload),
Protocol.MqttQualityOfServiceLevel.AtLeastOnce,
false, t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace MQTTnet.Extensions.MultiCloud.AzureIoTClient
public interface IHubMqttClient
{
IMqttClient Connection { get; set; }
Func<GenericCommandRequest, GenericCommandResponse> OnCommandReceived { get; set; }
Func<GenericCommandRequest, Task<GenericCommandResponse>> OnCommandReceived { get; set; }
Func<JsonNode, GenericPropertyAck> OnPropertyUpdateReceived { get; set; }

Task<string> GetTwinAsync(CancellationToken cancellationToken = default);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud.Binders;
using System.Text;

namespace MQTTnet.Extensions.MultiCloud.AzureIoTClient.Untyped
{
public class GenericCommand
{
private readonly IMqttClient connection;
public Func<GenericCommandRequest, GenericCommandResponse>? OnCmdDelegate { get; set; }
public Func<GenericCommandRequest, Task<GenericCommandResponse>>? OnCmdDelegate { get; set; }

public GenericCommand(IMqttClient c)
{
Expand All @@ -20,7 +19,7 @@ public GenericCommand(IMqttClient c)
{
var segments = topic.Split('/');
var cmdName = segments[3];
string msg = Encoding.UTF8.GetString(m.ApplicationMessage.Payload);
string msg = m.ApplicationMessage.ConvertPayloadToString();
GenericCommandRequest req = new()
{
CommandName = cmdName,
Expand All @@ -29,7 +28,7 @@ public GenericCommand(IMqttClient c)
if (OnCmdDelegate != null && req != null)
{
var tp = TopicParser.ParseTopic(topic);
GenericCommandResponse response = OnCmdDelegate.Invoke(req);
GenericCommandResponse response = await OnCmdDelegate.Invoke(req);
_ = connection.PublishStringAsync($"$iothub/methods/res/{response.Status}/?$rid={tp.Rid}", response.ReponsePayload);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public static async Task<IMqttClient> CreateFromConnectionSettingsAsync(Connecti
MqttClient? mqtt = new MqttFactory().CreateMqttClient(MqttNetTraceLogger.CreateTraceLogger()) as MqttClient;
var connAck = await mqtt!.ConnectAsync(new MqttClientOptionsBuilder()
.WithConnectionSettings(cs, withBirth)
//.WithProtocolVersion(Formatter.MqttProtocolVersion.V500)
.Build(), cancellationToken);
ComputedSettings = cs;
if (connAck.ResultCode != MqttClientConnectResultCode.Success)
Expand All @@ -25,7 +26,7 @@ public static async Task<IMqttClient> CreateFromConnectionSettingsAsync(Connecti
}
if (withBirth)
{
var birthPayload = new UTF8JsonSerializer().ToBytes(
var birthPayload = new Utf8JsonSerializer().ToBytes(
new BirthConvention.BirthMessage(BirthConvention.ConnectionStatus.online)
{
ModelId = cs.ModelId
Expand All @@ -34,11 +35,11 @@ public static async Task<IMqttClient> CreateFromConnectionSettingsAsync(Connecti
var pubAck = await mqtt.PublishBinaryAsync(
BirthConvention.BirthTopic(mqtt.Options.ClientId),
birthPayload,
Protocol.MqttQualityOfServiceLevel.AtMostOnce, true, cancellationToken);
//if (pubAck.ReasonCode != MqttClientPublishReasonCode.Success)
//{
// throw new ApplicationException($"Error publishing Birth {cs}");
//}
Protocol.MqttQualityOfServiceLevel.AtLeastOnce, true, cancellationToken); //hack to disable retained in registry
if (pubAck.ReasonCode != MqttClientPublishReasonCode.Success)
{
throw new ApplicationException($"Error publishing Birth {cs}");
}
}
return mqtt;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class PropertyClient<T>

public Action<string,T>? OnPropertyUpdated { get; set; } = null;

public PropertyClient(IMqttClient client, string name) : this(client, name, new UTF8JsonSerializer()) { }
public PropertyClient(IMqttClient client, string name) : this(client, name, new Utf8JsonSerializer()) { }

public PropertyClient(IMqttClient client, string name, IMessageSerializer messageSerializer)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public ReadOnlyProperty(IMqttClient mqttClient, string name)
{
if (m.ApplicationMessage.Topic == _topic)
{
var ser = new UTF8JsonSerializer();
var ser = new Utf8JsonSerializer();
if (ser.TryReadFromBytes(m.ApplicationMessage.Payload, _name, out T propVal))
{
Value = propVal;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class TelemetryClient<T>
public Action<string,T>? OnTelemetry { get; set; }

public TelemetryClient(IMqttClient client, string name)
: this(client, name, new UTF8JsonSerializer())
: this(client, name, new Utf8JsonSerializer())
{

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public class GenericCommand

public GenericCommand(IMqttClient c)
{
_serializer = new UTF8JsonSerializer();
_serializer = new Utf8JsonSerializer();
connection = c;
_ = connection.SubscribeWithReplyAsync($"device/{c.Options.ClientId}/commands/+");
connection.ApplicationMessageReceivedAsync += async m =>
Expand All @@ -23,7 +23,7 @@ public GenericCommand(IMqttClient c)
var segments = topic.Split('/');
var cmdName = segments[3];

if (_serializer.TryReadFromBytes(m.ApplicationMessage.Payload, string.Empty, out object reqPayload))
if (_serializer.TryReadFromBytes(m.ApplicationMessage.Payload, string.Empty, out string reqPayload))
{
var responseTopic = m.ApplicationMessage.ResponseTopic ?? $"{topic}/resp";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public GenericCommandClient(IMqttClient client)
{
_mqttClient = client;
_remoteClientId = string.Empty;
_serializer = new UTF8JsonSerializer();
_serializer = new Utf8JsonSerializer();

_mqttClient.ApplicationMessageReceivedAsync += async m =>
{
Expand All @@ -40,7 +40,7 @@ public GenericCommandClient(IMqttClient client)
var up = m.ApplicationMessage.UserProperties.FirstOrDefault(p => p.Name.Equals("status"));
int status = up != null ? int.Parse(up.Value) : 500;

if (_serializer.TryReadFromBytes(m.ApplicationMessage.Payload, string.Empty, out object respPayload))
if (_serializer.TryReadFromBytes(m.ApplicationMessage.Payload, string.Empty, out string respPayload))
{
GenericCommandResponse resp = new()
{
Expand All @@ -67,20 +67,16 @@ public async Task<GenericCommandResponse> InvokeAsync(string clientId, GenericCo

string commandTopic = requestTopicPattern.Replace("{clientId}", _remoteClientId).Replace("{commandName}", _commandName);
var responseTopic = responseTopicSub.Replace("{clientId}", _remoteClientId).Replace("{commandName}", _commandName);
await _mqttClient.SubscribeAsync(responseTopic, Protocol.MqttQualityOfServiceLevel.AtMostOnce, ct);
_ =_mqttClient.SubscribeAsync(responseTopic, Protocol.MqttQualityOfServiceLevel.AtMostOnce, ct);

var pubAck = await _mqttClient.PublishAsync(
_ = _mqttClient.PublishAsync(
new MqttApplicationMessageBuilder()
.WithTopic(commandTopic)
.WithPayload(_serializer.ToBytes(request.RequestPayload))
.WithResponseTopic(responseTopicSuccess.Replace("{clientId}", _remoteClientId).Replace("{commandName}", _commandName))
.WithCorrelationData(corr.ToByteArray())
.Build());

if (!pubAck.IsSuccess)
{
throw new ApplicationException("Error publishing Request Message");
}

return await _tcs.Task.TimeoutAfter(TimeSpan.FromSeconds(5));

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public abstract class CloudToDeviceBinder<T, TResp> : ICloudToDevice<T, TResp>
protected Action<TopicParameters>? PreProcessMessage;

public CloudToDeviceBinder(IMqttClient connection, string name)
: this(connection, name, new UTF8JsonSerializer()) { }
: this(connection, name, new Utf8JsonSerializer()) { }

public CloudToDeviceBinder(IMqttClient connection, string name, IMessageSerializer serializer)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public abstract class DeviceToCloudBinder<T> : IDeviceToCloud<T>
public bool WrapMessage = false;
public bool Retain = false;

public DeviceToCloudBinder(IMqttClient mqttClient, string name) : this(mqttClient, name, new UTF8JsonSerializer()) { }
public DeviceToCloudBinder(IMqttClient mqttClient, string name) : this(mqttClient, name, new Utf8JsonSerializer()) { }

public DeviceToCloudBinder(IMqttClient mqttClient, string name, IMessageSerializer ser)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class RequestResponseBinder<T, TResp>
readonly IMessageSerializer _serializer;

public RequestResponseBinder(IMqttClient client, string name, bool unwrap)
: this(client, name, unwrap, new UTF8JsonSerializer())
: this(client, name, unwrap, new Utf8JsonSerializer())
{

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public BirthMessage(ConnectionStatus connectionStatus)
}

public static byte[] LastWillPayload() =>
new UTF8JsonSerializer().ToBytes(new BirthMessage(ConnectionStatus.offline));
new Utf8JsonSerializer().ToBytes(new BirthMessage(ConnectionStatus.offline));
public static byte[] LastWillPayload(string modelId) =>
new UTF8JsonSerializer().ToBytes(new BirthMessage(ConnectionStatus.offline) { ModelId = modelId });
new Utf8JsonSerializer().ToBytes(new BirthMessage(ConnectionStatus.offline) { ModelId = modelId });
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace MQTTnet.Extensions.MultiCloud.Serializers;

public class UTF8JsonSerializer : IMessageSerializer
public class Utf8JsonSerializer : IMessageSerializer
{
private static class Json
{
Expand Down Expand Up @@ -44,6 +44,7 @@ public byte[] ToBytes<T>(T payload, string name = "")
{
return Encoding.UTF8.GetBytes(Json.Stringify(payload!));
}

}
else
{
Expand All @@ -67,7 +68,7 @@ public bool TryReadFromBytes<T>(byte[] payload, string name, out T result)
if (typeof(T) == typeof(string))
{
result = (T)Convert.ChangeType(Encoding.UTF8.GetString(payload), typeof(T));
}
}
else if (typeof(T) == typeof(object))
{
result = (T)Convert.ChangeType(Encoding.UTF8.GetString(payload), typeof(T))!;
Expand All @@ -76,6 +77,7 @@ public bool TryReadFromBytes<T>(byte[] payload, string name, out T result)
{
result = Json.FromString<T>(Encoding.UTF8.GetString(payload))!;
}

}
else
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace MQTTnet.Extensions.MultiCloud.Serializers
{
public class Utf8StringSerializer : IMessageSerializer
{
public byte[] ToBytes<T>(T payload, string name = "") => Encoding.UTF8.GetBytes((payload as string)!);


public bool TryReadFromBytes<T>(byte[] payload, string name, out T result)
{
result = (T)Convert.ChangeType(Encoding.UTF8.GetString(payload), typeof(T));
return true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,38 @@ internal async Task CommandsGetCalled()
await rm.RemoveDeviceAsync(deviceId);

}
[Fact]
internal async Task InvokeGenericCommand()
{
var deviceId = "gencmd" + new Random().Next(100);
var device = await GetOrCreateDeviceAsync(deviceId);
bool commandInvoked = false;
string commandName = string.Empty;
string commandRequestPayload = string.Empty;
var td = new HubMqttClient(await HubDpsFactory.CreateFromConnectionSettingsAsync($"HostName={hubName};DeviceId={deviceId};SharedAccessKey={device.Authentication.SymmetricKey.PrimaryKey}"));
td.OnCommandReceived += async cmd =>
{
commandName = cmd.CommandName!;
commandRequestPayload = cmd.CommandPayload!.ToString();
commandInvoked = true;
return await Task.FromResult(new AzureIoTClient.Untyped.GenericCommandResponse
{
Status = 200,
ReponsePayload = JsonSerializer.Serialize(new { myCommandResponse = "adios"})
});
};
await Task.Delay(200);
var sc = ServiceClient.CreateFromConnectionString(hubConnectionString);
CloudToDeviceMethod c2dMethod = new("aCommand");
string requestPayload = JsonSerializer.Serialize(new { myComandRequest = "hello" });
c2dMethod.SetPayloadJson(requestPayload); ;
var dmRes = await sc.InvokeDeviceMethodAsync(deviceId, c2dMethod);
Assert.True(commandInvoked);
Assert.Equal("aCommand", commandName);
Assert.Equal(requestPayload, commandRequestPayload);
string expectedJson = Json.Stringify(new { myCommandResponse = "adios" });
Assert.Equal(expectedJson, dmRes.GetPayloadAsJson());
}

private async Task<Device> GetOrCreateDeviceAsync(string deviceId, bool x509 = false)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
using MQTTnet.Extensions.MultiCloud.BrokerIoTClient;
using MQTTnet.Extensions.MultiCloud.Serializers;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Xunit;

Expand All @@ -20,12 +16,12 @@ public void CommandWithReqResp()
cmd.OnMessage = async req =>
{
cmdReceived = true;
return await Task.FromResult(req.ToString());
return await Task.FromResult($"received {req}");
};
mockMqtt.SimulateNewBinaryMessage("device/mock/commands/aCmdReqResp",new UTF8JsonSerializer().ToBytes(2));
mockMqtt.SimulateNewBinaryMessage("device/mock/commands/aCmdReqResp",new Utf8JsonSerializer().ToBytes(2));
Assert.True(cmdReceived);
Assert.Equal("device/mock/commands/aCmdReqResp/resp", mockMqtt.topicRecceived);
Assert.Equal("2", mockMqtt.payloadReceived);
Assert.Equal("received 2", mockMqtt.payloadReceived);
}

[Fact]
Expand All @@ -39,7 +35,7 @@ public void CommandWithReq()
cmdReceived = true;
return await Task.FromResult(string.Empty);
};
mockMqtt.SimulateNewBinaryMessage("device/mock/commands/aCmdReq", new UTF8JsonSerializer().ToBytes(2));
mockMqtt.SimulateNewBinaryMessage("device/mock/commands/aCmdReq", new Utf8JsonSerializer().ToBytes(2));
Assert.True(cmdReceived);
Assert.Equal("device/mock/commands/aCmdReq/resp", mockMqtt.topicRecceived);
Assert.Empty(mockMqtt.payloadReceived);
Expand All @@ -56,7 +52,7 @@ public void CommandWithRes()
cmdReceived = true;
return await Task.FromResult(1);
};
mockMqtt.SimulateNewBinaryMessage("device/mock/commands/aCmdRes", new UTF8JsonSerializer().ToBytes(""));
mockMqtt.SimulateNewBinaryMessage("device/mock/commands/aCmdRes", new Utf8JsonSerializer().ToBytes(""));
Assert.True(cmdReceived);
Assert.Equal("device/mock/commands/aCmdRes/resp", mockMqtt.topicRecceived);
Assert.Equal("1", mockMqtt.payloadReceived);
Expand All @@ -73,7 +69,7 @@ public void CommandEmpty()
cmdReceived = true;
return await Task.FromResult(string.Empty);
};
mockMqtt.SimulateNewBinaryMessage("device/mock/commands/aCmd", new UTF8JsonSerializer().ToBytes(""));
mockMqtt.SimulateNewBinaryMessage("device/mock/commands/aCmd", new Utf8JsonSerializer().ToBytes(""));
Assert.True(cmdReceived);
Assert.Equal("device/mock/commands/aCmd/resp", mockMqtt.topicRecceived);
Assert.Empty(mockMqtt.payloadReceived);
Expand Down
Loading