Skip to content

Commit

Permalink
Review AWS Support (#77)
Browse files Browse the repository at this point in the history
* add compSettings to pi-sense sample

* upd trx in ci (#76)

* upd trx in ci

* upd trx to 2.2.1

* fix aws connect test

* Feat/aws (#75)

* add aws samples, and retries

* get update shadow

* revisit AWS impl

* review aws sample

* upd aws readme

Co-authored-by: ridomin <[email protected]>

* init WP from shadow

* shadow versioning

* rev v5

* clean warning

Co-authored-by: rido-min <[email protected]>
  • Loading branch information
ridomin and rido-min authored Oct 18, 2022
1 parent 0133048 commit 87e6a9a
Show file tree
Hide file tree
Showing 31 changed files with 442 additions and 188 deletions.
4 changes: 1 addition & 3 deletions .github/workflows/cd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ jobs:
- name: Setup .NET
uses: actions/setup-dotnet@v2
with:
dotnet-version: |
6.0.x
3.1.x
dotnet-version: 6.0.x

- name: Restore dependencies
run: dotnet restore
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ jobs:

- name: Process trx reports with default
if: always()
uses: im-open/process-dotnet-test-results@v2.1.3
uses: im-open/process-dotnet-test-results@v2.2.1
with:
github-token: ${{ secrets.GITHUB_TOKEN }}

1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
*.sln.docstates

**/launchSettings.json
.vscode/

# User-specific files (MonoDevelop/Xamarin Studio)
*.userprefs
Expand Down
8 changes: 8 additions & 0 deletions MQTTnet.Extensions.MultiCloud.sln
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "docs", "docs", "{22965CB2-1455-4938-9783-416B192B79D0}"
ProjectSection(SolutionItems) = preProject
docs\arch.png = docs\arch.png
docs\aws.md = docs\aws.md
docs\ConnectionSettings.md = docs\ConnectionSettings.md
docs\feat-matrix.md = docs\feat-matrix.md
docs\iotpnp-128.png = docs\iotpnp-128.png
Expand Down Expand Up @@ -63,6 +64,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "payload-size", "samples\pay
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "memmon-protobuff", "samples\memmon-protobuff\memmon-protobuff.csproj", "{20B75646-CBD2-4E72-8C56-22887A519FA3}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "aws-sample", "samples\aws-sample\aws-sample.csproj", "{713F4937-160C-4CA3-9F9B-91DD91E7F5AC}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -121,6 +124,10 @@ Global
{20B75646-CBD2-4E72-8C56-22887A519FA3}.Debug|Any CPU.Build.0 = Debug|Any CPU
{20B75646-CBD2-4E72-8C56-22887A519FA3}.Release|Any CPU.ActiveCfg = Release|Any CPU
{20B75646-CBD2-4E72-8C56-22887A519FA3}.Release|Any CPU.Build.0 = Release|Any CPU
{713F4937-160C-4CA3-9F9B-91DD91E7F5AC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{713F4937-160C-4CA3-9F9B-91DD91E7F5AC}.Debug|Any CPU.Build.0 = Debug|Any CPU
{713F4937-160C-4CA3-9F9B-91DD91E7F5AC}.Release|Any CPU.ActiveCfg = Release|Any CPU
{713F4937-160C-4CA3-9F9B-91DD91E7F5AC}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -135,6 +142,7 @@ Global
{AF019503-8813-4967-B858-0DDAE43C2073} = {F5E59EDE-6E77-484C-AB93-C4651F43B9A7}
{DF590535-2FDC-4A0A-9EE4-7C9BF818C7B4} = {F5E59EDE-6E77-484C-AB93-C4651F43B9A7}
{20B75646-CBD2-4E72-8C56-22887A519FA3} = {F5E59EDE-6E77-484C-AB93-C4651F43B9A7}
{713F4937-160C-4CA3-9F9B-91DD91E7F5AC} = {F5E59EDE-6E77-484C-AB93-C4651F43B9A7}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {09914DD8-50B2-48E3-B9AB-7764AE36AD6B}
Expand Down
23 changes: 23 additions & 0 deletions docs/aws.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Connecting to AWS IoT Core

AWS IoT Core requires X509 client certificates to connecto to the MQTT endpoint.

These certificates can be self-signed or CA signed.

Additionally you might need to create a Thing identity, or use a Provisioning template.

https://docs.aws.amazon.com/iot/latest/developerguide/single-thing-provisioning.html

https://docs.aws.amazon.com/iot/latest/developerguide/auto-register-device-cert.html

https://aws.amazon.com/blogs/iot/just-in-time-registration-of-device-certificates-on-aws-iot/

As with any other MQTT broker, you can use WithConnectionSettings including the X509Key

To support JIT, the first connection always fails and a retry is needed, in that case you can use the AwsClientFactory


## Shadows

To use shadows, you must configure a "thing", associate to a ceritifcate, and enable a classic shadow.

53 changes: 53 additions & 0 deletions samples/aws-sample/Device.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
using MQTTnet.Extensions.MultiCloud;
using MQTTnet.Extensions.MultiCloud.AwsIoTClient;
using MQTTnet.Extensions.MultiCloud.Connections;

namespace aws_sample
{
public class Device : BackgroundService
{
private readonly ILogger<Device> _logger;
private readonly IConfiguration _configuration;

public Device(ILogger<Device> logger, IConfiguration configuration)
{
_logger = logger;
_configuration = configuration;
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
ConnectionSettings cs = new (_configuration.GetConnectionString("cs"));
var mqtt = await AwsClientFactory.CreateFromConnectionSettingsAsync(cs, false, stoppingToken);
Console.WriteLine(mqtt.IsConnected);
Console.WriteLine(AwsClientFactory.ComputedSettings);
var client = new AwsMqttClient(mqtt);
var shadow = await client.GetShadowAsync(stoppingToken);
Console.WriteLine(shadow);

var res = await client.UpdateShadowAsync(new { myProp = "hello 123" }, stoppingToken);
Console.WriteLine(res);
shadow = await client.GetShadowAsync(stoppingToken);
Console.WriteLine(shadow.Contains("myProp"));

WritableProperty<string> wp = new(mqtt, "myWProp")
{
OnMessage = async m =>
{
Console.WriteLine(m);
return await Task.FromResult(new Ack<string> { Value = m });
}
};


await wp.InitPropertyAsync(shadow, "my default val", stoppingToken);


while (!stoppingToken.IsCancellationRequested)
{
_logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
await Task.Delay(1000, stoppingToken);
}
}
}
}
10 changes: 10 additions & 0 deletions samples/aws-sample/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using aws_sample;

IHost host = Host.CreateDefaultBuilder(args)
.ConfigureServices(services =>
{
services.AddHostedService<Device>();
})
.Build();

await host.RunAsync();
8 changes: 8 additions & 0 deletions samples/aws-sample/appsettings.Development.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.Hosting.Lifetime": "Information"
}
}
}
8 changes: 8 additions & 0 deletions samples/aws-sample/appsettings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.Hosting.Lifetime": "Information"
}
}
}
18 changes: 18 additions & 0 deletions samples/aws-sample/aws-sample.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<Project Sdk="Microsoft.NET.Sdk.Worker">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
<UserSecretsId>dotnet-aws_sample-7C0BD790-EA55-4969-A22F-6B86B301A627</UserSecretsId>
<RootNamespace>aws_sample</RootNamespace>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting" Version="6.0.1" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\MQTTnet.Extensions.MultiCloud.AwsIoTClient\MQTTnet.Extensions.MultiCloud.AwsIoTClient.csproj" />
</ItemGroup>
</Project>
5 changes: 3 additions & 2 deletions samples/memmon/Device.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,11 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
client.Command_malloc.OnMessage = Command_malloc_Hanlder;
client.Command_free.OnMessage = Command_free_Hanlder;

