Skip to content

Commit

Permalink
Feature Add Resilient Client
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisPulman committed Jan 22, 2025
1 parent 14456f2 commit 7762233
Show file tree
Hide file tree
Showing 26 changed files with 2,272 additions and 426 deletions.
27 changes: 26 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,32 @@
# MQTTnet.Rx.Client
A Reactive Client for MQTTnet Broker

## NOTE: ManagedClient support has currently been removed from the MQTTnet.Rx.Client library. We will look into the possibility of adding this functionality in the future. This is due to the fact that the ManagedClient is no longer included in the MQTTnet V5 library.
## NOTE: ManagedClient support has been removed from the MQTTnet.Rx.Client library. This is due to the fact that the ManagedClient is no longer included in the MQTTnet V5 library.

## We now have a Reactive implimentaion through IResilientClient that we aim to have feature parity with the ManagedClient.
## The ResilientClient is a wrapper around the MqttClient that will automatically reconnect to the broker if the connection is lost.

## Create a Resilient Mqtt Client to Publish an Observable stream
```csharp
Create.ResilientMqttClient()
.WithResilientClientOptions(a =>
a.WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
.WithClientOptions(c =>
c.WithTcpServer("localhost", 9000)))
.PublishMessage(_message)
.Subscribe(r => Console.WriteLine($"{r.ReasonCode} [{r.PacketIdentifier}]"));
```

## Create a Resilient Mqtt Client to Subscribe to a Topic
```csharp
Create.ResilientMqttClient()
.WithResilientClientOptions(a =>
a.WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
.WithClientOptions(c =>
c.WithTcpServer("localhost", 9000)))
.SubscribeToTopic("FromMilliseconds")
.Subscribe(r => Console.WriteLine($"{r.ReasonCode} [{r.ApplicationMessage.Topic}] value : {r.ApplicationMessage.ConvertPayloadToString()}"));
```

