Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WProperty initializers #73

Merged
merged 43 commits into from
Oct 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
3e32063
add 2nd cmd to grpc
ridomin Sep 23, 2022
664c077
updating nugets
ridomin Oct 3, 2022
47a3f56
rm ns2 if debug
ridomin Oct 4, 2022
1540b57
rm dupe task timeout
ridomin Oct 4, 2022
3287b1b
do not install 3.1
ridomin Oct 4, 2022
f215b04
code analysis clean
ridomin Oct 5, 2022
e17fb6a
finishing code analysis
ridomin Oct 5, 2022
e17f4aa
clean mmemon csproj
ridomin Oct 5, 2022
cabe478
start memmon2
ridomin Oct 5, 2022
adc0cab
empty commands
ridomin Oct 6, 2022
2cd537d
empty commands
ridomin Oct 6, 2022
8394867
Add empty commands to hub binders
ridomin Oct 6, 2022
3e989e6
Add empty commands to hub binders
ridomin Oct 6, 2022
8732b10
added primes and malloc/free
ridomin Oct 6, 2022
49b5809
fixing merge
rido-min Oct 6, 2022
b1b23a8
tryDeserialize
ridomin Oct 7, 2022
6970a4c
Merge branch 'dev' of https://github.com/iotmodels/MQTTnet.Extensions…
ridomin Oct 7, 2022
fe71f66
clean retained
ridomin Oct 7, 2022
9811680
remove from bytes
ridomin Oct 7, 2022
11daa97
Merge branch 'dev' into feat/memmon2
ridomin Oct 10, 2022
d79e46e
start memmon2
ridomin Oct 5, 2022
574745e
added primes and malloc/free
ridomin Oct 6, 2022
ecfb638
clean retained
ridomin Oct 7, 2022
2e45a3a
remove from bytes
ridomin Oct 7, 2022
d4ce991
Merge branch 'dev' of https://github.com/iotmodels/MQTTnet.Extensions…
ridomin Oct 10, 2022
516d8b4
2nd telemetry
ridomin Oct 10, 2022
8dcff51
add 2nd telemetry
ridomin Oct 10, 2022
6c830fa
ref init props
ridomin Oct 10, 2022
3b1f19a
GetTwin discard sub
ridomin Oct 10, 2022
172fec2
hub requires # to subscribe to commands
ridomin Oct 10, 2022
b7a9222
Merge branch 'dev' into feat/initprops
ridomin Oct 10, 2022
ef5e3c6
ref init properties
ridomin Oct 10, 2022
ee89ce4
add managed memory
ridomin Oct 10, 2022
8879f43
right GC stats
ridomin Oct 10, 2022
941206d
exclude gettwin test
ridomin Oct 10, 2022
a9ca110
exclude gettwin test
ridomin Oct 10, 2022
6dae246
clean Twin Initializer
ridomin Oct 10, 2022
5557b3f
split subs in c2d binder
ridomin Oct 10, 2022
1b35b78
memmon factory as service
ridomin Oct 10, 2022
121caf1
upd model
ridomin Oct 11, 2022
05f6447
clean warnings
ridomin Oct 11, 2022
2cf8086
use pi-sense factory
ridomin Oct 11, 2022
22742fc
delete unused code
ridomin Oct 11, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions .github/workflows/push2nuget.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ jobs:
- name: Setup .NET
uses: actions/setup-dotnet@v2
with:
dotnet-version: |
6.0.x
3.1.x
dotnet-version: 6.0.x

- name: Restore dependencies
run: dotnet restore
Expand Down
4 changes: 2 additions & 2 deletions samples/iothub-sample/Device.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ public Device(ILogger<Device> logger, IConfiguration configuration)
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var connectionSettings = new ConnectionSettings(_configuration.GetConnectionString("cs"));
_logger.LogWarning($"Connecting to: {connectionSettings}");
_logger.LogWarning("Connecting to: {connectionSettings}", connectionSettings);

var client = new HubMqttClient(await HubDpsFactory.CreateFromConnectionSettingsAsync(connectionSettings, stoppingToken));