await client.Property_enabled.InitPropertyAsync(client.InitialState, default_enabled, stoppingToken);
await client.Property_started.SendMessageAsync(DateTime.Now, stoppingToken);

await client.Property_interval.InitPropertyAsync(client.InitialState, default_interval, stoppingToken);
await client.Property_enabled.InitPropertyAsync(client.InitialState, default_enabled, stoppingToken);

await client.Property_started.SendMessageAsync(DateTime.Now, stoppingToken);

RefreshScreen(this);

Expand Down
4 changes: 3 additions & 1 deletion samples/memmon/MemMonFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,11 @@ public async Task<Imemmon> CreateMemMonClientAsync(string connectinStringName, C

static async Task<dtmi_rido_memmon.aws.memmon> CreateAwsClientAsync(string connectionString, CancellationToken cancellationToken = default)
{
var mqtt = await AwsClientFactory.CreateFromConnectionSettingsAsync(connectionString, cancellationToken);
var mqtt = await AwsClientFactory.CreateFromConnectionSettingsAsync(connectionString, true, cancellationToken);
var client = new dtmi_rido_memmon.aws.memmon(mqtt);
connectionSettings = AwsClientFactory.ComputedSettings;
nugetPackageVersion = AwsClientFactory.NuGetPackageVersion;
client.InitialState = await client.GetShadowAsync(cancellationToken);
return client;
}
}
16 changes: 9 additions & 7 deletions samples/memmon/dtmi_rido_memmon-2.aws.g.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud;
using MQTTnet.Extensions.MultiCloud.AwsIoTClient;
using MQTTnet.Extensions.MultiCloud.AwsIoTClient.TopicBindings;
using MQTTnet.Extensions.MultiCloud.BrokerIoTClient;