## Create a Mqtt Client to Publish an Observable stream
```csharp
Expand Down
2 changes: 1 addition & 1 deletion Version.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"$schema": "https://raw.githubusercontent.com/dotnet/Nerdbank.GitVersioning/master/src/NerdBank.GitVersioning/version.schema.json",
"version": "3.0",
"version": "3.1.0",
"publicReleaseRefSpec": [
"^refs/heads/master$",
"^refs/heads/main$"
Expand Down
68 changes: 34 additions & 34 deletions src/MQTTnet.Rx.Client.TestApp/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ private static void Main(string[] args)
{
var publishMenu = new ConsoleMenu(args, level: 1)
.Add("Publish Client", PublishClient)
////.Add("Publish Managed", PublishManagedClient)
.Add("Publish Resilient", PublishResilientClient)
.Add("Close", ConsoleMenu.Close)
.Configure(config =>
{
Expand All @@ -35,8 +35,8 @@ private static void Main(string[] args)
});
var subscribeMenu = new ConsoleMenu(args, level: 1)
.Add("Subscribe Client", SubscribeClient)
////.Add("Subscribe Managed Client", SubscribeManagedClient)
.Add("Discover Managed Client", DiscoverTopicsManagedClient)
.Add("Subscribe Resilient Client", SubscribeResilientClient)
.Add("Discover Resilient Client", DiscoverTopicsManagedClient)
.Add("Close", ConsoleMenu.Close)
.Configure(config =>
{
Expand All @@ -59,18 +59,18 @@ private static void Main(string[] args)
.Show();
}

////private static void PublishManagedClient()
////{
//// _disposables.Add(Create.ManagedMqttClient()
//// .WithManagedClientOptions(a =>
//// a.WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
//// .WithClientOptions(c =>
//// c.WithTcpServer("localhost", 9000)))
//// .PublishMessage(_message)
//// .Subscribe(r => Console.WriteLine($"{r.ApplicationMessage.Id} [{r.ApplicationMessage.ApplicationMessage.Topic}] value : {r.ApplicationMessage.ApplicationMessage.ConvertPayloadToString()}")));
//// StartMessages("managed/");
//// WaitForExit();
////}
private static void PublishResilientClient()
{
_disposables.Add(Create.ResilientMqttClient()
.WithResilientClientOptions(a =>
a.WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
.WithClientOptions(c =>
c.WithTcpServer("localhost", 9000)))
.PublishMessage(_message)
.Subscribe(r => Console.WriteLine($"{r.ApplicationMessage.Id} [{r.ApplicationMessage.ApplicationMessage?.Topic}] value : {r.ApplicationMessage.ApplicationMessage.ConvertPayloadToString()}")));
StartMessages("managed/");
WaitForExit();
}

private static void PublishClient()
{
Expand Down Expand Up @@ -98,25 +98,25 @@ private static void SubscribeClient()
WaitForExit();
}

////private static void SubscribeManagedClient()
////{
//// _disposables.Add(Create.ManagedMqttClient()
//// .WithManagedClientOptions(a =>
//// a.WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
//// .WithClientOptions(c =>
//// c.WithTcpServer("localhost", 9000)))
//// .SubscribeToTopic("+/FromMilliseconds")
//// .Do(r => Console.WriteLine($"{r.ReasonCode} [{r.ApplicationMessage.Topic}] value : {r.ApplicationMessage.ConvertPayloadToString()}"))
//// .ToDictionary()
//// .Subscribe(dict =>
//// {
//// foreach (var item in dict!)
//// {
//// Console.WriteLine($"key: {item.Key} value: {item.Value}");
//// }
//// }));
//// WaitForExit();
////}
private static void SubscribeResilientClient()
{
_disposables.Add(Create.ResilientMqttClient()
.WithResilientClientOptions(a =>
a.WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
.WithClientOptions(c =>
c.WithTcpServer("localhost", 9000)))
.SubscribeToTopic("+/FromMilliseconds")
.Do(r => Console.WriteLine($"{r.ReasonCode} [{r.ApplicationMessage.Topic}] value : {r.ApplicationMessage.ConvertPayloadToString()}"))
.ToDictionary()
.Subscribe(dict =>
{
foreach (var item in dict!)
{
Console.WriteLine($"key: {item.Key} value: {item.Value}");
}
}));
WaitForExit();
}

private static void DiscoverTopicsManagedClient()
{
Expand Down
170 changes: 95 additions & 75 deletions src/MQTTnet.Rx.Client/Create.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System.Reactive.Disposables;
using System.Reactive.Linq;
using MQTTnet.Rx.Client.ResilientClient.Internal;

namespace MQTTnet.Rx.Client;

Expand Down Expand Up @@ -48,28 +49,28 @@ public static IObservable<IMqttClient> MqttClient()
}).Retry();
}

/////// <summary>
/////// Manageds the MQTT client.
/////// </summary>
/////// <returns>A Managed Mqtt Client.</returns>
////public static IObservable<IManagedMqttClient> ManagedMqttClient()
////{
//// var mqttClient = MqttFactory.CreateManagedMqttClient();
//// var clientCount = 0;
//// return Observable.Create<IManagedMqttClient>(observer =>
//// {
//// observer.OnNext(mqttClient);
//// Interlocked.Increment(ref clientCount);
//// return Disposable.Create(() =>
//// {
//// Interlocked.Decrement(ref clientCount);
//// if (clientCount == 0)
//// {
//// mqttClient.Dispose();
//// }
//// });
//// }).Retry();
////}
/// <summary>
/// Resilient the MQTT client.
/// </summary>
/// <returns>A Resilient Mqtt Client.</returns>
public static IObservable<IResilientMqttClient> ResilientMqttClient()
{
var mqttClient = MqttFactory.CreateResilientMqttClient();
var clientCount = 0;
return Observable.Create<IResilientMqttClient>(observer =>
{
observer.OnNext(mqttClient);
Interlocked.Increment(ref clientCount);
return Disposable.Create(() =>
{
Interlocked.Decrement(ref clientCount);
if (clientCount == 0)
{
mqttClient.Dispose();
}
});
}).Retry();
}

/// <summary>
/// Withes the client options.
Expand Down Expand Up @@ -97,60 +98,79 @@ public static IObservable<IMqttClient> WithClientOptions(this IObservable<IMqttC
return disposable;
});

/////// <summary>
/////// Withes the managed client options.
/////// </summary>
/////// <param name="client">The client.</param>
/////// <param name="optionsBuilder">The options builder.</param>
/////// <returns>A Managed Mqtt Client.</returns>
////public static IObservable<IManagedMqttClient> WithManagedClientOptions(this IObservable<IManagedMqttClient> client, Action<ManagedMqttClientOptionsBuilder> optionsBuilder) =>
//// Observable.Create<IManagedMqttClient>(observer =>
//// {
//// var mqttClientOptions = MqttFactory.CreateManagedClientOptionsBuilder();
//// optionsBuilder(mqttClientOptions);
//// var disposable = new CompositeDisposable();
//// disposable.Add(client.Subscribe(c =>
//// {
//// if (c.IsStarted)
//// {
//// observer.OnNext(c);
//// }
//// else
//// {
//// disposable.Add(Observable.StartAsync(async () => await c.StartAsync(mqttClientOptions.Build())).Subscribe(_ => observer.OnNext(c)));
//// }
//// }));
//// return disposable;
//// });
/// <summary>
/// Withes the Resilient client options.
/// </summary>
/// <param name="client">The client.</param>
/// <param name="optionsBuilder">The options builder.</param>
/// <returns>A Resilient Mqtt Client.</returns>
public static IObservable<IResilientMqttClient> WithResilientClientOptions(this IObservable<IResilientMqttClient> client, Action<ResilientMqttClientOptionsBuilder> optionsBuilder) =>
Observable.Create<IResilientMqttClient>(observer =>
{
var mqttClientOptions = MqttFactory.CreateResilientClientOptionsBuilder();
optionsBuilder(mqttClientOptions);
var disposable = new CompositeDisposable();
disposable.Add(client.Subscribe(c =>
{
if (c.IsStarted)
{
observer.OnNext(c);
}
else
{
disposable.Add(Observable.StartAsync(async () => await c.StartAsync(mqttClientOptions.Build())).Subscribe(_ => observer.OnNext(c)));
}
}));
return disposable;
});

/// <summary>
/// Withes the client options.
/// </summary>
/// <param name="builder">The builder.</param>
/// <param name="clientBuilder">The client builder.</param>
/// <returns>A ManagedMqttClientOptionsBuilder.</returns>
/// <exception cref="ArgumentNullException">
/// builder
/// or
/// clientBuilder.
/// </exception>
public static ResilientMqttClientOptionsBuilder WithClientOptions(this ResilientMqttClientOptionsBuilder builder, Action<MqttClientOptionsBuilder> clientBuilder)
{
ArgumentNullException.ThrowIfNull(builder);
ArgumentNullException.ThrowIfNull(clientBuilder);

var optionsBuilder = MqttFactory.CreateClientOptionsBuilder();
clientBuilder(optionsBuilder);
builder.WithClientOptions(optionsBuilder);
return builder;
}

/// <summary>
/// Creates the client options builder.
/// </summary>
/// <param name="factory">The MqttFactory.</param>
/// <returns>A Resilient Mqtt Client Options Builder.</returns>
#pragma warning disable RCS1175 // Unused 'this' parameter.
public static ResilientMqttClientOptionsBuilder CreateResilientClientOptionsBuilder(this MqttClientFactory factory) => new();
#pragma warning restore RCS1175 // Unused 'this' parameter.

/////// <summary>
/////// Withes the client options.
/////// </summary>
/////// <param name="builder">The builder.</param>
/////// <param name="clientBuilder">The client builder.</param>
/////// <returns>A ManagedMqttClientOptionsBuilder.</returns>
/////// <exception cref="System.ArgumentNullException">
/////// builder
/////// or
/////// clientBuilder.
/////// </exception>
////public static ManagedMqttClientOptionsBuilder WithClientOptions(this ManagedMqttClientOptionsBuilder builder, Action<MqttClientOptionsBuilder> clientBuilder)
////{
//// builder.ThrowArgumentNullExceptionIfNull(nameof(builder));
//// clientBuilder.ThrowArgumentNullExceptionIfNull(nameof(clientBuilder));
/// <summary>
/// Creates the Resilient MQTT client.
/// </summary>
/// <param name="factory">The factory.</param>
/// <param name="mqttClient">The MQTT client.</param>
/// <returns>IResilientMqttClient.</returns>
/// <exception cref="ArgumentNullException">factory.</exception>
private static ResilientMqttClient CreateResilientMqttClient(this MqttClientFactory factory, IMqttClient? mqttClient = null)
{
ArgumentNullException.ThrowIfNull(factory);

//// var optionsBuilder = MqttFactory.CreateClientOptionsBuilder();
//// clientBuilder(optionsBuilder);
//// builder.WithClientOptions(optionsBuilder);
//// return builder;
////}
if (mqttClient == null)
{
return new ResilientMqttClient(factory.CreateMqttClient(), factory.DefaultLogger);
}

//// /// <summary>
//// /// Creates the client options builder.
//// /// </summary>
//// /// <param name="factory">The MqttFactory.</param>
//// /// <returns>A Managed Mqtt Client Options Builder.</returns>
////#pragma warning disable RCS1175 // Unused 'this' parameter.
//// public static ManagedMqttClientOptionsBuilder CreateManagedClientOptionsBuilder(this MqttClientFactory factory) => new();
////#pragma warning restore RCS1175 // Unused 'this' parameter.
return new ResilientMqttClient(mqttClient, factory.DefaultLogger);
}
}
4 changes: 0 additions & 4 deletions src/MQTTnet.Rx.Client/MQTTnet.Rx.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<Compile Remove="MqttManagedClientExtensions.cs" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="MQTTnet" Version="5.0.1.1416" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
Expand Down
Loading

0 comments on commit 7762233

Please sign in to comment.