Skip to content

Commit

Permalink
Feat/formats (#86)
Browse files Browse the repository at this point in the history
* moving serializers

* copy serializers to samples

* upd tests to new hub

* review proto bindings
  • Loading branch information
ridomin authored Nov 11, 2022
1 parent 1ee0784 commit bbceded
Show file tree
Hide file tree
Showing 41 changed files with 314 additions and 339 deletions.
28 changes: 14 additions & 14 deletions samples/memmon-protobuff/Device.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ public class Device : BackgroundService
private const bool default_enabled = true;
private const int default_interval = 45;

private MemmonClient client;
private ConnectionSettings connectionSettings;
private MemmonClient? client;
private ConnectionSettings? connectionSettings;

private string infoVersion = string.Empty;

Expand Down Expand Up @@ -102,7 +102,7 @@ private async Task<ack> Property_enabled_UpdateHandler(Properties desired)
var ack = new ack
{
Value = Google.Protobuf.WellKnownTypes.Any.Pack(desired),
Version = client.Property_enabled.Version.Value,
Version = client!.Property_enabled.Version!.Value,
Description = "desired notification accepted",
Status = 200
};
Expand All @@ -126,15 +126,15 @@ private async Task<ack> Property_interval_UpdateHandler(Properties desired)
ack.Value = Google.Protobuf.WellKnownTypes.Any.Pack(desired);
ack.Description = "desired notification accepted";
ack.Status = 200;
ack.Version = client.Property_interval.Version.Value;
ack.Version = client.Property_interval.Version!.Value;
client.Props.Interval = desired.Interval;
}
else
{
ack.Description = "negative values not accepted";
ack.Status = 405;
ack.Value = Google.Protobuf.WellKnownTypes.Any.Pack(client.Props);
ack.Version = client.Property_interval.Version.Value;
ack.Version = client.Property_interval.Version!.Value;
};
return await Task.FromResult(ack);
}
Expand All @@ -161,8 +161,8 @@ private async Task<getRuntimeStatsResponse> Command_getRuntimeStats_Handler(getR
if (req.Mode == getRuntimeStatsMode.Full)
{
result.DiagResults.Add("sdk info:", infoVersion);
result.DiagResults.Add("interval: ", client.Props.Interval.ToString());
result.DiagResults.Add("enabled: ", client.Props.Enabled.ToString());
result.DiagResults.Add("interval: ", client!.Props.Interval.ToString());
result.DiagResults.Add("enabled: ", client!.Props.Enabled.ToString());
result.DiagResults.Add("twin receive: ", twinRecCounter.ToString());
//result.diagnosticResults.Add($"twin sends: ", RidCounter.Current.ToString());
result.DiagResults.Add("telemetry: ", telemetryCounter.ToString());
Expand All @@ -173,25 +173,25 @@ private async Task<getRuntimeStatsResponse> Command_getRuntimeStats_Handler(getR
}

#pragma warning disable IDE0052 // Remove unread private members
private Timer screenRefresher;
private Timer? screenRefresher;
#pragma warning restore IDE0052 // Remove unread private members
private void RefreshScreen(object state)
{
string RenderData()
{
void AppendLineWithPadRight(StringBuilder sb, string s) => sb.AppendLine(s?.PadRight(Console.BufferWidth > 1 ? Console.BufferWidth - 1 : 300));

string enabled_value = client?.Props.Enabled.ToString();
string interval_value = client?.Props.Interval.ToString();
string enabled_value = client!.Props.Enabled!.ToString();
string interval_value = client!.Props.Interval!.ToString();
StringBuilder sb = new();
AppendLineWithPadRight(sb, " ");
AppendLineWithPadRight(sb, $"{connectionSettings?.HostName}:{connectionSettings?.TcpPort}");
AppendLineWithPadRight(sb, $"{connectionSettings.ClientId} (Auth:{connectionSettings.Auth}/ TLS:{connectionSettings.UseTls})");
AppendLineWithPadRight(sb, $"{connectionSettings?.ClientId} (Auth:{connectionSettings!.Auth}/ TLS:{connectionSettings.UseTls})");
AppendLineWithPadRight(sb, " ");
AppendLineWithPadRight(sb, string.Format("{0:8} | {1:15} | {2}", "Property", "Value".PadRight(15), "Version"));
AppendLineWithPadRight(sb, string.Format("{0:8} | {1:15} | {2}", "--------", "-----".PadLeft(15, '-'), "------"));
AppendLineWithPadRight(sb, string.Format("{0:8} | {1:15} | {2}", "enabled".PadRight(8), enabled_value?.PadLeft(15), client.Property_enabled.Version.Value));
AppendLineWithPadRight(sb, string.Format("{0:8} | {1:15} | {2}", "interval".PadRight(8), interval_value?.PadLeft(15), client.Property_interval .Version.Value));
AppendLineWithPadRight(sb, string.Format("{0:8} | {1:15} | {2}", "enabled".PadRight(8), enabled_value?.PadLeft(15), client.Property_enabled.Version!.Value));
AppendLineWithPadRight(sb, string.Format("{0:8} | {1:15} | {2}", "interval".PadRight(8), interval_value?.PadLeft(15), client.Property_interval.Version!.Value));
//AppendLineWithPadRight(sb, string.Format("{0:8} | {1:15} | {2}", "started".PadRight(8), client.Props.Started.Seconds.ToString().PadLeft(15), ""));
AppendLineWithPadRight(sb, " ");
AppendLineWithPadRight(sb, $"Reconnects: {reconnectCounter}");
Expand All @@ -210,6 +210,6 @@ string RenderData()

Console.SetCursorPosition(0, 0);
Console.WriteLine(RenderData());
screenRefresher = new Timer(RefreshScreen, this, 1000, 0);
screenRefresher = new Timer(RefreshScreen!, this, 1000, 0);
}
}
18 changes: 18 additions & 0 deletions samples/memmon-protobuff/Serializers/CommandProtobuff.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using Google.Protobuf;
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud;
using MQTTnet.Extensions.MultiCloud.Binders;

namespace Serializers;

public class CommandProtobuff<T, TResp> : CloudToDeviceBinder<T, TResp>, ICommand<T, TResp>
{
public CommandProtobuff(IMqttClient client, string name, MessageParser parser)
: base(client, name, new ProtobufSerializer(parser))
{
UnwrapRequest = false;
RequestTopicPattern = "device/{clientId}/cmd/{name}";
SubscribeTopicPattern = "device/{clientId}/cmd/{name}";
ResponseTopicPattern = "device/{clientId}/cmd/{name}/resp";
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Google.Protobuf;
using MQTTnet.Extensions.MultiCloud;

namespace MQTTnet.Extensions.MultiCloud.Serializers;
namespace Serializers;

public class ProtobufSerializer : IMessageSerializer
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud;
using MQTTnet.Extensions.MultiCloud.Binders;
using MQTTnet.Extensions.MultiCloud.Serializers;

namespace MQTTnet.Extensions.MultiCloud.BrokerIoTClient;
namespace Serializers;

public class ReadOnlyPropertyProtobuff<T> : DeviceToCloudBinder<T>, IReadOnlyProperty<T>
{
Expand Down
20 changes: 20 additions & 0 deletions samples/memmon-protobuff/Serializers/TelemetryProtobuf.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud;
using MQTTnet.Extensions.MultiCloud.Binders;

namespace Serializers;

public class TelemetryProtobuf<T> : DeviceToCloudBinder<T>, ITelemetry<T>
{

public TelemetryProtobuf(IMqttClient mqttClient) :
this(mqttClient, string.Empty)
{ }

public TelemetryProtobuf(IMqttClient mqttClient, string name)
: base(mqttClient, name, new ProtobufSerializer())
{
TopicPattern = "device/{clientId}/tel";
WrapMessage = false;
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
using Google.Protobuf;
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud;
using MQTTnet.Extensions.MultiCloud.Binders;
using MQTTnet.Extensions.MultiCloud.BrokerIoTClient;
using MQTTnet.Extensions.MultiCloud.Serializers;

namespace MQTTnet.Extensions.MultiCloud.BrokerIoTClient
namespace Serializers
{
public class WritablePropertyProtobuff<T, TResp> : CloudToDeviceBinder<T, TResp>, IWritableProperty<T, TResp>, IDeviceToCloud<TResp>
{
Expand All @@ -16,7 +18,7 @@ public WritablePropertyProtobuff(IMqttClient connection, string name, MessagePar
{
_connection = connection;
_name = name;
SubscribeTopicPattern = "device/{clientId}/props/{name}/set";
SubscribeTopicPattern = "device/{clientId}/props/{name}/set/#";
RequestTopicPattern = "device/{clientId}/props/{name}/set";
ResponseTopicPattern = "device/{clientId}/props/{name}/ack";
RetainResponse = true;
Expand All @@ -40,9 +42,9 @@ public async Task InitPropertyAsync(string intialState, TResp defaultValue, Canc
{
TResp payload = default!; //TODO use generic ACK for protos
await SendMessageAsync(payload, cancellationToken);

}


}
}
3 changes: 2 additions & 1 deletion samples/memmon-protobuff/_protos/memmon.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud;
using MQTTnet.Extensions.MultiCloud.BrokerIoTClient;
using Serializers;
using System;
using System.Collections.Generic;
using System.Linq;
Expand All @@ -23,7 +24,7 @@ internal class MemmonClient
public MemmonClient(IMqttClient c)
{
AllProperties = new ReadOnlyPropertyProtobuff<Properties>(c);
AllTelemetry = new TelemetryProtobuff<Telemetries>(c);
AllTelemetry = new TelemetryProtobuf<Telemetries>(c);
Property_interval = new WritablePropertyProtobuff<Properties, ack>(c, "interval", Properties.Parser);
Property_enabled = new WritablePropertyProtobuff<Properties, ack>(c, "enabled", Properties.Parser);
getRuntimeStats = new CommandProtobuff<getRuntimeStatsRequest, getRuntimeStatsResponse>(c, "getRuntimeStats", getRuntimeStatsRequest.Parser);
Expand Down
2 changes: 2 additions & 0 deletions samples/memmon-protobuff/memmon-protobuff.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<IsPackable>false</IsPackable>
<Nullable>enable</Nullable>
<DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
</PropertyGroup>


<ItemGroup>
<PackageReference Include="Google.Protobuf" Version="3.21.9" />
<PackageReference Include="Grpc.Tools" Version="2.50.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
17 changes: 17 additions & 0 deletions samples/mqtt-grpc-device/Serializers/CommandProtobuff.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using Google.Protobuf;
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud;
using MQTTnet.Extensions.MultiCloud.Binders;

namespace mqtt_grpc_device.Serializers;

public class CommandProtobuff<T, TResp> : CloudToDeviceBinder<T, TResp>, ICommand<T, TResp>
{
public CommandProtobuff(IMqttClient client, string name, MessageParser parser)
: base(client, name, new ProtobufSerializer(parser))
{
UnwrapRequest = false;
RequestTopicPattern = "device/{clientId}/cmd/{name}";
ResponseTopicPattern = "device/{clientId}/cmd/{name}/resp";
}
}
41 changes: 41 additions & 0 deletions samples/mqtt-grpc-device/Serializers/ProtobufSerializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using Google.Protobuf;
using MQTTnet.Extensions.MultiCloud;

namespace mqtt_grpc_device.Serializers;

public class ProtobufSerializer : IMessageSerializer
{
private readonly MessageParser? _parser;
public ProtobufSerializer() { }
public ProtobufSerializer(MessageParser parser) => _parser = parser;
public byte[] ToBytes<T>(T payload, string name = "") => (payload as IMessage).ToByteArray();

public bool TryReadFromBytes<T>(byte[] payload, string name, out T result)
{
if (payload == null || payload.Length == 0)
{
result = default!;
return false;
}
bool found = false;
IMessage msg = _parser!.ParseFrom(payload);
if (string.IsNullOrEmpty(name))
{
found = true;
result = (T)msg;
}
else
{
if (msg.ToString()!.Contains(name)) // find better way
{
result = (T)msg;
found = true;
}
else
{
result = default!;
}
}
return found;
}
}
19 changes: 19 additions & 0 deletions samples/mqtt-grpc-device/Serializers/ReadOnlyPropertyProtobuff.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud;
using MQTTnet.Extensions.MultiCloud.Binders;
using MQTTnet.Extensions.MultiCloud.Serializers;

namespace mqtt_grpc_device.Serializers;

public class ReadOnlyPropertyProtobuff<T> : DeviceToCloudBinder<T>, IReadOnlyProperty<T>
{
public ReadOnlyPropertyProtobuff(IMqttClient mqttClient) : this(mqttClient, string.Empty) { }

public ReadOnlyPropertyProtobuff(IMqttClient mqttClient, string name)
: base(mqttClient, name, new ProtobufSerializer())
{
TopicPattern = "device/{clientId}/props";
WrapMessage = false;
Retain = true;
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud;
using MQTTnet.Extensions.MultiCloud.Binders;
using MQTTnet.Extensions.MultiCloud.Serializers;

namespace MQTTnet.Extensions.MultiCloud.AzureIoTClient;
namespace mqtt_grpc_device.Serializers;

public class TelemetryProtobuf<T> : DeviceToCloudBinder<T>, ITelemetry<T>
{

public TelemetryProtobuf(IMqttClient mqttClient) :
this(mqttClient, string.Empty)
{ }

public TelemetryProtobuf(IMqttClient mqttClient, string name)
: base(mqttClient, name, new ProtobufSerializer())
{
Expand Down
51 changes: 51 additions & 0 deletions samples/mqtt-grpc-device/Serializers/WritablePropertyProtobuff.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
using Google.Protobuf;
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud;
using MQTTnet.Extensions.MultiCloud.Binders;
using MQTTnet.Extensions.MultiCloud.BrokerIoTClient;
using System.Threading;
using System.Threading.Tasks;

namespace mqtt_grpc_device.Serializers
{
public class WritablePropertyProtobuff<T, TResp> : CloudToDeviceBinder<T, TResp>, IWritableProperty<T, TResp>, IDeviceToCloud<TResp>
{
readonly IMqttClient _connection;
readonly string _name;
public T? Value { get; set; } = default!;
public int? Version { get; set; } = -1;
public WritablePropertyProtobuff(IMqttClient connection, string name, MessageParser parser)
: base(connection, name, new ProtobufSerializer(parser))
{
_connection = connection;
_name = name;
SubscribeTopicPattern = "device/{clientId}/props/{name}/set";
RequestTopicPattern = "device/{clientId}/props/{name}/set";
ResponseTopicPattern = "device/{clientId}/props/{name}/ack";
RetainResponse = true;
PreProcessMessage = tp =>
{
Version = tp.Version;
};
}

public async Task SendMessageAsync(TResp payload, CancellationToken cancellationToken = default)
{
var prop = new ReadOnlyProperty<TResp>(_connection, _name)
{
TopicPattern = "device/{clientId}/props/{name}/ack",
WrapMessage = false
};
await prop.SendMessageAsync(payload, cancellationToken);
}

public async Task InitPropertyAsync(string intialState, TResp defaultValue, CancellationToken cancellationToken = default)
{
TResp payload = default!; //TODO use generic ACK for protos
await SendMessageAsync(payload, cancellationToken);

}


}
}
1 change: 1 addition & 0 deletions samples/mqtt-grpc-device/mqtt-grpc-device.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Google.Protobuf" Version="3.21.9" />
<PackageReference Include="Grpc.Tools" Version="2.50.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
3 changes: 2 additions & 1 deletion samples/mqtt-grpc-device/protos/mqtt_grpc_sample_device.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// <auto-generated />

using Google.Protobuf;
using mqtt_grpc_device.Serializers;
using mqtt_grpc_device_protos;
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud;
Expand Down Expand Up @@ -29,7 +30,7 @@ internal mqtt_grpc_sample_device(IMqttClient client)
Connection = client;
Props = new Properties();
AllProperties = new ReadOnlyPropertyProtobuff<Properties>(client, string.Empty);
AllTelemetries = new TelemetryProtobuff<Telemetries>(client, string.Empty);
AllTelemetries = new TelemetryProtobuf<Telemetries>(client, string.Empty);
Interval = new WritablePropertyProtobuff<Properties, ack>(client, "interval", Properties.Parser);
Echo = new CommandProtobuff<echoRequest, echoResponse>(client, "echo", echoRequest.Parser);
GetRuntimeStats = new CommandProtobuff<getRuntimeStatsRequest, getRuntimeStatsResponse>(client, "getRuntimeStats", getRuntimeStatsRequest.Parser);
Expand Down
1 change: 1 addition & 0 deletions samples/payload-size/AvroDeviceClient.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud;
using MQTTnet.Extensions.MultiCloud.BrokerIoTClient;
using payload_size.Binders;

namespace payload_size;

Expand Down
Loading

0 comments on commit bbceded

Please sign in to comment.