namespace dtmi_rido_memmon.aws;

Expand All @@ -17,18 +15,22 @@ public class memmon : AwsMqttClient, Imemmon
public ITelemetry<double> Telemetry_managedMemory { get; set; }
public ICommand<DiagnosticsMode, Dictionary<string, string>> Command_getRuntimeStats { get; set; }

public string InitialState => String.Empty;
public string InitialState { get; set; }

public ICommand<int, bool> Command_isPrime { get; set; }
public ICommand<int> Command_malloc { get; set; }
public ICommand Command_free { get; set; }

internal memmon(IMqttClient c) : base(c, Imemmon.ModelId)
internal memmon(IMqttClient c) : base(c)
{
Property_started = new ReadOnlyProperty<DateTime>(c, "started");
Property_interval = new WritableProperty<int>(c, "interval");
Property_enabled = new AwsWritablePropertyUTFJson<bool>(c, "enabled");
Telemetry_workingSet = new Telemetry<double>(c, "workingSet");
Command_getRuntimeStats = new Command<DiagnosticsMode, Dictionary<string, string>>(c, "getRuntimeStats");
Property_enabled = new WritableProperty<bool>(c, "enabled");
Telemetry_workingSet = new MQTTnet.Extensions.MultiCloud.BrokerIoTClient.Telemetry<double>(c, "workingSet");
Telemetry_managedMemory = new MQTTnet.Extensions.MultiCloud.BrokerIoTClient.Telemetry<double>(c, "managedMemory");
Command_getRuntimeStats = new MQTTnet.Extensions.MultiCloud.BrokerIoTClient.Command<DiagnosticsMode, Dictionary<string, string>>(c, "getRuntimeStats");
Command_isPrime = new MQTTnet.Extensions.MultiCloud.BrokerIoTClient.Command<int, bool>(c, "isPrime");
Command_malloc = new MQTTnet.Extensions.MultiCloud.BrokerIoTClient.Command<int>(c, "malloc");
Command_free = new MQTTnet.Extensions.MultiCloud.BrokerIoTClient.Command(c, "free");
}
}
1 change: 1 addition & 0 deletions samples/pi-sense-device/SenseHatFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public async Task<Isensehat> CreateSenseHatClientAsync(string connectionStringNa
var cs = new ConnectionSettings(connectionString) { ModelId = Isensehat.ModelId };
var mqtt = await BrokerClientFactory.CreateFromConnectionSettingsAsync(cs, true, cancellationToken);
var client = new dtmi_rido_pnp_sensehat.mqtt.sensehat(mqtt);
computedSettings = BrokerClientFactory.ComputedSettings!;
nugetPackageVersion = BrokerClientFactory.NuGetPackageVersion;
return client;
}
Expand Down
File renamed without changes.
58 changes: 47 additions & 11 deletions src/MQTTnet.Extensions.MultiCloud.AwsIoTClient/AwsClientFactory.cs
Original file line number Diff line number Diff line change
@@ -1,28 +1,64 @@
using MQTTnet.Client;
using MQTTnet.Adapter;
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud.Connections;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace MQTTnet.Extensions.MultiCloud.AwsIoTClient
namespace MQTTnet.Extensions.MultiCloud.AwsIoTClient;

