Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RequestResponse Binder #97

Merged
merged 4 commits into from
Mar 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion samples/iothub-sample/Device.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ public Device(ILogger<Device> logger, IConfiguration configuration)

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var connectionSettings = new ConnectionSettings(_configuration.GetConnectionString("csme"));
var connectionSettings = new ConnectionSettings(_configuration.GetConnectionString("cs"));
_logger.LogWarning("Connecting to: {connectionSettings}", connectionSettings);

var client = new HubMqttClient(await HubDpsFactory.CreateFromConnectionSettingsAsync(connectionSettings, stoppingToken));
client.Connection.DisconnectedAsync += async d => await Task.Run(() => _logger.LogError("MQTT client disconnected {reason}", d.Reason));
var t0 = await client.GetTwinAsync(stoppingToken);
var v = await client.UpdateTwinAsync(new { started = DateTime.Now }, stoppingToken);
_logger.LogInformation("Updated Twin to verison: {v}", v);
var twin = await client.GetTwinAsync(stoppingToken);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud.Connections;
using MQTTnet.Extensions.MultiCloud.Serializers;
using System.Threading;
using System.Threading.Tasks;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud.Serializers;
using MQTTnet.Protocol;
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading;
Expand Down
18 changes: 18 additions & 0 deletions src/MQTTnet.Extensions.MultiCloud.AzureIoTClient/GetTwinBinder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud.Binders;
using System.Text.Json;

namespace MQTTnet.Extensions.MultiCloud.AzureIoTClient;

internal class GetTwinBinder : RequestResponseBinder<string, string>
{
internal int lastRid = 0;
public GetTwinBinder(IMqttClient client) : base(client, string.Empty, true)
{
var rid = RidCounter.NextValue();
lastRid = rid;
requestTopicPattern = $"$iothub/twin/GET/?$rid={rid}";
responseTopicSub = "$iothub/twin/res/#";
responseTopicSuccess = $"$iothub/twin/res/200/?$rid={rid}";
}
}
29 changes: 23 additions & 6 deletions src/MQTTnet.Extensions.MultiCloud.AzureIoTClient/HubMqttClient.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud.AzureIoTClient.Untyped;
using MQTTnet.Extensions.MultiCloud.Serializers;
using System.Text.Json;
using System.Text.Json.Nodes;

namespace MQTTnet.Extensions.MultiCloud.AzureIoTClient
Expand All @@ -10,17 +11,23 @@ public class HubMqttClient : IHubMqttClient
public IMqttClient Connection { get; set; }
public string InitialState { get; set; } = String.Empty;

private readonly TwinRequestResponseBinder twinOperationsBinder;
//private readonly TwinRequestResponseBinder twinOperationsBinder;

private readonly GetTwinBinder getTwinBinder;
private readonly UpdateTwinBinder<object> updateTwinBinder;

private readonly GenericDesiredUpdatePropertyBinder genericDesiredUpdateProperty;
private readonly GenericCommand command;

public HubMqttClient(IMqttClient c)
{
Connection = c;
twinOperationsBinder = new TwinRequestResponseBinder(c);
//updateTwinBinder = new UpdateTwinBinder(c);
//twinOperationsBinder = new TwinRequestResponseBinder(c);

getTwinBinder = new GetTwinBinder(c);
updateTwinBinder = new UpdateTwinBinder<object>(c);
command = new GenericCommand(c);
genericDesiredUpdateProperty = new GenericDesiredUpdatePropertyBinder(c, twinOperationsBinder!);
genericDesiredUpdateProperty = new GenericDesiredUpdatePropertyBinder(c, updateTwinBinder!);
}

public Func<GenericCommandRequest, GenericCommandResponse> OnCommandReceived
Expand All @@ -35,8 +42,18 @@ public Func<JsonNode, GenericPropertyAck> OnPropertyUpdateReceived
set => genericDesiredUpdateProperty.OnProperty_Updated = value;
}

public Task<string> GetTwinAsync(CancellationToken cancellationToken = default) => twinOperationsBinder.GetTwinAsync(cancellationToken);
public Task<int> UpdateTwinAsync(object payload, CancellationToken cancellationToken = default) => twinOperationsBinder.UpdateTwinAsync(payload, cancellationToken);
public async Task<string> GetTwinAsync(CancellationToken cancellationToken = default)
{
var twin = await getTwinBinder.InvokeAsync(Connection.Options.ClientId, string.Empty);
return twin!.ToString()!;
}

public async Task<int> UpdateTwinAsync(object payload, CancellationToken cancellationToken = default)
{
var twin = await updateTwinBinder.InvokeAsync(Connection.Options.ClientId, payload);
return twin;
}

public async Task<MqttClientPublishResult> SendTelemetryAsync(object payload, CancellationToken t = default)
{
string clientSegment = Connection.Options.ClientId;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ public class GenericDesiredUpdatePropertyBinder
{
private readonly IMqttClient connection;
public Func<JsonNode, GenericPropertyAck>? OnProperty_Updated = null;
public GenericDesiredUpdatePropertyBinder(IMqttClient c, TwinRequestResponseBinder updTwinBinder)
public GenericDesiredUpdatePropertyBinder(IMqttClient c, UpdateTwinBinder<object> updTwinBinder)
{
connection = c;
_ = connection.SubscribeWithReplyAsync("$iothub/twin/PATCH/properties/desired/#");
Expand All @@ -27,7 +27,7 @@ public GenericDesiredUpdatePropertyBinder(IMqttClient c, TwinRequestResponseBind
var ack = OnProperty_Updated(desired);
if (ack != null)
{
_ = updTwinBinder.UpdateTwinAsync(ack.BuildAck());
_ = updTwinBinder.InvokeAsync(connection.Options.ClientId, ack.BuildAck());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud.Binders;
using MQTTnet.Extensions.MultiCloud.Serializers;
using System.Text;
using System.Text.Json;
using System.Web;

namespace MQTTnet.Extensions.MultiCloud.AzureIoTClient;

public class UpdateTwinBinder<T> : RequestResponseBinder<T, int>
{
public UpdateTwinBinder(IMqttClient c) : base(c, string.Empty, true)
{
var rid = RidCounter.NextValue();
requestTopicPattern = $"$iothub/twin/PATCH/properties/reported/?$rid={rid}";
responseTopicSub = "$iothub/twin/res/#";
responseTopicSuccess = $"$iothub/twin/res/204/?$rid={rid}";
requireNotEmptyPayload = false;
VersionExtractor = topic =>
{
var segments = topic.Split('/');
int twinVersion = -1;
string rid = string.Empty;
if (topic.Contains('?'))
{
var qs = HttpUtility.ParseQueryString(segments[^1]);
if (int.TryParse(qs["$version"], out int v))
{
twinVersion = v;
}
}
return twinVersion;
};
}
}
15 changes: 15 additions & 0 deletions src/MQTTnet.Extensions.MultiCloud.BrokerIoTClient/CommandClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud.Binders;

namespace MQTTnet.Extensions.MultiCloud.BrokerIoTClient
{
public class CommandClient<T, TResp> : RequestResponseBinder<T, TResp>
{
public CommandClient(IMqttClient client, string commandName)
: base(client, commandName, false)
{
requestTopicPattern = "device/{clientId}/commands/{commandName}";
responseTopicSuccess = "device/{clientId}/commands/{commandName}/resp";
}
}
}
Loading