Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/formats #86

Merged
merged 4 commits into from
Nov 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions samples/memmon-protobuff/Device.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ public class Device : BackgroundService
private const bool default_enabled = true;
private const int default_interval = 45;

private MemmonClient client;
private ConnectionSettings connectionSettings;
private MemmonClient? client;
private ConnectionSettings? connectionSettings;

private string infoVersion = string.Empty;

Expand Down Expand Up @@ -102,7 +102,7 @@ private async Task<ack> Property_enabled_UpdateHandler(Properties desired)
var ack = new ack
{
Value = Google.Protobuf.WellKnownTypes.Any.Pack(desired),
Version = client.Property_enabled.Version.Value,
Version = client!.Property_enabled.Version!.Value,
Description = "desired notification accepted",
Status = 200
};
Expand All @@ -126,15 +126,15 @@ private async Task<ack> Property_interval_UpdateHandler(Properties desired)
ack.Value = Google.Protobuf.WellKnownTypes.Any.Pack(desired);
ack.Description = "desired notification accepted";
ack.Status = 200;
ack.Version = client.Property_interval.Version.Value;
ack.Version = client.Property_interval.Version!.Value;
client.Props.Interval = desired.Interval;
}
else
{
ack.Description = "negative values not accepted";
ack.Status = 405;
ack.Value = Google.Protobuf.WellKnownTypes.Any.Pack(client.Props);
ack.Version = client.Property_interval.Version.Value;
ack.Version = client.Property_interval.Version!.Value;
};
return await Task.FromResult(ack);
}
Expand All @@ -161,8 +161,8 @@ private async Task<getRuntimeStatsResponse> Command_getRuntimeStats_Handler(getR
if (req.Mode == getRuntimeStatsMode.Full)
{
result.DiagResults.Add("sdk info:", infoVersion);
result.DiagResults.Add("interval: ", client.Props.Interval.ToString());
result.DiagResults.Add("enabled: ", client.Props.Enabled.ToString());
result.DiagResults.Add("interval: ", client!.Props.Interval.ToString());
result.DiagResults.Add("enabled: ", client!.Props.Enabled.ToString());
result.DiagResults.Add("twin receive: ", twinRecCounter.ToString());
//result.diagnosticResults.Add($"twin sends: ", RidCounter.Current.ToString());
result.DiagResults.Add("telemetry: ", telemetryCounter.ToString());
Expand All @@ -173,25 +173,25 @@ private async Task<getRuntimeStatsResponse> Command_getRuntimeStats_Handler(getR
}

#pragma warning disable IDE0052 // Remove unread private members
private Timer screenRefresher;
private Timer? screenRefresher;
#pragma warning restore IDE0052 // Remove unread private members
private void RefreshScreen(object state)
{
string RenderData()
{
void AppendLineWithPadRight(StringBuilder sb, string s) => sb.AppendLine(s?.PadRight(Console.BufferWidth > 1 ? Console.BufferWidth - 1 : 300));

string enabled_value = client?.Props.Enabled.ToString();
string interval_value = client?.Props.Interval.ToString();
string enabled_value = client!.Props.Enabled!.ToString();
string interval_value = client!.Props.Interval!.ToString();
StringBuilder sb = new();
AppendLineWithPadRight(sb, " ");
AppendLineWithPadRight(sb, $"{connectionSettings?.HostName}:{connectionSettings?.TcpPort}");
AppendLineWithPadRight(sb, $"{connectionSettings.ClientId} (Auth:{connectionSettings.Auth}/ TLS:{connectionSettings.UseTls})");
AppendLineWithPadRight(sb, $"{connectionSettings?.ClientId} (Auth:{connectionSettings!.Auth}/ TLS:{connectionSettings.UseTls})");
AppendLineWithPadRight(sb, " ");
AppendLineWithPadRight(sb, string.Format("{0:8} | {1:15} | {2}", "Property", "Value".PadRight(15), "Version"));
AppendLineWithPadRight(sb, string.Format("{0:8} | {1:15} | {2}", "--------", "-----".PadLeft(15, '-'), "------"));
AppendLineWithPadRight(sb, string.Format("{0:8} | {1:15} | {2}", "enabled".PadRight(8), enabled_value?.PadLeft(15), client.Property_enabled.Version.Value));
AppendLineWithPadRight(sb, string.Format("{0:8} | {1:15} | {2}", "interval".PadRight(8), interval_value?.PadLeft(15), client.Property_interval .Version.Value));
AppendLineWithPadRight(sb, string.Format("{0:8} | {1:15} | {2}", "enabled".PadRight(8), enabled_value?.PadLeft(15), client.Property_enabled.Version!.Value));
AppendLineWithPadRight(sb, string.Format("{0:8} | {1:15} | {2}", "interval".PadRight(8), interval_value?.PadLeft(15), client.Property_interval.Version!.Value));
//AppendLineWithPadRight(sb, string.Format("{0:8} | {1:15} | {2}", "started".PadRight(8), client.Props.Started.Seconds.ToString().PadLeft(15), ""));
AppendLineWithPadRight(sb, " ");
AppendLineWithPadRight(sb, $"Reconnects: {reconnectCounter}");
Expand All @@ -210,6 +210,6 @@ string RenderData()

Console.SetCursorPosition(0, 0);
Console.WriteLine(RenderData());
screenRefresher = new Timer(RefreshScreen, this, 1000, 0);
screenRefresher = new Timer(RefreshScreen!, this, 1000, 0);
}
}
18 changes: 18 additions & 0 deletions samples/memmon-protobuff/Serializers/CommandProtobuff.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using Google.Protobuf;
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud;
using MQTTnet.Extensions.MultiCloud.Binders;

namespace Serializers;

public class CommandProtobuff<T, TResp> : CloudToDeviceBinder<T, TResp>, ICommand<T, TResp>
{
public CommandProtobuff(IMqttClient client, string name, MessageParser parser)
: base(client, name, new ProtobufSerializer(parser))
{
UnwrapRequest = false;
RequestTopicPattern = "device/{clientId}/cmd/{name}";
SubscribeTopicPattern = "device/{clientId}/cmd/{name}";
ResponseTopicPattern = "device/{clientId}/cmd/{name}/resp";
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Google.Protobuf;
using MQTTnet.Extensions.MultiCloud;

namespace MQTTnet.Extensions.MultiCloud.Serializers;
namespace Serializers;

public class ProtobufSerializer : IMessageSerializer
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud;
using MQTTnet.Extensions.MultiCloud.Binders;
using MQTTnet.Extensions.MultiCloud.Serializers;

namespace MQTTnet.Extensions.MultiCloud.BrokerIoTClient;
namespace Serializers;

public class ReadOnlyPropertyProtobuff<T> : DeviceToCloudBinder<T>, IReadOnlyProperty<T>
{
Expand Down
20 changes: 20 additions & 0 deletions samples/memmon-protobuff/Serializers/TelemetryProtobuf.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud;
using MQTTnet.Extensions.MultiCloud.Binders;

namespace Serializers;

public class TelemetryProtobuf<T> : DeviceToCloudBinder<T>, ITelemetry<T>
{

public TelemetryProtobuf(IMqttClient mqttClient) :
this(mqttClient, string.Empty)
{ }

public TelemetryProtobuf(IMqttClient mqttClient, string name)
: base(mqttClient, name, new ProtobufSerializer())
{
TopicPattern = "device/{clientId}/tel";
WrapMessage = false;
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
using Google.Protobuf;
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud;
using MQTTnet.Extensions.MultiCloud.Binders;
using MQTTnet.Extensions.MultiCloud.BrokerIoTClient;
using MQTTnet.Extensions.MultiCloud.Serializers;

namespace MQTTnet.Extensions.MultiCloud.BrokerIoTClient
namespace Serializers
{
public class WritablePropertyProtobuff<T, TResp> : CloudToDeviceBinder<T, TResp>, IWritableProperty<T, TResp>, IDeviceToCloud<TResp>
{
Expand All @@ -16,7 +18,7 @@ public WritablePropertyProtobuff(IMqttClient connection, string name, MessagePar
{
_connection = connection;
_name = name;
SubscribeTopicPattern = "device/{clientId}/props/{name}/set";
SubscribeTopicPattern = "device/{clientId}/props/{name}/set/#";
RequestTopicPattern = "device/{clientId}/props/{name}/set";
ResponseTopicPattern = "device/{clientId}/props/{name}/ack";
RetainResponse = true;
Expand All @@ -40,9 +42,9 @@ public async Task InitPropertyAsync(string intialState, TResp defaultValue, Canc
{
TResp payload = default!; //TODO use generic ACK for protos
await SendMessageAsync(payload, cancellationToken);

}


}
}
3 changes: 2 additions & 1 deletion samples/memmon-protobuff/_protos/memmon.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud;
using MQTTnet.Extensions.MultiCloud.BrokerIoTClient;
using Serializers;
using System;
using System.Collections.Generic;
using System.Linq;
Expand All @@ -23,7 +24,7 @@ internal class MemmonClient
public MemmonClient(IMqttClient c)
{
AllProperties = new ReadOnlyPropertyProtobuff<Properties>(c);
AllTelemetry = new TelemetryProtobuff<Telemetries>(c);
AllTelemetry = new TelemetryProtobuf<Telemetries>(c);
Property_interval = new WritablePropertyProtobuff<Properties, ack>(c, "interval", Properties.Parser);
Property_enabled = new WritablePropertyProtobuff<Properties, ack>(c, "enabled", Properties.Parser);
getRuntimeStats = new CommandProtobuff<getRuntimeStatsRequest, getRuntimeStatsResponse>(c, "getRuntimeStats", getRuntimeStatsRequest.Parser);
Expand Down
2 changes: 2 additions & 0 deletions samples/memmon-protobuff/memmon-protobuff.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<IsPackable>false</IsPackable>
<Nullable>enable</Nullable>
<DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
</PropertyGroup>


<ItemGroup>
<PackageReference Include="Google.Protobuf" Version="3.21.9" />
<PackageReference Include="Grpc.Tools" Version="2.50.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
17 changes: 17 additions & 0 deletions samples/mqtt-grpc-device/Serializers/CommandProtobuff.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using Google.Protobuf;
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud;
using MQTTnet.Extensions.MultiCloud.Binders;

namespace mqtt_grpc_device.Serializers;

public class CommandProtobuff<T, TResp> : CloudToDeviceBinder<T, TResp>, ICommand<T, TResp>
{
public CommandProtobuff(IMqttClient client, string name, MessageParser parser)
: base(client, name, new ProtobufSerializer(parser))
{
UnwrapRequest = false;
RequestTopicPattern = "device/{clientId}/cmd/{name}";
ResponseTopicPattern = "device/{clientId}/cmd/{name}/resp";
}
}
41 changes: 41 additions & 0 deletions samples/mqtt-grpc-device/Serializers/ProtobufSerializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using Google.Protobuf;
using MQTTnet.Extensions.MultiCloud;

namespace mqtt_grpc_device.Serializers;

public class ProtobufSerializer : IMessageSerializer
{
private readonly MessageParser? _parser;
public ProtobufSerializer() { }
public ProtobufSerializer(MessageParser parser) => _parser = parser;
public byte[] ToBytes<T>(T payload, string name = "") => (payload as IMessage).ToByteArray();

public bool TryReadFromBytes<T>(byte[] payload, string name, out T result)
{
if (payload == null || payload.Length == 0)
{
result = default!;
return false;
}
bool found = false;
IMessage msg = _parser!.ParseFrom(payload);
if (string.IsNullOrEmpty(name))
{
found = true;
result = (T)msg;
}
else
{
if (msg.ToString()!.Contains(name)) // find better way
{
result = (T)msg;
found = true;
}
else
{
result = default!;
}
}
return found;
}
}
19 changes: 19 additions & 0 deletions samples/mqtt-grpc-device/Serializers/ReadOnlyPropertyProtobuff.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud;
using MQTTnet.Extensions.MultiCloud.Binders;
using MQTTnet.Extensions.MultiCloud.Serializers;

namespace mqtt_grpc_device.Serializers;

public class ReadOnlyPropertyProtobuff<T> : DeviceToCloudBinder<T>, IReadOnlyProperty<T>
{
public ReadOnlyPropertyProtobuff(IMqttClient mqttClient) : this(mqttClient, string.Empty) { }

public ReadOnlyPropertyProtobuff(IMqttClient mqttClient, string name)
: base(mqttClient, name, new ProtobufSerializer())
{
TopicPattern = "device/{clientId}/props";
WrapMessage = false;
Retain = true;
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud;
using MQTTnet.Extensions.MultiCloud.Binders;
using MQTTnet.Extensions.MultiCloud.Serializers;

namespace MQTTnet.Extensions.MultiCloud.AzureIoTClient;
namespace mqtt_grpc_device.Serializers;

public class TelemetryProtobuf<T> : DeviceToCloudBinder<T>, ITelemetry<T>
{

public TelemetryProtobuf(IMqttClient mqttClient) :
this(mqttClient, string.Empty)
{ }

public TelemetryProtobuf(IMqttClient mqttClient, string name)
: base(mqttClient, name, new ProtobufSerializer())
{
Expand Down
51 changes: 51 additions & 0 deletions samples/mqtt-grpc-device/Serializers/WritablePropertyProtobuff.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
using Google.Protobuf;
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud;
using MQTTnet.Extensions.MultiCloud.Binders;
using MQTTnet.Extensions.MultiCloud.BrokerIoTClient;
using System.Threading;
using System.Threading.Tasks;

namespace mqtt_grpc_device.Serializers
{
public class WritablePropertyProtobuff<T, TResp> : CloudToDeviceBinder<T, TResp>, IWritableProperty<T, TResp>, IDeviceToCloud<TResp>
{
readonly IMqttClient _connection;
readonly string _name;
public T? Value { get; set; } = default!;
public int? Version { get; set; } = -1;
public WritablePropertyProtobuff(IMqttClient connection, string name, MessageParser parser)
: base(connection, name, new ProtobufSerializer(parser))
{
_connection = connection;
_name = name;
SubscribeTopicPattern = "device/{clientId}/props/{name}/set";
RequestTopicPattern = "device/{clientId}/props/{name}/set";
ResponseTopicPattern = "device/{clientId}/props/{name}/ack";
RetainResponse = true;
PreProcessMessage = tp =>
{
Version = tp.Version;
};
}

public async Task SendMessageAsync(TResp payload, CancellationToken cancellationToken = default)
{
var prop = new ReadOnlyProperty<TResp>(_connection, _name)
{
TopicPattern = "device/{clientId}/props/{name}/ack",
WrapMessage = false
};
await prop.SendMessageAsync(payload, cancellationToken);
}

public async Task InitPropertyAsync(string intialState, TResp defaultValue, CancellationToken cancellationToken = default)
{
TResp payload = default!; //TODO use generic ACK for protos
await SendMessageAsync(payload, cancellationToken);

}


}
}
1 change: 1 addition & 0 deletions samples/mqtt-grpc-device/mqtt-grpc-device.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Google.Protobuf" Version="3.21.9" />
<PackageReference Include="Grpc.Tools" Version="2.50.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
3 changes: 2 additions & 1 deletion samples/mqtt-grpc-device/protos/mqtt_grpc_sample_device.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// <auto-generated />

using Google.Protobuf;
using mqtt_grpc_device.Serializers;
using mqtt_grpc_device_protos;
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud;
Expand Down Expand Up @@ -29,7 +30,7 @@ internal mqtt_grpc_sample_device(IMqttClient client)
Connection = client;
Props = new Properties();
AllProperties = new ReadOnlyPropertyProtobuff<Properties>(client, string.Empty);
AllTelemetries = new TelemetryProtobuff<Telemetries>(client, string.Empty);
AllTelemetries = new TelemetryProtobuf<Telemetries>(client, string.Empty);
Interval = new WritablePropertyProtobuff<Properties, ack>(client, "interval", Properties.Parser);
Echo = new CommandProtobuff<echoRequest, echoResponse>(client, "echo", echoRequest.Parser);
GetRuntimeStats = new CommandProtobuff<getRuntimeStatsRequest, getRuntimeStatsResponse>(client, "getRuntimeStats", getRuntimeStatsRequest.Parser);
Expand Down
1 change: 1 addition & 0 deletions samples/payload-size/AvroDeviceClient.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud;
using MQTTnet.Extensions.MultiCloud.BrokerIoTClient;
using payload_size.Binders;

namespace payload_size;

Expand Down
Loading