public static class AwsClientFactory
{
public static class AwsClientFactory
{
public static string NuGetPackageVersion => $"{ThisAssembly.AssemblyName} {ThisAssembly.NuGetPackageVersion}";
public static ConnectionSettings? ComputedSettings { get; private set; }
public static async Task<IMqttClient> CreateFromConnectionSettingsAsync(string connectinString, CancellationToken cancellationToken = default) =>
await CreateFromConnectionSettingsAsync(new ConnectionSettings(connectinString), cancellationToken);
public static string NuGetPackageVersion => $"{ThisAssembly.AssemblyName} {ThisAssembly.NuGetPackageVersion}";
public static ConnectionSettings? ComputedSettings { get; private set; }

public static async Task<IMqttClient> CreateFromConnectionSettingsAsync(ConnectionSettings cs, CancellationToken cancellationToken = default)
public static async Task<IMqttClient> CreateFromConnectionSettingsAsync(string connectinString, bool withBirth = false, CancellationToken cancellationToken = default) =>
await CreateFromConnectionSettingsAsync(new ConnectionSettings(connectinString), withBirth, cancellationToken);

public static async Task<IMqttClient> CreateFromConnectionSettingsAsync(ConnectionSettings cs, bool withBirth = false, CancellationToken cancellationToken = default)
{
MqttClient? mqtt = new MqttFactory().CreateMqttClient(MqttNetTraceLogger.CreateTraceLogger()) as MqttClient;
try
{
MqttClient? mqtt = new MqttFactory().CreateMqttClient(MqttNetTraceLogger.CreateTraceLogger()) as MqttClient;
var connAck = await mqtt!.ConnectAsync(new MqttClientOptionsBuilder().WithConnectionSettings(cs).Build(), cancellationToken);
if (connAck.ResultCode != MqttClientConnectResultCode.Success)
{
throw new ApplicationException($"Cannot connect to {cs}");
}
ComputedSettings = cs;
return mqtt;
}
catch (MqttConnectingFailedException ex)
{
if (ex.ResultCode == MqttClientConnectResultCode.UnspecifiedError
&& ex.InnerException!.Message == "The operation has timed out.")
{
var connAck = await mqtt!.ConnectAsync(new MqttClientOptionsBuilder().WithConnectionSettings(cs).Build(), cancellationToken);
if (connAck.ResultCode != MqttClientConnectResultCode.Success)
{
throw new ApplicationException($"Cannot connect to {cs}");
}
ComputedSettings = cs;
}
}

if (withBirth)
{
var birthPayload = new ShadowSerializer().ToBytes(
new BirthConvention.BirthMessage(BirthConvention.ConnectionStatus.online)
{
ModelId = cs.ModelId
});

var pubAck = await mqtt.PublishBinaryAsync(
BirthConvention.BirthTopic(mqtt!.Options.ClientId),
birthPayload,
Protocol.MqttQualityOfServiceLevel.AtLeastOnce, true, cancellationToken);
if (pubAck.ReasonCode != MqttClientPublishReasonCode.Success)
{
throw new ApplicationException($"Error publishing Birth {cs}");
}
}

return mqtt!;
}
}
41 changes: 14 additions & 27 deletions src/MQTTnet.Extensions.MultiCloud.AwsIoTClient/AwsMqttClient.cs
Original file line number Diff line number Diff line change
@@ -1,38 +1,25 @@
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud.AwsIoTClient.TopicBindings;
using MQTTnet.Extensions.MultiCloud.Connections;
using MQTTnet.Extensions.MultiCloud.Serializers;
using System.Threading;
using System.Threading.Tasks;

namespace MQTTnet.Extensions.MultiCloud.AwsIoTClient
{
public class AwsMqttClient
{
public IMqttClient Connection { get; private set; }
private readonly ShadowRequestResponseBinder getShadowBinder;

namespace MQTTnet.Extensions.MultiCloud.AwsIoTClient;

public AwsMqttClient(IMqttClient c, string modelId = "") //: base(c)
{
Connection = c;
var birthMsg =
new UTF8JsonSerializer().ToBytes(
new BirthConvention.BirthMessage(BirthConvention.ConnectionStatus.online)
{
ModelId = modelId
});
_ = Connection.PublishBinaryAsync(
BirthConvention.BirthTopic(Connection.Options.ClientId),
birthMsg,
Protocol.MqttQualityOfServiceLevel.AtLeastOnce, true);
public class AwsMqttClient
{
public IMqttClient Connection { get; private set; }
private readonly ShadowRequestResponseBinder getShadowBinder;

getShadowBinder = new ShadowRequestResponseBinder(c);
}

public Task<string> GetShadowAsync(CancellationToken cancellationToken = default) =>
getShadowBinder.GetShadowAsync(cancellationToken);
public Task<string> UpdateShadowAsync(object payload, CancellationToken cancellationToken = default) =>
getShadowBinder.UpdateShadowAsync(payload, cancellationToken);
public AwsMqttClient(IMqttClient c) //: base(c)
{
Connection = c;
getShadowBinder = new ShadowRequestResponseBinder(c);
}

public Task<string> GetShadowAsync(CancellationToken cancellationToken = default) =>
getShadowBinder.GetShadowAsync(cancellationToken);
public Task<string> UpdateShadowAsync(object payload, CancellationToken cancellationToken = default) =>
getShadowBinder.UpdateShadowAsync(payload, cancellationToken);
}
Loading

0 comments on commit 87e6a9a

Please sign in to comment.