var v = await client.UpdateTwinAsync(new { started = DateTime.Now }, stoppingToken);
_logger.LogInformation($" Updated Twin to verison: {v} ");
_logger.LogInformation("Updated Twin to verison: {v}", v);
var twin = await client.GetTwinAsync(stoppingToken);
Console.WriteLine(twin);

Expand Down
10 changes: 4 additions & 6 deletions samples/memmon-protobuff/Device.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ public class Device : BackgroundService
private double telemetryWorkingSet = 0;
private const bool default_enabled = true;
private const int default_interval = 45;

private string lastDiscconectReason = string.Empty;


private MemmonClient client;
private ConnectionSettings connectionSettings;

Expand All @@ -44,7 +42,7 @@ public Device(ILogger<Device> logger, IConfiguration configuration, TelemetryCli
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var cs = new ConnectionSettings(_configuration.GetConnectionString("cs")) { ModelId = MemmonClient.ModelId };
_logger.LogWarning($"Connecting to..{cs}");
_logger.LogWarning("Connecting to..{cs}", cs);
var mqtt = await BrokerClientFactory.CreateFromConnectionSettingsAsync(cs, true, stoppingToken);
connectionSettings = cs;
mqtt.DisconnectedAsync += Connection_DisconnectedAsync;
Expand All @@ -63,7 +61,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
client.Props.Interval = default_interval;


await client.AllProperties.SendMessageAsync(client.Props);
await client.AllProperties.SendMessageAsync(client.Props, stoppingToken);

RefreshScreen(this);

Expand All @@ -88,7 +86,7 @@ private async Task Connection_DisconnectedAsync(MQTTnet.Client.MqttClientDisconn
_telemetryClient.TrackException(arg.Exception);
}

lastDiscconectReason = arg.ReasonString;
//lastDiscconectReason = arg.ReasonString;
reconnectCounter++;
await Task.Yield();
}
Expand Down
2 changes: 1 addition & 1 deletion samples/memmon-protobuff/_protos/memmon.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace _protos
internal class MemmonClient
{
internal const string ModelId = "rido.memmon";
public Properties Props = new Properties();
public Properties Props = new();
public IReadOnlyProperty<Properties> AllProperties { get; set; }
public IWritableProperty<Properties, ack> Property_interval { get; set; }
public IWritableProperty<Properties, ack> Property_enabled { get; set; }
Expand Down
2 changes: 1 addition & 1 deletion samples/memmon-protobuff/memmon-protobuff.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@


<ItemGroup>
<PackageReference Include="Google.Protobuf" Version="3.21.6" />
<PackageReference Include="Google.Protobuf" Version="3.21.7" />
<PackageReference Include="Grpc.Tools" Version="2.49.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
102 changes: 71 additions & 31 deletions samples/memmon/Device.cs
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
using dtmi_rido_pnp_memmon;
using dtmi_rido_memmon;
using Humanizer;
using Microsoft.ApplicationInsights;
using MQTTnet.Extensions.MultiCloud;
using MQTTnet.Extensions.MultiCloud.AzureIoTClient;
using MQTTnet.Extensions.MultiCloud.BrokerIoTClient;
using MQTTnet.Extensions.MultiCloud.Connections;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Text;

namespace memmon;

public class Device : BackgroundService
{
private readonly ILogger<Device> _logger;
private readonly IConfiguration _configuration;
private readonly TelemetryClient _telemetryClient;

private readonly Stopwatch clock = Stopwatch.StartNew();
Expand All @@ -23,65 +20,60 @@ public class Device : BackgroundService
private int reconnectCounter = 0;

private double telemetryWorkingSet = 0;
private double managedMemory = 0;
private const bool default_enabled = true;
private const int default_interval = 45;
private const int default_interval = 500;

private string lastDiscconectReason = string.Empty;

private Imemmon client;
private ConnectionSettings connectionSettings;
private readonly MemMonFactory memmonFactory;

private string infoVersion = string.Empty;

public Device(ILogger<Device> logger, IConfiguration configuration, TelemetryClient tc)
public Device(TelemetryClient tc, MemMonFactory clientFactory)
{
_logger = logger;
_configuration = configuration;
_telemetryClient = tc;
memmonFactory = clientFactory;
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var cs = new ConnectionSettings(_configuration.GetConnectionString("cs"));
_logger.LogWarning($"Connecting to..{cs}");
var memmonFactory = new MemMonFactory(_configuration);
client = await memmonFactory.CreateMemMonClientAsync(_configuration.GetConnectionString("cs"), stoppingToken);
client = await memmonFactory.CreateMemMonClientAsync("cs", stoppingToken);

client.Connection.DisconnectedAsync += Connection_DisconnectedAsync;

connectionSettings = MemMonFactory.connectionSettings;
_logger.LogWarning("Connected");

infoVersion = MemMonFactory.NuGetPackageVersion;

client.Property_enabled.OnMessage = Property_enabled_UpdateHandler;
client.Property_interval.OnMessage= Property_interval_UpdateHandler;
client.Command_getRuntimeStats.OnMessage= Command_getRuntimeStats_Handler;
client.Command_isPrime.OnMessage = Command_isPrime_Handler;
client.Command_malloc.OnMessage = Command_malloc_Hanlder;
client.Command_free.OnMessage = Command_free_Hanlder;

if (client is HubMqttClient)
{
await TwinInitializer.InitPropertyAsync(client.Connection, client.InitialState, client.Property_interval, "interval", default_interval);
await TwinInitializer.InitPropertyAsync(client.Connection, client.InitialState, client.Property_enabled, "enabled", default_enabled);
}
else
{
await PropertyInitializer.InitPropertyAsync(client.Property_interval, default_interval);
await PropertyInitializer.InitPropertyAsync(client.Property_enabled, default_enabled);
}
await client.Property_enabled.InitPropertyAsync(client.InitialState, default_enabled, stoppingToken);
await client.Property_interval.InitPropertyAsync(client.InitialState, default_interval, stoppingToken);


await client.Property_started.SendMessageAsync(DateTime.Now);
await client.Property_started.SendMessageAsync(DateTime.Now, stoppingToken);

RefreshScreen(this);

while (!stoppingToken.IsCancellationRequested)
{
if (client.Property_enabled.Value == true)
{
telemetryWorkingSet = Environment.WorkingSet;
telemetryWorkingSet = Environment.WorkingSet.Bytes().Megabytes;
managedMemory = GC.GetTotalMemory(true).Bytes().Megabytes;
await client.Telemetry_workingSet.SendMessageAsync(telemetryWorkingSet, stoppingToken);
await client.Telemetry_managedMemory.SendMessageAsync(managedMemory, stoppingToken);
telemetryCounter++;
_telemetryClient.TrackMetric("WorkingSet", telemetryWorkingSet);
_telemetryClient.TrackMetric("managedMemory", managedMemory);
}
await Task.Delay(client.Property_interval.Value * 1000, stoppingToken);
await Task.Delay(client.Property_interval.Value, stoppingToken);
}
}

Expand Down Expand Up @@ -150,6 +142,51 @@ private async Task<Ack<int>> Property_interval_UpdateHandler(int p)
return await Task.FromResult(ack);
}

private async Task<bool> Command_isPrime_Handler(int number)
{
commandCounter++;
IEnumerable<string> Multiples(int number)
{
return from n1 in Enumerable.Range(2, number / 2)
from n2 in Enumerable.Range(2, n1)
where n1 * n2 == number
select $"{n1} x {n2} => {number}";
}

bool result = Multiples(number).Any();
return await Task.FromResult(!result);

}

List<string> memory = new();
IntPtr memoryPtr = IntPtr.Zero;
private async Task<string> Command_malloc_Hanlder(int number)
{
commandCounter++;
for (int i = 0; i < number; i++)
{
memory.Add(i.ToOrdinalWords());
}

memoryPtr = Marshal.AllocHGlobal(number);
return await Task.FromResult(string.Empty);
}

private async Task<string> Command_free_Hanlder(string empty)
{
commandCounter++;
await _telemetryClient.FlushAsync(CancellationToken.None);
memory = new List<string>();
GC.Collect(2, GCCollectionMode.Forced, false);
if (memoryPtr != IntPtr.Zero)
{
Marshal.FreeHGlobal(memoryPtr);
memoryPtr = IntPtr.Zero;
}
return await Task.FromResult(string.Empty);
}


private async Task<Dictionary<string, string>> Command_getRuntimeStats_Handler(DiagnosticsMode req)
{
commandCounter++;
Expand Down Expand Up @@ -180,6 +217,8 @@ private async Task<Dictionary<string, string>> Command_getRuntimeStats_Handler(D
result.Add("telemetry: ", telemetryCounter.ToString());
result.Add("command: ", commandCounter.ToString());
result.Add("reconnects: ", reconnectCounter.ToString());
result.Add("workingSet", Environment.WorkingSet.Bytes().ToString());
result.Add("GC Memmory", GC.GetTotalAllocatedBytes().Bytes().ToString());
}
return await Task.FromResult(result);
}
Expand Down Expand Up @@ -212,7 +251,8 @@ string RenderData()
//AppendLineWithPadRight(sb, $"Twin send: {RidCounter.Current}");
AppendLineWithPadRight(sb, $"Command messages: {commandCounter}");
AppendLineWithPadRight(sb, " ");
AppendLineWithPadRight(sb, $"WorkingSet: {telemetryWorkingSet.Bytes()}");
AppendLineWithPadRight(sb, $"WorkingSet: {telemetryWorkingSet} MB");
AppendLineWithPadRight(sb, $"ManagedMemory: {managedMemory} MB");
AppendLineWithPadRight(sb, " ");
AppendLineWithPadRight(sb, $"Time Running: {TimeSpan.FromMilliseconds(clock.ElapsedMilliseconds).Humanize(3)}");
AppendLineWithPadRight(sb, $"ConnectionStatus: {client.Connection.IsConnected} [{lastDiscconectReason}]");
Expand Down
52 changes: 31 additions & 21 deletions samples/memmon/MemMonFactory.cs
Original file line number Diff line number Diff line change
@@ -1,31 +1,35 @@
using dtmi_rido_pnp_memmon;
using dtmi_rido_memmon;
using MQTTnet.Extensions.MultiCloud.AwsIoTClient;
using MQTTnet.Extensions.MultiCloud.AzureIoTClient;
using MQTTnet.Extensions.MultiCloud.BrokerIoTClient;
using MQTTnet.Extensions.MultiCloud.Connections;

namespace memmon;

internal class MemMonFactory
public class MemMonFactory
{
static string nugetPackageVersion = string.Empty;
public static string NuGetPackageVersion => nugetPackageVersion;
internal static string ComputeDeviceKey(string masterKey, string deviceId) =>
Convert.ToBase64String(new System.Security.Cryptography.HMACSHA256(Convert.FromBase64String(masterKey)).ComputeHash(System.Text.Encoding.UTF8.GetBytes(deviceId)));

IConfiguration _configuration;
readonly IConfiguration _configuration;
readonly ILogger<MemMonFactory> _logger;

internal static ConnectionSettings connectionSettings;

public MemMonFactory(IConfiguration configuration)
public MemMonFactory(IConfiguration configuration, ILogger<MemMonFactory> logger)
{
this._configuration = configuration;
_configuration = configuration;
_logger = logger;
}

public async Task<Imemmon> CreateMemMonClientAsync(string connectionString, CancellationToken cancellationToken = default)
public async Task<Imemmon> CreateMemMonClientAsync(string connectinStringName, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(connectionString, nameof(connectionString));
connectionSettings = new ConnectionSettings(_configuration.GetConnectionString("cs"));
Imemmon client;
string connectionString = _configuration.GetConnectionString(connectinStringName);
connectionSettings = new ConnectionSettings(connectionString);
_logger.LogWarning("Connecting to..{cs}", connectionSettings);
if (connectionString.Contains("IdScope") || connectionString.Contains("SharedAccessKey"))
{
if (connectionSettings.IdScope != null && _configuration["masterKey"] != null)
Expand All @@ -34,52 +38,58 @@ public async Task<Imemmon> CreateMemMonClientAsync(string connectionString, Canc
var masterKey = _configuration.GetValue<string>("masterKey");
var deviceKey = ComputeDeviceKey(masterKey, deviceId);
var newCs = $"IdScope={connectionSettings.IdScope};DeviceId={deviceId};SharedAccessKey={deviceKey};SasMinutes={connectionSettings.SasMinutes}";
return await CreateHubClientAsync(newCs, cancellationToken);
client = await CreateHubClientAsync(newCs, cancellationToken);
}
else
{
return await CreateHubClientAsync(connectionString, cancellationToken);
client = await CreateHubClientAsync(connectionString, cancellationToken);
}
}
else if (connectionSettings.HostName.Contains("amazonaws.com"))
{
return await CreateAwsClientAsync(connectionString, cancellationToken);
client = await CreateAwsClientAsync(connectionString, cancellationToken);
}
else if (connectionSettings.HostName.Contains("azure-devices.net"))
{
return await CreateHubClientAsync(connectionString, cancellationToken);
client = await CreateHubClientAsync(connectionString, cancellationToken);
}
else
{
return await CreateBrokerClientAsync(connectionString, cancellationToken);
client = await CreateBrokerClientAsync(connectionString, cancellationToken);
}

_logger.LogWarning("Connected");
return client;
}

static async Task<dtmi_rido_pnp_memmon.mqtt.memmon> CreateBrokerClientAsync(string connectionString, CancellationToken cancellationToken = default)
static async Task<dtmi_rido_memmon.mqtt.memmon> CreateBrokerClientAsync(string connectionString, CancellationToken cancellationToken = default)
{
var cs = new ConnectionSettings(connectionString) { ModelId = Imemmon.ModelId };
var mqtt = await BrokerClientFactory.CreateFromConnectionSettingsAsync(cs, true, cancellationToken);
connectionSettings = BrokerClientFactory.ComputedSettings;
var client = new dtmi_rido_pnp_memmon.mqtt.memmon(mqtt);
var client = new dtmi_rido_memmon.mqtt.memmon(mqtt)
{
InitialState = String.Empty
};
nugetPackageVersion = BrokerClientFactory.NuGetPackageVersion;
return client;
}

static async Task<dtmi_rido_pnp_memmon.hub.memmon> CreateHubClientAsync(string connectionString, CancellationToken cancellationToken = default)
static async Task<dtmi_rido_memmon.hub.memmon> CreateHubClientAsync(string connectionString, CancellationToken cancellationToken = default)
{
var cs = new ConnectionSettings(connectionString) { ModelId = Imemmon.ModelId };
var hub = await HubDpsFactory.CreateFromConnectionSettingsAsync(cs);
var hub = await HubDpsFactory.CreateFromConnectionSettingsAsync(cs, cancellationToken);
connectionSettings = HubDpsFactory.ComputedSettings;
var client = new dtmi_rido_pnp_memmon.hub.memmon(hub);
var client = new dtmi_rido_memmon.hub.memmon(hub);
nugetPackageVersion = HubDpsFactory.NuGetPackageVersion;
client.InitialState = await client.GetTwinAsync();
client.InitialState = await client.GetTwinAsync(cancellationToken);
return client;
}

static async Task<dtmi_rido_pnp_memmon.aws.memmon> CreateAwsClientAsync(string connectionString, CancellationToken cancellationToken = default)
static async Task<dtmi_rido_memmon.aws.memmon> CreateAwsClientAsync(string connectionString, CancellationToken cancellationToken = default)
{
var mqtt = await AwsClientFactory.CreateFromConnectionSettingsAsync(connectionString, cancellationToken);
var client = new dtmi_rido_pnp_memmon.aws.memmon(mqtt);
var client = new dtmi_rido_memmon.aws.memmon(mqtt);
nugetPackageVersion = AwsClientFactory.NuGetPackageVersion;
return client;
}
Expand Down
1 change: 1 addition & 0 deletions samples/memmon/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
.ConfigureServices(services =>
{
services.AddApplicationInsightsTelemetryWorkerService();
services.AddSingleton<MemMonFactory>();
services.AddHostedService<Device>();
})
.Build();
Expand Down
2 changes: 1 addition & 1 deletion samples/memmon/Properties/launchSettings.json.template
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
"mosquitto_tls": {
"commandName": "Project",
"environmentVariables": {
"ConnectionStrings__cs": "HostName=test.mosquitto.org;ClientId=memmon01;CaFile=test.mosquitto.org.crt"
"ConnectionStrings__cs": "HostName=test.mosquitto.org;ClientId=memmon01;CaFile=test.mosquitto.org.pem"
}
}
}
Expand Down
File renamed without changes.
Loading