diff --git a/README.md b/README.md index c2bbda1..d4902f6 100644 --- a/README.md +++ b/README.md @@ -38,7 +38,32 @@ # MQTTnet.Rx.Client A Reactive Client for MQTTnet Broker -## NOTE: ManagedClient support has currently been removed from the MQTTnet.Rx.Client library. We will look into the possibility of adding this functionality in the future. This is due to the fact that the ManagedClient is no longer included in the MQTTnet V5 library. +## NOTE: ManagedClient support has been removed from the MQTTnet.Rx.Client library. This is due to the fact that the ManagedClient is no longer included in the MQTTnet V5 library. + +## We now have a Reactive implimentaion through IResilientClient that we aim to have feature parity with the ManagedClient. +## The ResilientClient is a wrapper around the MqttClient that will automatically reconnect to the broker if the connection is lost. + +## Create a Resilient Mqtt Client to Publish an Observable stream +```csharp +Create.ResilientMqttClient() + .WithResilientClientOptions(a => + a.WithAutoReconnectDelay(TimeSpan.FromSeconds(5)) + .WithClientOptions(c => + c.WithTcpServer("localhost", 9000))) + .PublishMessage(_message) + .Subscribe(r => Console.WriteLine($"{r.ReasonCode} [{r.PacketIdentifier}]")); +``` + +## Create a Resilient Mqtt Client to Subscribe to a Topic +```csharp +Create.ResilientMqttClient() + .WithResilientClientOptions(a => + a.WithAutoReconnectDelay(TimeSpan.FromSeconds(5)) + .WithClientOptions(c => + c.WithTcpServer("localhost", 9000))) + .SubscribeToTopic("FromMilliseconds") + .Subscribe(r => Console.WriteLine($"{r.ReasonCode} [{r.ApplicationMessage.Topic}] value : {r.ApplicationMessage.ConvertPayloadToString()}")); +``` ## Create a Mqtt Client to Publish an Observable stream ```csharp diff --git a/Version.json b/Version.json index 6f6e77f..9280037 100644 --- a/Version.json +++ b/Version.json @@ -1,6 +1,6 @@ { "$schema": "https://raw.githubusercontent.com/dotnet/Nerdbank.GitVersioning/master/src/NerdBank.GitVersioning/version.schema.json", - "version": "3.0", + "version": "3.1.0", "publicReleaseRefSpec": [ "^refs/heads/master$", "^refs/heads/main$" diff --git a/src/MQTTnet.Rx.Client.TestApp/Program.cs b/src/MQTTnet.Rx.Client.TestApp/Program.cs index 1bee113..f32399e 100644 --- a/src/MQTTnet.Rx.Client.TestApp/Program.cs +++ b/src/MQTTnet.Rx.Client.TestApp/Program.cs @@ -23,7 +23,7 @@ private static void Main(string[] args) { var publishMenu = new ConsoleMenu(args, level: 1) .Add("Publish Client", PublishClient) - ////.Add("Publish Managed", PublishManagedClient) + .Add("Publish Resilient", PublishResilientClient) .Add("Close", ConsoleMenu.Close) .Configure(config => { @@ -35,8 +35,8 @@ private static void Main(string[] args) }); var subscribeMenu = new ConsoleMenu(args, level: 1) .Add("Subscribe Client", SubscribeClient) - ////.Add("Subscribe Managed Client", SubscribeManagedClient) - .Add("Discover Managed Client", DiscoverTopicsManagedClient) + .Add("Subscribe Resilient Client", SubscribeResilientClient) + .Add("Discover Resilient Client", DiscoverTopicsManagedClient) .Add("Close", ConsoleMenu.Close) .Configure(config => { @@ -59,18 +59,18 @@ private static void Main(string[] args) .Show(); } - ////private static void PublishManagedClient() - ////{ - //// _disposables.Add(Create.ManagedMqttClient() - //// .WithManagedClientOptions(a => - //// a.WithAutoReconnectDelay(TimeSpan.FromSeconds(5)) - //// .WithClientOptions(c => - //// c.WithTcpServer("localhost", 9000))) - //// .PublishMessage(_message) - //// .Subscribe(r => Console.WriteLine($"{r.ApplicationMessage.Id} [{r.ApplicationMessage.ApplicationMessage.Topic}] value : {r.ApplicationMessage.ApplicationMessage.ConvertPayloadToString()}"))); - //// StartMessages("managed/"); - //// WaitForExit(); - ////} + private static void PublishResilientClient() + { + _disposables.Add(Create.ResilientMqttClient() + .WithResilientClientOptions(a => + a.WithAutoReconnectDelay(TimeSpan.FromSeconds(5)) + .WithClientOptions(c => + c.WithTcpServer("localhost", 9000))) + .PublishMessage(_message) + .Subscribe(r => Console.WriteLine($"{r.ApplicationMessage.Id} [{r.ApplicationMessage.ApplicationMessage?.Topic}] value : {r.ApplicationMessage.ApplicationMessage.ConvertPayloadToString()}"))); + StartMessages("managed/"); + WaitForExit(); + } private static void PublishClient() { @@ -98,25 +98,25 @@ private static void SubscribeClient() WaitForExit(); } - ////private static void SubscribeManagedClient() - ////{ - //// _disposables.Add(Create.ManagedMqttClient() - //// .WithManagedClientOptions(a => - //// a.WithAutoReconnectDelay(TimeSpan.FromSeconds(5)) - //// .WithClientOptions(c => - //// c.WithTcpServer("localhost", 9000))) - //// .SubscribeToTopic("+/FromMilliseconds") - //// .Do(r => Console.WriteLine($"{r.ReasonCode} [{r.ApplicationMessage.Topic}] value : {r.ApplicationMessage.ConvertPayloadToString()}")) - //// .ToDictionary() - //// .Subscribe(dict => - //// { - //// foreach (var item in dict!) - //// { - //// Console.WriteLine($"key: {item.Key} value: {item.Value}"); - //// } - //// })); - //// WaitForExit(); - ////} + private static void SubscribeResilientClient() + { + _disposables.Add(Create.ResilientMqttClient() + .WithResilientClientOptions(a => + a.WithAutoReconnectDelay(TimeSpan.FromSeconds(5)) + .WithClientOptions(c => + c.WithTcpServer("localhost", 9000))) + .SubscribeToTopic("+/FromMilliseconds") + .Do(r => Console.WriteLine($"{r.ReasonCode} [{r.ApplicationMessage.Topic}] value : {r.ApplicationMessage.ConvertPayloadToString()}")) + .ToDictionary() + .Subscribe(dict => + { + foreach (var item in dict!) + { + Console.WriteLine($"key: {item.Key} value: {item.Value}"); + } + })); + WaitForExit(); + } private static void DiscoverTopicsManagedClient() { diff --git a/src/MQTTnet.Rx.Client/Create.cs b/src/MQTTnet.Rx.Client/Create.cs index cba2f2f..c508347 100644 --- a/src/MQTTnet.Rx.Client/Create.cs +++ b/src/MQTTnet.Rx.Client/Create.cs @@ -3,6 +3,7 @@ using System.Reactive.Disposables; using System.Reactive.Linq; +using MQTTnet.Rx.Client.ResilientClient.Internal; namespace MQTTnet.Rx.Client; @@ -48,28 +49,28 @@ public static IObservable MqttClient() }).Retry(); } - /////// - /////// Manageds the MQTT client. - /////// - /////// A Managed Mqtt Client. - ////public static IObservable ManagedMqttClient() - ////{ - //// var mqttClient = MqttFactory.CreateManagedMqttClient(); - //// var clientCount = 0; - //// return Observable.Create(observer => - //// { - //// observer.OnNext(mqttClient); - //// Interlocked.Increment(ref clientCount); - //// return Disposable.Create(() => - //// { - //// Interlocked.Decrement(ref clientCount); - //// if (clientCount == 0) - //// { - //// mqttClient.Dispose(); - //// } - //// }); - //// }).Retry(); - ////} + /// + /// Resilient the MQTT client. + /// + /// A Resilient Mqtt Client. + public static IObservable ResilientMqttClient() + { + var mqttClient = MqttFactory.CreateResilientMqttClient(); + var clientCount = 0; + return Observable.Create(observer => + { + observer.OnNext(mqttClient); + Interlocked.Increment(ref clientCount); + return Disposable.Create(() => + { + Interlocked.Decrement(ref clientCount); + if (clientCount == 0) + { + mqttClient.Dispose(); + } + }); + }).Retry(); + } /// /// Withes the client options. @@ -97,60 +98,79 @@ public static IObservable WithClientOptions(this IObservable - /////// Withes the managed client options. - /////// - /////// The client. - /////// The options builder. - /////// A Managed Mqtt Client. - ////public static IObservable WithManagedClientOptions(this IObservable client, Action optionsBuilder) => - //// Observable.Create(observer => - //// { - //// var mqttClientOptions = MqttFactory.CreateManagedClientOptionsBuilder(); - //// optionsBuilder(mqttClientOptions); - //// var disposable = new CompositeDisposable(); - //// disposable.Add(client.Subscribe(c => - //// { - //// if (c.IsStarted) - //// { - //// observer.OnNext(c); - //// } - //// else - //// { - //// disposable.Add(Observable.StartAsync(async () => await c.StartAsync(mqttClientOptions.Build())).Subscribe(_ => observer.OnNext(c))); - //// } - //// })); - //// return disposable; - //// }); + /// + /// Withes the Resilient client options. + /// + /// The client. + /// The options builder. + /// A Resilient Mqtt Client. + public static IObservable WithResilientClientOptions(this IObservable client, Action optionsBuilder) => + Observable.Create(observer => + { + var mqttClientOptions = MqttFactory.CreateResilientClientOptionsBuilder(); + optionsBuilder(mqttClientOptions); + var disposable = new CompositeDisposable(); + disposable.Add(client.Subscribe(c => + { + if (c.IsStarted) + { + observer.OnNext(c); + } + else + { + disposable.Add(Observable.StartAsync(async () => await c.StartAsync(mqttClientOptions.Build())).Subscribe(_ => observer.OnNext(c))); + } + })); + return disposable; + }); + + /// + /// Withes the client options. + /// + /// The builder. + /// The client builder. + /// A ManagedMqttClientOptionsBuilder. + /// + /// builder + /// or + /// clientBuilder. + /// + public static ResilientMqttClientOptionsBuilder WithClientOptions(this ResilientMqttClientOptionsBuilder builder, Action clientBuilder) + { + ArgumentNullException.ThrowIfNull(builder); + ArgumentNullException.ThrowIfNull(clientBuilder); + + var optionsBuilder = MqttFactory.CreateClientOptionsBuilder(); + clientBuilder(optionsBuilder); + builder.WithClientOptions(optionsBuilder); + return builder; + } + + /// + /// Creates the client options builder. + /// + /// The MqttFactory. + /// A Resilient Mqtt Client Options Builder. +#pragma warning disable RCS1175 // Unused 'this' parameter. + public static ResilientMqttClientOptionsBuilder CreateResilientClientOptionsBuilder(this MqttClientFactory factory) => new(); +#pragma warning restore RCS1175 // Unused 'this' parameter. - /////// - /////// Withes the client options. - /////// - /////// The builder. - /////// The client builder. - /////// A ManagedMqttClientOptionsBuilder. - /////// - /////// builder - /////// or - /////// clientBuilder. - /////// - ////public static ManagedMqttClientOptionsBuilder WithClientOptions(this ManagedMqttClientOptionsBuilder builder, Action clientBuilder) - ////{ - //// builder.ThrowArgumentNullExceptionIfNull(nameof(builder)); - //// clientBuilder.ThrowArgumentNullExceptionIfNull(nameof(clientBuilder)); + /// + /// Creates the Resilient MQTT client. + /// + /// The factory. + /// The MQTT client. + /// IResilientMqttClient. + /// factory. + private static ResilientMqttClient CreateResilientMqttClient(this MqttClientFactory factory, IMqttClient? mqttClient = null) + { + ArgumentNullException.ThrowIfNull(factory); - //// var optionsBuilder = MqttFactory.CreateClientOptionsBuilder(); - //// clientBuilder(optionsBuilder); - //// builder.WithClientOptions(optionsBuilder); - //// return builder; - ////} + if (mqttClient == null) + { + return new ResilientMqttClient(factory.CreateMqttClient(), factory.DefaultLogger); + } -//// /// -//// /// Creates the client options builder. -//// /// -//// /// The MqttFactory. -//// /// A Managed Mqtt Client Options Builder. -////#pragma warning disable RCS1175 // Unused 'this' parameter. -//// public static ManagedMqttClientOptionsBuilder CreateManagedClientOptionsBuilder(this MqttClientFactory factory) => new(); -////#pragma warning restore RCS1175 // Unused 'this' parameter. + return new ResilientMqttClient(mqttClient, factory.DefaultLogger); + } } diff --git a/src/MQTTnet.Rx.Client/MQTTnet.Rx.Client.csproj b/src/MQTTnet.Rx.Client/MQTTnet.Rx.Client.csproj index 9795951..7c189e6 100644 --- a/src/MQTTnet.Rx.Client/MQTTnet.Rx.Client.csproj +++ b/src/MQTTnet.Rx.Client/MQTTnet.Rx.Client.csproj @@ -6,10 +6,6 @@ enable - - - - diff --git a/src/MQTTnet.Rx.Client/MqttManagedClientExtensions.cs b/src/MQTTnet.Rx.Client/MqttManagedClientExtensions.cs deleted file mode 100644 index f4a42ba..0000000 --- a/src/MQTTnet.Rx.Client/MqttManagedClientExtensions.cs +++ /dev/null @@ -1,93 +0,0 @@ -// Copyright (c) Chris Pulman. All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for full license information. - -using MQTTnet.Client; -using MQTTnet.Extensions.ManagedClient; - -namespace MQTTnet.Rx.Client; - -/// -/// Mqtt Managed Client Extensions. -/// -public static class MqttManagedClientExtensions -{ - /// - /// Application messages processed. - /// - /// The client. - /// A Application Message Processed Event Args. - public static IObservable ApplicationMessageProcessed(this IManagedMqttClient client) => - CreateObservable.FromAsyncEvent( - handler => client.ApplicationMessageProcessedAsync += handler, - handler => client.ApplicationMessageProcessedAsync -= handler); - - /// - /// Connected to the specified client. - /// - /// The client. - /// A Mqtt Client Connected Event Args. - public static IObservable Connected(this IManagedMqttClient client) => - CreateObservable.FromAsyncEvent( - handler => client.ConnectedAsync += handler, - handler => client.ConnectedAsync -= handler); - - /// - /// Disconnected from the specified client. - /// - /// The client. - /// A Mqtt Client Disconnected Event Args. - public static IObservable Disconnected(this IManagedMqttClient client) => - CreateObservable.FromAsyncEvent( - handler => client.DisconnectedAsync += handler, - handler => client.DisconnectedAsync -= handler); - - /// - /// Connecting failed. - /// - /// The client. - /// A Connecting Failed Event Args. - public static IObservable ConnectingFailed(this IManagedMqttClient client) => - CreateObservable.FromAsyncEvent( - handler => client.ConnectingFailedAsync += handler, - handler => client.ConnectingFailedAsync -= handler); - - /// - /// Connection state changed. - /// - /// The client. - /// Event Args. - public static IObservable ConnectionStateChanged(this IManagedMqttClient client) => - CreateObservable.FromAsyncEvent( - handler => client.ConnectionStateChangedAsync += handler, - handler => client.ConnectionStateChangedAsync -= handler); - - /// - /// Synchronizing subscriptions failed. - /// - /// The client. - /// A Managed Process Failed Event Args. - public static IObservable SynchronizingSubscriptionsFailed(this IManagedMqttClient client) => - CreateObservable.FromAsyncEvent( - handler => client.SynchronizingSubscriptionsFailedAsync += handler, - handler => client.SynchronizingSubscriptionsFailedAsync -= handler); - - /// - /// Application messages processed. - /// - /// The client. - /// A Application Message Skipped Event Args. - public static IObservable ApplicationMessageSkipped(this IManagedMqttClient client) => - CreateObservable.FromAsyncEvent( - handler => client.ApplicationMessageSkippedAsync += handler, - handler => client.ApplicationMessageSkippedAsync -= handler); - - /// - /// Application messages received. - /// - /// The client. - /// A Mqtt Application Message Received Event Args. - public static IObservable ApplicationMessageReceived(this IManagedMqttClient client) => - CreateObservable.FromAsyncEvent( - handler => client.ApplicationMessageReceivedAsync += handler, - handler => client.ApplicationMessageReceivedAsync -= handler); -} diff --git a/src/MQTTnet.Rx.Client/MqttdPublishExtensions.cs b/src/MQTTnet.Rx.Client/MqttdPublishExtensions.cs index dec3468..6bf1a58 100644 --- a/src/MQTTnet.Rx.Client/MqttdPublishExtensions.cs +++ b/src/MQTTnet.Rx.Client/MqttdPublishExtensions.cs @@ -51,87 +51,87 @@ public static IObservable PublishMessage(this IObservab return disposable; }).Retry(); - /////// - /////// Publishes the message. - /////// - /////// The client. - /////// The message. - /////// The qos. - /////// if set to true [retain]. - /////// A Mqtt Client Publish Result. - ////public static IObservable PublishMessage(this IObservable client, IObservable<(string topic, string payLoad)> message, MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.ExactlyOnce, bool retain = true) => - //// Observable.Create(observer => - //// { - //// var disposable = new CompositeDisposable(); - //// var setup = false; - //// disposable.Add(client.CombineLatest(message, (cli, mess) => (cli, mess)).Subscribe(async c => - //// { - //// if (!setup) - //// { - //// setup = true; - //// disposable.Add(c.cli.ApplicationMessageProcessed().Retry().Subscribe(args => observer.OnNext(args))); - //// } - - //// var applicationMessage = Create.MqttFactory.CreateApplicationMessageBuilder() - //// .WithTopic(c.mess.topic) - //// .WithPayload(c.mess.payLoad) - //// .WithQualityOfServiceLevel(qos) - //// .WithRetainFlag(retain) - //// .Build(); - - //// try - //// { - //// await c.cli.EnqueueAsync(applicationMessage); - //// } - //// catch (Exception ex) - //// { - //// observer.OnError(ex); - //// } - //// })); - - //// return disposable; - //// }).Retry(); - - /////// - /////// Publishes the message. - /////// - /////// The client. - /////// The message. - /////// The qos. - /////// if set to true [retain]. - /////// A Mqtt Client Publish Result. - ////public static IObservable PublishMessage(this IObservable client, IObservable<(string topic, byte[] payLoad)> message, MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.ExactlyOnce, bool retain = true) => - //// Observable.Create(observer => - //// { - //// var disposable = new CompositeDisposable(); - //// var setup = false; - //// disposable.Add(client.CombineLatest(message, (cli, mess) => (cli, mess)).Subscribe(async c => - //// { - //// if (!setup) - //// { - //// setup = true; - //// disposable.Add(c.cli.ApplicationMessageProcessed().Retry().Subscribe(args => observer.OnNext(args))); - //// } - - //// var applicationMessage = Create.MqttFactory.CreateApplicationMessageBuilder() - //// .WithTopic(c.mess.topic) - //// .WithPayload(c.mess.payLoad) - //// .WithQualityOfServiceLevel(qos) - //// .WithRetainFlag(retain) - //// .Build(); - - //// try - //// { - //// await c.cli.EnqueueAsync(applicationMessage); - //// } - //// catch (Exception ex) - //// { - //// observer.OnError(ex); - //// } - //// })); - - //// return disposable; - //// }).Retry(); + /// + /// Publishes the message. + /// + /// The client. + /// The message. + /// The qos. + /// if set to true [retain]. + /// A Mqtt Client Publish Result. + public static IObservable PublishMessage(this IObservable client, IObservable<(string topic, string payLoad)> message, MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.ExactlyOnce, bool retain = true) => + Observable.Create(observer => + { + var disposable = new CompositeDisposable(); + var setup = false; + disposable.Add(client.CombineLatest(message, (cli, mess) => (cli, mess)).Subscribe(async c => + { + if (!setup) + { + setup = true; + disposable.Add(c.cli.ApplicationMessageProcessed.Retry().Subscribe(args => observer.OnNext(args))); + } + + var applicationMessage = Create.MqttFactory.CreateApplicationMessageBuilder() + .WithTopic(c.mess.topic) + .WithPayload(c.mess.payLoad) + .WithQualityOfServiceLevel(qos) + .WithRetainFlag(retain) + .Build(); + + try + { + await c.cli.EnqueueAsync(applicationMessage); + } + catch (Exception ex) + { + observer.OnError(ex); + } + })); + + return disposable; + }).Retry(); + + /// + /// Publishes the message. + /// + /// The client. + /// The message. + /// The qos. + /// if set to true [retain]. + /// A Mqtt Client Publish Result. + public static IObservable PublishMessage(this IObservable client, IObservable<(string topic, byte[] payLoad)> message, MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.ExactlyOnce, bool retain = true) => + Observable.Create(observer => + { + var disposable = new CompositeDisposable(); + var setup = false; + disposable.Add(client.CombineLatest(message, (cli, mess) => (cli, mess)).Subscribe(async c => + { + if (!setup) + { + setup = true; + disposable.Add(c.cli.ApplicationMessageProcessed.Retry().Subscribe(args => observer.OnNext(args))); + } + + var applicationMessage = Create.MqttFactory.CreateApplicationMessageBuilder() + .WithTopic(c.mess.topic) + .WithPayload(c.mess.payLoad) + .WithQualityOfServiceLevel(qos) + .WithRetainFlag(retain) + .Build(); + + try + { + await c.cli.EnqueueAsync(applicationMessage); + } + catch (Exception ex) + { + observer.OnError(ex); + } + })); + + return disposable; + }).Retry(); /// /// Publishes the message. diff --git a/src/MQTTnet.Rx.Client/MqttdSubscribeExtensions.cs b/src/MQTTnet.Rx.Client/MqttdSubscribeExtensions.cs index 409bcf0..1cc47ec 100644 --- a/src/MQTTnet.Rx.Client/MqttdSubscribeExtensions.cs +++ b/src/MQTTnet.Rx.Client/MqttdSubscribeExtensions.cs @@ -14,7 +14,7 @@ namespace MQTTnet.Rx.Client; public static class MqttdSubscribeExtensions { private static readonly Dictionary> _dictJsonValues = []; - ////private static readonly Dictionary> _managedSubscribeToTopicClients = []; + private static readonly Dictionary> _managedSubscribeToTopicClients = []; private static readonly Dictionary> _unmanagedSubscribeToTopicClients = []; /// @@ -247,132 +247,132 @@ public static IObservable SubscribeToTo return disposable; }).Retry().Publish().RefCount(); - /////// - /////// Discovers the topics. - /////// - /////// The client. - /////// The topic expiry, topics are removed if they do not publish a value within this time. - /////// - /////// A List of topics. - /////// - ////public static IObservable> DiscoverTopics(this IObservable client, TimeSpan? topicExpiry = null) => - //// Observable.Create>(observer => - //// { - //// if (topicExpiry == null) - //// { - //// topicExpiry = TimeSpan.FromHours(1); - //// } - - //// if (topicExpiry.Value.TotalSeconds < 1) - //// { - //// throw new ArgumentOutOfRangeException(nameof(topicExpiry), "Topic expiry must be greater or equal to one."); - //// } - - //// var disposable = new CompositeDisposable(); - //// var semaphore = new SemaphoreSlim(1); - //// disposable.Add(semaphore); - //// var topics = new List<(string Topic, DateTime LastSeen)>(); - //// var cleanupTopics = false; - //// var lastCount = -1; - //// disposable.Add(client.SubscribeToTopic("#").Select(m => m.ApplicationMessage.Topic) - //// .Merge(Observable.Interval(TimeSpan.FromMinutes(1)).Select(_ => string.Empty)).Subscribe(topic => - //// { - //// semaphore.Wait(); - //// if (string.IsNullOrEmpty(topic)) - //// { - //// cleanupTopics = true; - //// } - //// else if (topics.Select(x => x.Topic).Contains(topic)) - //// { - //// topics.RemoveAll(x => x.Topic == topic); - //// topics.Add((topic, DateTime.UtcNow)); - //// } - //// else - //// { - //// topics.Add((topic, DateTime.UtcNow)); - //// } - - //// if (cleanupTopics || lastCount != topics.Count) - //// { - //// topics.RemoveAll(x => DateTime.UtcNow.Subtract(x.LastSeen) > topicExpiry); - //// lastCount = topics.Count; - //// cleanupTopics = false; - //// observer.OnNext(topics); - //// } - - //// semaphore.Release(); - //// })); - - //// return disposable; - //// }).Retry().Publish().RefCount(); - - /////// - /////// Subscribes to topic. - /////// - /////// The client. - /////// The topic. - /////// An Observable Mqtt Client Subscribe Result. - ////public static IObservable SubscribeToTopic(this IObservable client, string topic) => - //// Observable.Create(observer => - //// { - //// var disposable = new CompositeDisposable(); - //// IManagedMqttClient? mqttClient = null; - //// disposable.Add(client.Subscribe(async c => - //// { - //// mqttClient = c; - //// if (!_managedSubscribeToTopicClients.TryGetValue(mqttClient, out var value)) - //// { - //// value = new([(topic, 0)]); - //// _managedSubscribeToTopicClients.Add(mqttClient, value); - //// } - //// else if (!value.Any(x => x.topic == topic)) - //// { - //// value.Add((topic, 0)); - //// } - - //// var check = value.Find(x => x.topic == topic); - //// if (!EqualityComparer<(string topic, int count)>.Default.Equals(check, default)) - //// { - //// disposable.Add(mqttClient.ApplicationMessageReceived().WhereTopicIsMatch(topic).Subscribe(observer)); - //// check.count++; - //// if (check.count == 1) - //// { - //// var mqttSubscribeOptions = Create.MqttFactory.CreateTopicFilterBuilder() - //// .WithTopic(topic) - //// .Build(); - - //// await mqttClient.SubscribeAsync([mqttSubscribeOptions]); - //// } - //// } - //// })); - //// return Disposable.Create(async () => - //// { - //// try - //// { - //// if (mqttClient != null && _managedSubscribeToTopicClients.TryGetValue(mqttClient, out var value)) - //// { - //// var check = value.Find(x => x.topic == topic); - //// if (!EqualityComparer<(string topic, int count)>.Default.Equals(check, default)) - //// { - //// check.count--; - //// if (check.count == 0) - //// { - //// await mqttClient.UnsubscribeAsync([topic]).ConfigureAwait(false); - //// } - //// } - //// } - - //// disposable.Dispose(); - //// } - //// catch (ObjectDisposedException) - //// { - //// } - //// catch (Exception exception) - //// { - //// observer.OnError(exception); - //// } - //// }); - //// }).Retry().Publish().RefCount(); + /// + /// Discovers the topics. + /// + /// The client. + /// The topic expiry, topics are removed if they do not publish a value within this time. + /// + /// A List of topics. + /// + public static IObservable> DiscoverTopics(this IObservable client, TimeSpan? topicExpiry = null) => + Observable.Create>(observer => + { + if (topicExpiry == null) + { + topicExpiry = TimeSpan.FromHours(1); + } + + if (topicExpiry.Value.TotalSeconds < 1) + { + throw new ArgumentOutOfRangeException(nameof(topicExpiry), "Topic expiry must be greater or equal to one."); + } + + var disposable = new CompositeDisposable(); + var semaphore = new SemaphoreSlim(1); + disposable.Add(semaphore); + var topics = new List<(string Topic, DateTime LastSeen)>(); + var cleanupTopics = false; + var lastCount = -1; + disposable.Add(client.SubscribeToTopic("#").Select(m => m.ApplicationMessage.Topic) + .Merge(Observable.Interval(TimeSpan.FromMinutes(1)).Select(_ => string.Empty)).Subscribe(topic => + { + semaphore.Wait(); + if (string.IsNullOrEmpty(topic)) + { + cleanupTopics = true; + } + else if (topics.Select(x => x.Topic).Contains(topic)) + { + topics.RemoveAll(x => x.Topic == topic); + topics.Add((topic, DateTime.UtcNow)); + } + else + { + topics.Add((topic, DateTime.UtcNow)); + } + + if (cleanupTopics || lastCount != topics.Count) + { + topics.RemoveAll(x => DateTime.UtcNow.Subtract(x.LastSeen) > topicExpiry); + lastCount = topics.Count; + cleanupTopics = false; + observer.OnNext(topics); + } + + semaphore.Release(); + })); + + return disposable; + }).Retry().Publish().RefCount(); + + /// + /// Subscribes to topic. + /// + /// The client. + /// The topic. + /// An Observable Mqtt Client Subscribe Result. + public static IObservable SubscribeToTopic(this IObservable client, string topic) => + Observable.Create(observer => + { + var disposable = new CompositeDisposable(); + IResilientMqttClient? mqttClient = null; + disposable.Add(client.Subscribe(async c => + { + mqttClient = c; + if (!_managedSubscribeToTopicClients.TryGetValue(mqttClient, out var value)) + { + value = new([(topic, 0)]); + _managedSubscribeToTopicClients.Add(mqttClient, value); + } + else if (!value.Any(x => x.topic == topic)) + { + value.Add((topic, 0)); + } + + var check = value.Find(x => x.topic == topic); + if (!EqualityComparer<(string topic, int count)>.Default.Equals(check, default)) + { + disposable.Add(mqttClient.ApplicationMessageReceived.WhereTopicIsMatch(topic).Subscribe(observer)); + check.count++; + if (check.count == 1) + { + var mqttSubscribeOptions = Create.MqttFactory.CreateTopicFilterBuilder() + .WithTopic(topic) + .Build(); + + await mqttClient.SubscribeAsync([mqttSubscribeOptions]); + } + } + })); + return Disposable.Create(async () => + { + try + { + if (mqttClient != null && _managedSubscribeToTopicClients.TryGetValue(mqttClient, out var value)) + { + var check = value.Find(x => x.topic == topic); + if (!EqualityComparer<(string topic, int count)>.Default.Equals(check, default)) + { + check.count--; + if (check.count == 0) + { + await mqttClient.UnsubscribeAsync([topic]).ConfigureAwait(false); + } + } + } + + disposable.Dispose(); + } + catch (ObjectDisposedException) + { + } + catch (Exception exception) + { + observer.OnError(exception); + } + }); + }).Retry().Publish().RefCount(); /// /// Filters allowing only the topics which match the specified topic. diff --git a/src/MQTTnet.Rx.Client/ResilientClient/Domain/IResilientMqttClient.cs b/src/MQTTnet.Rx.Client/ResilientClient/Domain/IResilientMqttClient.cs new file mode 100644 index 0000000..64eb882 --- /dev/null +++ b/src/MQTTnet.Rx.Client/ResilientClient/Domain/IResilientMqttClient.cs @@ -0,0 +1,195 @@ +// Copyright (c) Chris Pulman. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using MQTTnet.Packets; + +namespace MQTTnet.Rx.Client; + +/// +/// IResilientMqttClient. +/// +/// +public interface IResilientMqttClient : IDisposable +{ + /// + /// Occurs when [application message processed asynchronous]. + /// + event Func ApplicationMessageProcessedAsync; + + /// + /// Occurs when [application message received asynchronous]. + /// + event Func ApplicationMessageReceivedAsync; + + /// + /// Occurs when [application message skipped asynchronous]. + /// + event Func ApplicationMessageSkippedAsync; + + /// + /// Occurs when [connected asynchronous]. + /// + event Func ConnectedAsync; + + /// + /// Occurs when [connecting failed asynchronous]. + /// + event Func ConnectingFailedAsync; + + /// + /// Occurs when [connection state changed asynchronous]. + /// + event Func ConnectionStateChangedAsync; + + /// + /// Occurs when [disconnected asynchronous]. + /// + event Func DisconnectedAsync; + + /// + /// Occurs when [synchronizing subscriptions failed asynchronous]. + /// + event Func SynchronizingSubscriptionsFailedAsync; + + /// + /// Occurs when [subscriptions changed asynchronous]. + /// + event Func SubscriptionsChangedAsync; + + /// + /// Gets application messages processed. + /// + /// A Application Message Processed Event Args. + IObservable ApplicationMessageProcessed { get; } + + /// + /// Gets connected to the specified client. + /// + /// A Mqtt Client Connected Event Args. + IObservable Connected { get; } + + /// + /// Gets disconnected from the specified client. + /// + /// A Mqtt Client Disconnected Event Args. + IObservable Disconnected { get; } + + /// + /// Gets connecting failed. + /// + /// A Connecting Failed Event Args. + IObservable ConnectingFailed { get; } + + /// + /// Gets connection state changed. + /// + /// Event Args. + IObservable ConnectionStateChanged { get; } + + /// + /// Gets synchronizing subscriptions failed. + /// + /// A Resilient Process Failed Event Args. + IObservable SynchronizingSubscriptionsFailed { get; } + + /// + /// Gets application messages processed. + /// + /// A Application Message Skipped Event Args. + IObservable ApplicationMessageSkipped { get; } + + /// + /// Gets application messages received. + /// + /// A Mqtt Application Message Received Event Args. + IObservable ApplicationMessageReceived { get; } + + /// + /// Gets the internal client. + /// + /// + /// The internal client. + /// + IMqttClient InternalClient { get; } + + /// + /// Gets a value indicating whether this instance is connected. + /// + /// + /// true if this instance is connected; otherwise, false. + /// + bool IsConnected { get; } + + /// + /// Gets a value indicating whether this instance is started. + /// + /// + /// true if this instance is started; otherwise, false. + /// + bool IsStarted { get; } + + /// + /// Gets the options. + /// + /// + /// The options. + /// + ResilientMqttClientOptions? Options { get; } + + /// + /// Gets the pending application messages count. + /// + /// + /// The pending application messages count. + /// + int PendingApplicationMessagesCount { get; } + + /// + /// Enqueues the asynchronous. + /// + /// The application message. + /// A representing the asynchronous operation. + Task EnqueueAsync(MqttApplicationMessage applicationMessage); + + /// + /// Enqueues the asynchronous. + /// + /// The application message. + /// A representing the asynchronous operation. + Task EnqueueAsync(ResilientMqttApplicationMessage applicationMessage); + + /// + /// Pings the asynchronous. + /// + /// The cancellation token. + /// A representing the asynchronous operation. + Task PingAsync(CancellationToken cancellationToken = default); + + /// + /// Starts the asynchronous. + /// + /// The options. + /// A representing the asynchronous operation. + Task StartAsync(ResilientMqttClientOptions options); + + /// + /// Stops the asynchronous. + /// + /// if set to true [clean disconnect]. + /// A representing the asynchronous operation. + Task StopAsync(bool cleanDisconnect = true); + + /// + /// Subscribes the asynchronous. + /// + /// The topic filters. + /// A representing the asynchronous operation. + Task SubscribeAsync(IEnumerable topicFilters); + + /// + /// Unsubscribes the asynchronous. + /// + /// The topics. + /// A representing the asynchronous operation. + Task UnsubscribeAsync(IEnumerable topics); +} diff --git a/src/MQTTnet.Rx.Client/ResilientClient/Domain/IResilientMqttClientStorage.cs b/src/MQTTnet.Rx.Client/ResilientClient/Domain/IResilientMqttClientStorage.cs new file mode 100644 index 0000000..b50211d --- /dev/null +++ b/src/MQTTnet.Rx.Client/ResilientClient/Domain/IResilientMqttClientStorage.cs @@ -0,0 +1,23 @@ +// Copyright (c) Chris Pulman. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace MQTTnet.Rx.Client; + +/// +/// IResilient Mqtt Client Storage. +/// +public interface IResilientMqttClientStorage +{ + /// + /// Saves the queued messages asynchronous. + /// + /// The messages. + /// A representing the asynchronous operation. + Task SaveQueuedMessagesAsync(IList messages); + + /// + /// Loads the queued messages asynchronous. + /// + /// A representing the asynchronous operation. + Task> LoadQueuedMessagesAsync(); +} diff --git a/src/MQTTnet.Rx.Client/ResilientClient/EventArgs/ApplicationMessageProcessedEventArgs.cs b/src/MQTTnet.Rx.Client/ResilientClient/EventArgs/ApplicationMessageProcessedEventArgs.cs new file mode 100644 index 0000000..51e9099 --- /dev/null +++ b/src/MQTTnet.Rx.Client/ResilientClient/EventArgs/ApplicationMessageProcessedEventArgs.cs @@ -0,0 +1,30 @@ +// Copyright (c) Chris Pulman. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace MQTTnet.Rx.Client; + +/// +/// Application Message Processed EventArgs. +/// +/// +/// +/// Initializes a new instance of the class. +/// +/// The application message. +/// The exception. +/// applicationMessage. +public sealed class ApplicationMessageProcessedEventArgs(ResilientMqttApplicationMessage applicationMessage, Exception? exception) : EventArgs +{ + /// + /// Gets the application message. + /// + /// + /// The application message. + /// + public ResilientMqttApplicationMessage ApplicationMessage { get; } = applicationMessage ?? throw new ArgumentNullException(nameof(applicationMessage)); + + /// + /// Gets then this is _null_ the message was processed successfully without any error. + /// + public Exception? Exception { get; } = exception; +} diff --git a/src/MQTTnet.Rx.Client/ResilientClient/EventArgs/ApplicationMessageSkippedEventArgs.cs b/src/MQTTnet.Rx.Client/ResilientClient/EventArgs/ApplicationMessageSkippedEventArgs.cs new file mode 100644 index 0000000..33db97f --- /dev/null +++ b/src/MQTTnet.Rx.Client/ResilientClient/EventArgs/ApplicationMessageSkippedEventArgs.cs @@ -0,0 +1,24 @@ +// Copyright (c) Chris Pulman. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace MQTTnet.Rx.Client; + +/// +/// Application Message Skipped EventArgs. +/// +/// +/// +/// Initializes a new instance of the class. +/// +/// The application message. +/// applicationMessage. +public sealed class ApplicationMessageSkippedEventArgs(ResilientMqttApplicationMessage applicationMessage) : EventArgs +{ + /// + /// Gets the application message. + /// + /// + /// The application message. + /// + public ResilientMqttApplicationMessage ApplicationMessage { get; } = applicationMessage ?? throw new ArgumentNullException(nameof(applicationMessage)); +} diff --git a/src/MQTTnet.Rx.Client/ResilientClient/EventArgs/ConnectingFailedEventArgs.cs b/src/MQTTnet.Rx.Client/ResilientClient/EventArgs/ConnectingFailedEventArgs.cs new file mode 100644 index 0000000..d6edbeb --- /dev/null +++ b/src/MQTTnet.Rx.Client/ResilientClient/EventArgs/ConnectingFailedEventArgs.cs @@ -0,0 +1,24 @@ +// Copyright (c) Chris Pulman. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace MQTTnet.Rx.Client; + +/// +/// Connecting Failed EventArgs. +/// +/// +public sealed class ConnectingFailedEventArgs(MqttClientConnectResult? connectResult, Exception exception) : EventArgs +{ + /// + /// Gets this is null when the connection was failing and the server was not reachable. + /// + public MqttClientConnectResult? ConnectResult { get; } = connectResult; + + /// + /// Gets the exception. + /// + /// + /// The exception. + /// + public Exception Exception { get; } = exception; +} diff --git a/src/MQTTnet.Rx.Client/ResilientClient/EventArgs/InterceptingPublishMessageEventArgs.cs b/src/MQTTnet.Rx.Client/ResilientClient/EventArgs/InterceptingPublishMessageEventArgs.cs new file mode 100644 index 0000000..9c91006 --- /dev/null +++ b/src/MQTTnet.Rx.Client/ResilientClient/EventArgs/InterceptingPublishMessageEventArgs.cs @@ -0,0 +1,27 @@ +// Copyright (c) Chris Pulman. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace MQTTnet.Rx.Client; + +/// +/// Intercepting Publish Message EventArgs. +/// +/// +public sealed class InterceptingPublishMessageEventArgs(ResilientMqttApplicationMessage applicationMessage) : EventArgs +{ + /// + /// Gets the application message. + /// + /// + /// The application message. + /// + public ResilientMqttApplicationMessage ApplicationMessage { get; } = applicationMessage ?? throw new ArgumentNullException(nameof(applicationMessage)); + + /// + /// Gets or sets a value indicating whether [accept publish]. + /// + /// + /// true if [accept publish]; otherwise, false. + /// + public bool AcceptPublish { get; set; } = true; +} diff --git a/src/MQTTnet.Rx.Client/ResilientClient/EventArgs/ResilientProcessFailedEventArgs.cs b/src/MQTTnet.Rx.Client/ResilientClient/EventArgs/ResilientProcessFailedEventArgs.cs new file mode 100644 index 0000000..88dd50c --- /dev/null +++ b/src/MQTTnet.Rx.Client/ResilientClient/EventArgs/ResilientProcessFailedEventArgs.cs @@ -0,0 +1,67 @@ +// Copyright (c) Chris Pulman. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using MQTTnet.Packets; + +namespace MQTTnet.Rx.Client; + +/// +/// Resilient Process Failed EventArgs. +/// +/// +public class ResilientProcessFailedEventArgs : EventArgs +{ + /// + /// Initializes a new instance of the class. + /// + /// The exception. + /// The added subscriptions. + /// The removed subscriptions. + /// exception. + public ResilientProcessFailedEventArgs(Exception exception, List? addedSubscriptions, List? removedSubscriptions) + { + Exception = exception ?? throw new ArgumentNullException(nameof(exception)); + + if (addedSubscriptions != null) + { + AddedSubscriptions = new List(addedSubscriptions.Select(item => item.Topic)); + } + else + { + AddedSubscriptions = []; + } + + if (removedSubscriptions != null) + { + RemovedSubscriptions = new List(removedSubscriptions); + } + else + { + RemovedSubscriptions = []; + } + } + + /// + /// Gets the exception. + /// + /// + /// The exception. + /// + public Exception Exception { get; } + + /// + /// Gets the added subscriptions. + /// + /// + /// The added subscriptions. + /// + public List AddedSubscriptions { get; } + + /// + /// Gets the removed subscriptions. + /// + /// + /// The removed subscriptions. + /// + public List RemovedSubscriptions { get; } +} diff --git a/src/MQTTnet.Rx.Client/ResilientClient/EventArgs/SubscriptionsChangedEventArgs.cs b/src/MQTTnet.Rx.Client/ResilientClient/EventArgs/SubscriptionsChangedEventArgs.cs new file mode 100644 index 0000000..d9facd4 --- /dev/null +++ b/src/MQTTnet.Rx.Client/ResilientClient/EventArgs/SubscriptionsChangedEventArgs.cs @@ -0,0 +1,37 @@ +// Copyright (c) Chris Pulman. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace MQTTnet.Rx.Client; + +/// +/// Subscriptions Changed EventArgs. +/// +/// +/// +/// Initializes a new instance of the class. +/// +/// The subscribe result. +/// The unsubscribe result. +/// +/// subscribeResult +/// or +/// unsubscribeResult. +/// +public sealed class SubscriptionsChangedEventArgs(List subscribeResult, List unsubscribeResult) : EventArgs +{ + /// + /// Gets the subscribe result. + /// + /// + /// The subscribe result. + /// + public List SubscribeResult { get; } = subscribeResult ?? throw new ArgumentNullException(nameof(subscribeResult)); + + /// + /// Gets the unsubscribe result. + /// + /// + /// The unsubscribe result. + /// + public List UnsubscribeResult { get; } = unsubscribeResult ?? throw new ArgumentNullException(nameof(unsubscribeResult)); +} diff --git a/src/MQTTnet.Rx.Client/ResilientClient/Internal/ResilientMqttApplicationMessageBuilder.cs b/src/MQTTnet.Rx.Client/ResilientClient/Internal/ResilientMqttApplicationMessageBuilder.cs new file mode 100644 index 0000000..9914959 --- /dev/null +++ b/src/MQTTnet.Rx.Client/ResilientClient/Internal/ResilientMqttApplicationMessageBuilder.cs @@ -0,0 +1,71 @@ +// Copyright (c) Chris Pulman. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace MQTTnet.Rx.Client.ResilientClient.Internal; + +/// +/// Resilient Mqtt Application Message Builder. +/// +internal class ResilientMqttApplicationMessageBuilder +{ + private Guid _id = Guid.NewGuid(); + private MqttApplicationMessage? _applicationMessage; + + /// + /// Withes the identifier. + /// + /// The identifier. + /// Resilient Mqtt Application Message Builder. + public ResilientMqttApplicationMessageBuilder WithId(in Guid id) + { + _id = id; + return this; + } + + /// + /// Withes the application message. + /// + /// The application message. + /// Resilient Mqtt Application Message Builder. + public ResilientMqttApplicationMessageBuilder WithApplicationMessage(MqttApplicationMessage applicationMessage) + { + _applicationMessage = applicationMessage; + return this; + } + + /// + /// Withes the application message. + /// + /// The builder. + /// Resilient Mqtt Application Message Builder. + /// builder. + public ResilientMqttApplicationMessageBuilder WithApplicationMessage(Action builder) + { + ArgumentNullException.ThrowIfNull(builder); + + var internalBuilder = new MqttApplicationMessageBuilder(); + builder(internalBuilder); + + _applicationMessage = internalBuilder.Build(); + return this; + } + + /// + /// Builds this instance. + /// + /// Resilient Mqtt Application Message. + /// The ApplicationMessage cannot be null. + public ResilientMqttApplicationMessage Build() + { + if (_applicationMessage == null) + { + throw new InvalidOperationException("The ApplicationMessage cannot be null."); + } + + return new() + { + Id = _id, + ApplicationMessage = _applicationMessage + }; + } +} diff --git a/src/MQTTnet.Rx.Client/ResilientClient/Internal/ResilientMqttClient.cs b/src/MQTTnet.Rx.Client/ResilientClient/Internal/ResilientMqttClient.cs new file mode 100644 index 0000000..0836ed4 --- /dev/null +++ b/src/MQTTnet.Rx.Client/ResilientClient/Internal/ResilientMqttClient.cs @@ -0,0 +1,976 @@ +// Copyright (c) Chris Pulman. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using MQTTnet.Diagnostics.Logger; +using MQTTnet.Exceptions; +using MQTTnet.Internal; +using MQTTnet.Packets; +using MQTTnet.Protocol; + +namespace MQTTnet.Rx.Client.ResilientClient.Internal; + +/// +/// Resilient Mqtt Client. +/// +/// +/// +internal sealed class ResilientMqttClient : Disposable, IResilientMqttClient +{ + private readonly MqttNetSourceLogger _logger; + + private readonly AsyncEvent _interceptingPublishMessageEvent = new(); + private readonly AsyncEvent _applicationMessageProcessedEvent = new(); + private readonly AsyncEvent _applicationMessageSkippedEvent = new(); + private readonly AsyncEvent _connectingFailedEvent = new(); + private readonly AsyncEvent _connectionStateChangedEvent = new(); + private readonly AsyncEvent _synchronizingSubscriptionsFailedEvent = new(); + private readonly AsyncEvent _subscriptionsChangedEvent = new(); + + private readonly BlockingQueue _messageQueue = new(); + private readonly AsyncLock _messageQueueLock = new(); + private readonly Dictionary _reconnectSubscriptions = []; + + private readonly Dictionary _subscriptions = []; + private readonly SemaphoreSlim _subscriptionsQueuedSignal = new(0); + private readonly HashSet _unsubscriptions = []; + + private CancellationTokenSource? _connectionCancellationToken; + private Task? _maintainConnectionTask; + private CancellationTokenSource? _publishingCancellationToken; + + private ResilientMqttClientStorageManager? _storageManager; + private bool _isCleanDisconnect; + + /// + /// Initializes a new instance of the class. + /// + /// The MQTT client. + /// The logger. + /// + /// mqttClient + /// or + /// logger. + /// + public ResilientMqttClient(IMqttClient mqttClient, IMqttNetLogger logger) + { + InternalClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient)); + + ArgumentNullException.ThrowIfNull(logger); + + _logger = logger.WithSource(nameof(ResilientMqttClient)); + } + + /// + /// Occurs when [application message skipped asynchronous]. + /// + public event Func ApplicationMessageSkippedAsync + { + add => _applicationMessageSkippedEvent.AddHandler(value); + remove => _applicationMessageSkippedEvent.RemoveHandler(value); + } + + /// + /// Occurs when [application message processed asynchronous]. + /// + public event Func ApplicationMessageProcessedAsync + { + add => _applicationMessageProcessedEvent.AddHandler(value); + remove => _applicationMessageProcessedEvent.RemoveHandler(value); + } + + /// + /// Occurs when [intercept publish message asynchronous]. + /// + public event Func InterceptPublishMessageAsync + { + add => _interceptingPublishMessageEvent.AddHandler(value); + remove => _interceptingPublishMessageEvent.RemoveHandler(value); + } + + /// + /// Occurs when [application message received asynchronous]. + /// + public event Func ApplicationMessageReceivedAsync + { + add => InternalClient.ApplicationMessageReceivedAsync += value; + remove => InternalClient.ApplicationMessageReceivedAsync -= value; + } + + /// + /// Occurs when [connected asynchronous]. + /// + public event Func ConnectedAsync + { + add => InternalClient.ConnectedAsync += value; + remove => InternalClient.ConnectedAsync -= value; + } + + /// + /// Occurs when [connecting failed asynchronous]. + /// + public event Func ConnectingFailedAsync + { + add => _connectingFailedEvent.AddHandler(value); + remove => _connectingFailedEvent.RemoveHandler(value); + } + + /// + /// Occurs when [connection state changed asynchronous]. + /// + public event Func ConnectionStateChangedAsync + { + add => _connectionStateChangedEvent.AddHandler(value); + remove => _connectionStateChangedEvent.RemoveHandler(value); + } + + /// + /// Occurs when [disconnected asynchronous]. + /// + public event Func DisconnectedAsync + { + add => InternalClient.DisconnectedAsync += value; + remove => InternalClient.DisconnectedAsync -= value; + } + + /// + /// Occurs when [synchronizing subscriptions failed asynchronous]. + /// + public event Func SynchronizingSubscriptionsFailedAsync + { + add => _synchronizingSubscriptionsFailedEvent.AddHandler(value); + remove => _synchronizingSubscriptionsFailedEvent.RemoveHandler(value); + } + + /// + /// Occurs when [subscriptions changed asynchronous]. + /// + public event Func SubscriptionsChangedAsync + { + add => _subscriptionsChangedEvent.AddHandler(value); + remove => _subscriptionsChangedEvent.RemoveHandler(value); + } + + /// + /// Gets application messages processed. + /// + /// A Application Message Processed Event Args. + public IObservable ApplicationMessageProcessed => + CreateObservable.FromAsyncEvent( + handler => ApplicationMessageProcessedAsync += handler, + handler => ApplicationMessageProcessedAsync -= handler); + + /// + /// Gets connected to the specified client. + /// + /// A Mqtt Client Connected Event Args. + public IObservable Connected => + CreateObservable.FromAsyncEvent( + handler => ConnectedAsync += handler, + handler => ConnectedAsync -= handler); + + /// + /// Gets disconnected from the specified client. + /// + /// A Mqtt Client Disconnected Event Args. + public IObservable Disconnected => + CreateObservable.FromAsyncEvent( + handler => DisconnectedAsync += handler, + handler => DisconnectedAsync -= handler); + + /// + /// Gets connecting failed. + /// + /// A Connecting Failed Event Args. + public IObservable ConnectingFailed => + CreateObservable.FromAsyncEvent( + handler => ConnectingFailedAsync += handler, + handler => ConnectingFailedAsync -= handler); + + /// + /// Gets connection state changed. + /// + /// Event Args. + public IObservable ConnectionStateChanged => + CreateObservable.FromAsyncEvent( + handler => ConnectionStateChangedAsync += handler, + handler => ConnectionStateChangedAsync -= handler); + + /// + /// Gets synchronizing subscriptions failed. + /// + /// A Resilient Process Failed Event Args. + public IObservable SynchronizingSubscriptionsFailed => + CreateObservable.FromAsyncEvent( + handler => SynchronizingSubscriptionsFailedAsync += handler, + handler => SynchronizingSubscriptionsFailedAsync -= handler); + + /// + /// Gets application messages processed. + /// + /// A Application Message Skipped Event Args. + public IObservable ApplicationMessageSkipped => + CreateObservable.FromAsyncEvent( + handler => ApplicationMessageSkippedAsync += handler, + handler => ApplicationMessageSkippedAsync -= handler); + + /// + /// Gets application messages received. + /// + /// A Mqtt Application Message Received Event Args. + public IObservable ApplicationMessageReceived => + CreateObservable.FromAsyncEvent( + handler => ApplicationMessageReceivedAsync += handler, + handler => ApplicationMessageReceivedAsync -= handler); + + /// + /// Gets the internal client. + /// + /// + /// The internal client. + /// + public IMqttClient InternalClient { get; } + + /// + /// Gets a value indicating whether this instance is connected. + /// + /// + /// true if this instance is connected; otherwise, false. + /// + public bool IsConnected => InternalClient.IsConnected; + + /// + /// Gets a value indicating whether this instance is started. + /// + /// + /// true if this instance is started; otherwise, false. + /// + public bool IsStarted => _connectionCancellationToken != null; + + /// + /// Gets the options. + /// + /// + /// The options. + /// + public ResilientMqttClientOptions? Options { get; private set; } + + /// + /// Gets the pending application messages count. + /// + /// + /// The pending application messages count. + /// + public int PendingApplicationMessagesCount => _messageQueue.Count; + + /// + /// Enqueues the asynchronous. + /// + /// The application message. + /// applicationMessage. + /// A representing the asynchronous operation. + public async Task EnqueueAsync(MqttApplicationMessage applicationMessage) + { + ThrowIfDisposed(); + + ArgumentNullException.ThrowIfNull(applicationMessage); + + var managedMqttApplicationMessage = new ResilientMqttApplicationMessageBuilder().WithApplicationMessage(applicationMessage); + await EnqueueAsync(managedMqttApplicationMessage.Build()).ConfigureAwait(false); + } + + /// + /// Enqueues the asynchronous. + /// + /// The application message. + /// applicationMessage. + /// call StartAsync before publishing messages. + /// A representing the asynchronous operation. + public async Task EnqueueAsync(ResilientMqttApplicationMessage applicationMessage) + { + ThrowIfDisposed(); + + ArgumentNullException.ThrowIfNull(applicationMessage); + + if (Options == null) + { + throw new InvalidOperationException("call StartAsync before publishing messages"); + } + + MqttTopicValidator.ThrowIfInvalid(applicationMessage.ApplicationMessage); + + ResilientMqttApplicationMessage? removedMessage = null; + ApplicationMessageSkippedEventArgs? applicationMessageSkippedEventArgs = null; + + try + { + using (await _messageQueueLock.EnterAsync().ConfigureAwait(false)) + { + if (_messageQueue.Count >= Options.MaxPendingMessages) + { + if (Options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropNewMessage) + { + _logger.Verbose("Skipping publish of new application message because internal queue is full."); + applicationMessageSkippedEventArgs = new ApplicationMessageSkippedEventArgs(applicationMessage); + return; + } + + if (Options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage) + { + removedMessage = _messageQueue.RemoveFirst(); + _logger.Verbose("Removed oldest application message from internal queue because it is full."); + applicationMessageSkippedEventArgs = new ApplicationMessageSkippedEventArgs(removedMessage); + } + } + + _messageQueue.Enqueue(applicationMessage); + + if (_storageManager != null) + { + if (removedMessage != null) + { + await _storageManager.RemoveAsync(removedMessage).ConfigureAwait(false); + } + + await _storageManager.AddAsync(applicationMessage).ConfigureAwait(false); + } + } + } + finally + { + if (applicationMessageSkippedEventArgs != null && _applicationMessageSkippedEvent.HasHandlers) + { + await _applicationMessageSkippedEvent.InvokeAsync(applicationMessageSkippedEventArgs).ConfigureAwait(false); + } + } + } + + /// + /// Pings the asynchronous. + /// + /// The cancellation token. + /// + /// A representing the asynchronous operation. + /// + public Task PingAsync(CancellationToken cancellationToken = default) => InternalClient.PingAsync(cancellationToken); + + /// + /// Starts the asynchronous. + /// + /// The options. + /// options. + /// The client options are not set. - options. + /// The managed client is already started. + /// A representing the asynchronous operation. + public async Task StartAsync(ResilientMqttClientOptions options) + { + ThrowIfDisposed(); + + ArgumentNullException.ThrowIfNull(options); + + if (options.ClientOptions == null) + { + throw new ArgumentException("The client options are not set.", nameof(options)); + } + + if (!_maintainConnectionTask?.IsCompleted ?? false) + { + throw new InvalidOperationException("The managed client is already started."); + } + + Options = options; + + if (options.Storage != null) + { + _storageManager = new ResilientMqttClientStorageManager(options.Storage); + var messages = await _storageManager.LoadQueuedMessagesAsync().ConfigureAwait(false); + + foreach (var message in messages) + { + _messageQueue.Enqueue(message); + } + } + + var cancellationTokenSource = new CancellationTokenSource(); + var cancellationToken = cancellationTokenSource.Token; + _connectionCancellationToken = cancellationTokenSource; + + _maintainConnectionTask = Task.Run(() => MaintainConnectionAsync(cancellationToken), cancellationToken); + _maintainConnectionTask.RunInBackground(_logger); + + _logger.Info("Started"); + } + + /// + /// Stops the asynchronous. + /// + /// if set to true [clean disconnect]. + /// A representing the asynchronous operation. + public async Task StopAsync(bool cleanDisconnect = true) + { + ThrowIfDisposed(); + + _isCleanDisconnect = cleanDisconnect; + + StopPublishing(); + StopMaintainingConnection(); + + _messageQueue.Clear(); + + if (_maintainConnectionTask != null) + { + await Task.WhenAny(_maintainConnectionTask); + _maintainConnectionTask = null; + } + } + + /// + /// Subscribes the asynchronous. + /// + /// The topic filters. + /// + /// A representing the asynchronous operation. + /// + /// topicFilters. + public Task SubscribeAsync(IEnumerable topicFilters) + { + ThrowIfDisposed(); + + ArgumentNullException.ThrowIfNull(topicFilters); + + foreach (var topicFilter in topicFilters) + { + MqttTopicValidator.ThrowIfInvalidSubscribe(topicFilter.Topic); + } + + lock (_subscriptions) + { + foreach (var topicFilter in topicFilters) + { + _subscriptions[topicFilter.Topic] = topicFilter; + _unsubscriptions.Remove(topicFilter.Topic); + } + } + + _subscriptionsQueuedSignal.Release(); + + return CompletedTask.Instance; + } + + /// + /// Unsubscribes the asynchronous. + /// + /// The topics. + /// + /// A representing the asynchronous operation. + /// + /// topics. + public Task UnsubscribeAsync(IEnumerable topics) + { + ThrowIfDisposed(); + + ArgumentNullException.ThrowIfNull(topics); + + lock (_subscriptions) + { + foreach (var topic in topics) + { + _subscriptions.Remove(topic); + _unsubscriptions.Add(topic); + } + } + + _subscriptionsQueuedSignal.Release(); + + return CompletedTask.Instance; + } + + /// + /// Releases unmanaged and - optionally - managed resources. + /// + /// true to release both managed and unmanaged resources; false to release only unmanaged resources. + protected override void Dispose(bool disposing) + { + if (disposing) + { + StopPublishing(); + StopMaintainingConnection(); + + if (_maintainConnectionTask != null) + { + _maintainConnectionTask.GetAwaiter().GetResult(); + _maintainConnectionTask = null; + } + + _messageQueue.Dispose(); + _messageQueueLock.Dispose(); + InternalClient.Dispose(); + _subscriptionsQueuedSignal.Dispose(); + _storageManager?.Dispose(); + } + + base.Dispose(disposing); + } + + private static TimeSpan GetRemainingTime(in DateTime endTime) + { + var remainingTime = endTime - DateTime.UtcNow; + return remainingTime < TimeSpan.Zero ? TimeSpan.Zero : remainingTime; + } + + private CancellationTokenSource NewTimeoutToken(in CancellationToken linkedToken) + { + var newTimeoutToken = CancellationTokenSource.CreateLinkedTokenSource(linkedToken); + newTimeoutToken.CancelAfter(Options!.ClientOptions!.Timeout); + return newTimeoutToken; + } + + private async Task HandleSubscriptionExceptionAsync(Exception exception, List? addedSubscriptions, List? removedSubscriptions) + { + _logger.Warning(exception, "Synchronizing subscriptions failed."); + + if (_synchronizingSubscriptionsFailedEvent.HasHandlers) + { + await _synchronizingSubscriptionsFailedEvent.InvokeAsync(new ResilientProcessFailedEventArgs(exception, addedSubscriptions, removedSubscriptions)).ConfigureAwait(false); + } + } + + private async Task HandleSubscriptionsResultAsync(SendSubscriptionResults subscribeUnsubscribeResult) + { + if (_subscriptionsChangedEvent.HasHandlers) + { + await _subscriptionsChangedEvent.InvokeAsync(new SubscriptionsChangedEventArgs(subscribeUnsubscribeResult.SubscribeResults, subscribeUnsubscribeResult.UnsubscribeResults)).ConfigureAwait(false); + } + } + + private async Task MaintainConnectionAsync(CancellationToken cancellationToken) + { + try + { + while (!cancellationToken.IsCancellationRequested) + { + await TryMaintainConnectionAsync(cancellationToken).ConfigureAwait(false); + } + } + catch (OperationCanceledException) + { + } + catch (Exception exception) + { + _logger.Error(exception, "Error exception while maintaining connection."); + } + finally + { + if (!IsDisposed) + { + try + { + if (_isCleanDisconnect) + { + using (var disconnectTimeout = NewTimeoutToken(CancellationToken.None)) + { + await InternalClient.DisconnectAsync(new MqttClientDisconnectOptions(), disconnectTimeout.Token).ConfigureAwait(false); + } + } + } + catch (OperationCanceledException) + { + _logger.Warning("Timeout while sending DISCONNECT packet."); + } + catch (Exception exception) + { + _logger.Error(exception, "Error while disconnecting."); + } + + _logger.Info("Stopped"); + } + + _reconnectSubscriptions.Clear(); + + lock (_subscriptions) + { + _subscriptions.Clear(); + _unsubscriptions.Clear(); + } + } + } + + private async Task PublishQueuedMessagesAsync(CancellationToken cancellationToken) + { + try + { + while (!cancellationToken.IsCancellationRequested && InternalClient.IsConnected) + { + // Peek at the message without dequeueing in order to prevent the + // possibility of the queue growing beyond the configured cap. + // Previously, messages could be re-enqueued if there was an + // exception, and this re-enqueueing did not honor the cap. + // Furthermore, because re-enqueueing would shuffle the order + // of the messages, the DropOldestQueuedMessage strategy would + // be unable to know which message is actually the oldest and would + // instead drop the first item in the queue. + var message = _messageQueue.PeekAndWait(cancellationToken); + if (message == null) + { + continue; + } + + cancellationToken.ThrowIfCancellationRequested(); + + await TryPublishQueuedMessageAsync(message, cancellationToken).ConfigureAwait(false); + } + } + catch (OperationCanceledException) + { + } + catch (Exception exception) + { + _logger.Error(exception, "Error while publishing queued application messages."); + } + finally + { + _logger.Verbose("Stopped publishing messages."); + } + } + + private async Task PublishReconnectSubscriptionsAsync(CancellationToken cancellationToken) + { + _logger.Info("Publishing subscriptions at reconnect"); + + List? topicFilters = null; + + try + { + if (_reconnectSubscriptions.Count > 0) + { + topicFilters = []; + SendSubscriptionResults subscribeUnsubscribeResult; + + foreach (var sub in _reconnectSubscriptions) + { + topicFilters.Add(sub.Value); + + if (topicFilters.Count == Options!.MaxTopicFiltersInSubscribeUnsubscribePackets) + { + subscribeUnsubscribeResult = await SendSubscribeUnsubscribe(topicFilters, null, cancellationToken).ConfigureAwait(false); + topicFilters.Clear(); + await HandleSubscriptionsResultAsync(subscribeUnsubscribeResult).ConfigureAwait(false); + } + } + + subscribeUnsubscribeResult = await SendSubscribeUnsubscribe(topicFilters, null, cancellationToken).ConfigureAwait(false); + await HandleSubscriptionsResultAsync(subscribeUnsubscribeResult).ConfigureAwait(false); + } + } + catch (Exception exception) + { + await HandleSubscriptionExceptionAsync(exception, topicFilters, null).ConfigureAwait(false); + } + } + + private async Task PublishSubscriptionsAsync(TimeSpan timeout, CancellationToken cancellationToken) + { + var endTime = DateTime.UtcNow + timeout; + + while (await _subscriptionsQueuedSignal.WaitAsync(GetRemainingTime(endTime), cancellationToken).ConfigureAwait(false)) + { + List subscriptions; + SendSubscriptionResults subscribeUnsubscribeResult; + HashSet unsubscriptions; + + lock (_subscriptions) + { + subscriptions = [.. _subscriptions.Values]; + _subscriptions.Clear(); + + unsubscriptions = new HashSet(_unsubscriptions); + _unsubscriptions.Clear(); + } + + if (subscriptions.Count == 0 && unsubscriptions.Count == 0) + { + continue; + } + + _logger.Verbose("Publishing {0} added and {1} removed subscriptions", subscriptions.Count, unsubscriptions.Count); + + foreach (var unsubscription in unsubscriptions) + { + _reconnectSubscriptions.Remove(unsubscription); + } + + foreach (var subscription in subscriptions) + { + _reconnectSubscriptions[subscription.Topic] = subscription; + } + + var addedTopicFilters = new List(); + foreach (var subscription in subscriptions) + { + addedTopicFilters.Add(subscription); + + if (addedTopicFilters.Count == Options!.MaxTopicFiltersInSubscribeUnsubscribePackets) + { + subscribeUnsubscribeResult = await SendSubscribeUnsubscribe(addedTopicFilters, null, cancellationToken).ConfigureAwait(false); + addedTopicFilters.Clear(); + await HandleSubscriptionsResultAsync(subscribeUnsubscribeResult).ConfigureAwait(false); + } + } + + subscribeUnsubscribeResult = await SendSubscribeUnsubscribe(addedTopicFilters, null, cancellationToken).ConfigureAwait(false); + await HandleSubscriptionsResultAsync(subscribeUnsubscribeResult).ConfigureAwait(false); + + var removedTopicFilters = new List(); + foreach (var unSub in unsubscriptions) + { + removedTopicFilters.Add(unSub); + + if (removedTopicFilters.Count == Options!.MaxTopicFiltersInSubscribeUnsubscribePackets) + { + subscribeUnsubscribeResult = await SendSubscribeUnsubscribe(null, removedTopicFilters, cancellationToken).ConfigureAwait(false); + removedTopicFilters.Clear(); + await HandleSubscriptionsResultAsync(subscribeUnsubscribeResult).ConfigureAwait(false); + } + } + + subscribeUnsubscribeResult = await SendSubscribeUnsubscribe(null, removedTopicFilters, cancellationToken).ConfigureAwait(false); + await HandleSubscriptionsResultAsync(subscribeUnsubscribeResult).ConfigureAwait(false); + } + } + + private async Task ReconnectIfRequiredAsync(CancellationToken cancellationToken) + { + if (InternalClient.IsConnected) + { + return ReconnectionResult.StillConnected; + } + + MqttClientConnectResult? connectResult = null; + try + { + using (var connectTimeout = NewTimeoutToken(cancellationToken)) + { + connectResult = await InternalClient.ConnectAsync(Options!.ClientOptions, connectTimeout.Token).ConfigureAwait(false); + } + + if (connectResult.ResultCode != MqttClientConnectResultCode.Success) + { + throw new MqttCommunicationException($"Client connected but server denied connection with reason '{connectResult.ResultCode}'."); + } + + return connectResult.IsSessionPresent ? ReconnectionResult.Recovered : ReconnectionResult.Reconnected; + } + catch (Exception exception) + { + await _connectingFailedEvent.InvokeAsync(new ConnectingFailedEventArgs(connectResult, exception)); + return ReconnectionResult.NotConnected; + } + } + + private async Task SendSubscribeUnsubscribe(List? addedSubscriptions, List? removedSubscriptions, CancellationToken cancellationToken) + { + var subscribeResults = new List(); + var unsubscribeResults = new List(); + try + { + if (removedSubscriptions?.Count > 0) + { + var unsubscribeOptionsBuilder = new MqttClientUnsubscribeOptionsBuilder(); + + foreach (var removedSubscription in removedSubscriptions) + { + unsubscribeOptionsBuilder.WithTopicFilter(removedSubscription); + } + + using (var unsubscribeTimeout = NewTimeoutToken(cancellationToken)) + { + var unsubscribeResult = await InternalClient.UnsubscribeAsync(unsubscribeOptionsBuilder.Build(), unsubscribeTimeout.Token).ConfigureAwait(false); + unsubscribeResults.Add(unsubscribeResult); + } + + // clear because these worked, maybe the subscribe below will fail, only report those + removedSubscriptions.Clear(); + } + + if (addedSubscriptions?.Count > 0) + { + var subscribeOptionsBuilder = new MqttClientSubscribeOptionsBuilder(); + + foreach (var addedSubscription in addedSubscriptions) + { + subscribeOptionsBuilder.WithTopicFilter(addedSubscription); + } + + using (var subscribeTimeout = NewTimeoutToken(cancellationToken)) + { + var subscribeResult = await InternalClient.SubscribeAsync(subscribeOptionsBuilder.Build(), subscribeTimeout.Token).ConfigureAwait(false); + subscribeResults.Add(subscribeResult); + } + } + } + catch (Exception exception) + { + await HandleSubscriptionExceptionAsync(exception, addedSubscriptions, removedSubscriptions).ConfigureAwait(false); + } + + return new SendSubscriptionResults(subscribeResults, unsubscribeResults); + } + + private void StartPublishing() + { + StopPublishing(); + + var cancellationTokenSource = new CancellationTokenSource(); + var cancellationToken = cancellationTokenSource.Token; + _publishingCancellationToken = cancellationTokenSource; + + Task.Run(() => PublishQueuedMessagesAsync(cancellationToken), cancellationToken).RunInBackground(_logger); + } + + private void StopMaintainingConnection() + { + try + { + _connectionCancellationToken?.Cancel(false); + } + finally + { + _connectionCancellationToken?.Dispose(); + _connectionCancellationToken = null; + } + } + + private void StopPublishing() + { + try + { + _publishingCancellationToken?.Cancel(false); + } + finally + { + _publishingCancellationToken?.Dispose(); + _publishingCancellationToken = null; + } + } + + private async Task TryMaintainConnectionAsync(CancellationToken cancellationToken) + { + try + { + var oldConnectionState = InternalClient.IsConnected; + var connectionState = await ReconnectIfRequiredAsync(cancellationToken).ConfigureAwait(false); + + if (connectionState == ReconnectionResult.NotConnected) + { + StopPublishing(); + await Task.Delay(Options!.AutoReconnectDelay, cancellationToken).ConfigureAwait(false); + } + else if (connectionState == ReconnectionResult.Reconnected) + { + await PublishReconnectSubscriptionsAsync(cancellationToken).ConfigureAwait(false); + StartPublishing(); + } + else if (connectionState == ReconnectionResult.Recovered) + { + StartPublishing(); + } + else if (connectionState == ReconnectionResult.StillConnected) + { + await PublishSubscriptionsAsync(Options!.ConnectionCheckInterval, cancellationToken).ConfigureAwait(false); + } + + if (oldConnectionState != InternalClient.IsConnected) + { + await _connectionStateChangedEvent.InvokeAsync(EventArgs.Empty).ConfigureAwait(false); + } + } + catch (OperationCanceledException) + { + } + catch (MqttCommunicationException exception) + { + _logger.Warning(exception, "Communication error while maintaining connection."); + } + catch (Exception exception) + { + _logger.Error(exception, "Error exception while maintaining connection."); + } + } + + private async Task TryPublishQueuedMessageAsync(ResilientMqttApplicationMessage message, CancellationToken cancellationToken) + { + Exception? transmitException = null; + var acceptPublish = true; + try + { + if (_interceptingPublishMessageEvent.HasHandlers) + { + var interceptEventArgs = new InterceptingPublishMessageEventArgs(message); + await _interceptingPublishMessageEvent.InvokeAsync(interceptEventArgs).ConfigureAwait(false); + acceptPublish = interceptEventArgs.AcceptPublish; + } + + if (acceptPublish) + { + using (var publishTimeout = NewTimeoutToken(cancellationToken)) + { + await InternalClient.PublishAsync(message.ApplicationMessage, publishTimeout.Token).ConfigureAwait(false); + } + } + + using (await _messageQueueLock.EnterAsync(CancellationToken.None).ConfigureAwait(false)) // lock to avoid conflict with this.PublishAsync + { + // While publishing this message, this.PublishAsync could have booted this + // message off the queue to make room for another (when using a cap + // with the DropOldestQueuedMessage strategy). If the first item + // in the queue is equal to this message, then it's safe to remove + // it from the queue. If not, that means this.PublishAsync has already + // removed it, in which case we don't want to do anything. + _messageQueue.RemoveFirst(i => i.Id.Equals(message.Id)); + + if (_storageManager != null) + { + await _storageManager.RemoveAsync(message).ConfigureAwait(false); + } + } + } + catch (MqttCommunicationException exception) + { + transmitException = exception; + + _logger.Warning(exception, "Publishing application message ({0}) failed.", message.Id); + + if (message.ApplicationMessage?.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) + { + // If QoS 0, we don't want this message to stay on the queue. + // If QoS 1 or 2, it's possible that, when using a cap, this message + // has been booted off the queue by this.PublishAsync, in which case this + // thread will not continue to try to publish it. While this does + // contradict the expected behavior of QoS 1 and 2, that's also true + // for the usage of a message queue cap, so it's still consistent + // with prior behavior in that way. + using (await _messageQueueLock.EnterAsync(CancellationToken.None).ConfigureAwait(false)) // lock to avoid conflict with this.PublishAsync + { + _messageQueue.RemoveFirst(i => i.Id.Equals(message.Id)); + + if (_storageManager != null) + { + await _storageManager.RemoveAsync(message).ConfigureAwait(false); + } + } + } + } + catch (Exception exception) + { + transmitException = exception; + _logger.Error(exception, "Error while publishing application message ({0}).", message.Id); + } + finally + { + if (_applicationMessageProcessedEvent.HasHandlers) + { + var eventArgs = new ApplicationMessageProcessedEventArgs(message, transmitException); + await _applicationMessageProcessedEvent.InvokeAsync(eventArgs).ConfigureAwait(false); + } + } + } +} diff --git a/src/MQTTnet.Rx.Client/ResilientClient/Internal/ResilientMqttClientStorageManager.cs b/src/MQTTnet.Rx.Client/ResilientClient/Internal/ResilientMqttClientStorageManager.cs new file mode 100644 index 0000000..2a36993 --- /dev/null +++ b/src/MQTTnet.Rx.Client/ResilientClient/Internal/ResilientMqttClientStorageManager.cs @@ -0,0 +1,106 @@ +// Copyright (c) Chris Pulman. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using MQTTnet.Internal; + +namespace MQTTnet.Rx.Client.ResilientClient.Internal; + +/// +/// Resilient Mqtt Client Storage Manager. +/// +/// +/// Initializes a new instance of the class. +/// +/// The storage. +/// storage. +internal class ResilientMqttClientStorageManager(IResilientMqttClientStorage storage) : IDisposable +{ + private readonly List _messages = []; + private readonly AsyncLock _messagesLock = new(); + + private readonly IResilientMqttClientStorage _storage = storage ?? throw new ArgumentNullException(nameof(storage)); + private bool _disposedValue; + + /// + /// Loads the queued messages asynchronous. + /// + /// A representing the asynchronous operation. + public async Task> LoadQueuedMessagesAsync() + { + var loadedMessages = await _storage.LoadQueuedMessagesAsync().ConfigureAwait(false); + _messages.AddRange(loadedMessages); + + return _messages; + } + + /// + /// Adds the asynchronous. + /// + /// The application message. + /// applicationMessage. + /// A representing the asynchronous operation. + public async Task AddAsync(ResilientMqttApplicationMessage applicationMessage) + { + ArgumentNullException.ThrowIfNull(applicationMessage); + + using (await _messagesLock.EnterAsync().ConfigureAwait(false)) + { + _messages.Add(applicationMessage); + await SaveAsync().ConfigureAwait(false); + } + } + + /// + /// Removes the asynchronous. + /// + /// The application message. + /// applicationMessage. + /// A representing the asynchronous operation. + public async Task RemoveAsync(ResilientMqttApplicationMessage applicationMessage) + { + ArgumentNullException.ThrowIfNull(applicationMessage); + + using (await _messagesLock.EnterAsync().ConfigureAwait(false)) + { + var index = _messages.IndexOf(applicationMessage); + if (index == -1) + { + return; + } + + _messages.RemoveAt(index); + await SaveAsync().ConfigureAwait(false); + } + } + + /// + /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. + /// + public void Dispose() + { + // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method + Dispose(disposing: true); + GC.SuppressFinalize(this); + } + + /// + /// Releases unmanaged and - optionally - managed resources. + /// + /// true to release both managed and unmanaged resources; false to release only unmanaged resources. + protected virtual void Dispose(bool disposing) + { + if (!_disposedValue) + { + if (disposing) + { + _messagesLock.Dispose(); + } + + // TODO: free unmanaged resources (unmanaged objects) and override finalizer + // TODO: set large fields to null + _disposedValue = true; + } + } + + private Task SaveAsync() => _storage.SaveQueuedMessagesAsync(_messages); +} diff --git a/src/MQTTnet.Rx.Client/ResilientClient/Internal/SendSubscriptionResults.cs b/src/MQTTnet.Rx.Client/ResilientClient/Internal/SendSubscriptionResults.cs new file mode 100644 index 0000000..88a0b87 --- /dev/null +++ b/src/MQTTnet.Rx.Client/ResilientClient/Internal/SendSubscriptionResults.cs @@ -0,0 +1,36 @@ +// Copyright (c) Chris Pulman. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace MQTTnet.Rx.Client.ResilientClient.Internal; + +/// +/// SendSubscribeUnsubscribeResult. +/// +/// +/// Initializes a new instance of the class. +/// +/// The subscribe results. +/// The unsubscribe results. +/// +/// subscribeResults +/// or +/// unsubscribeResults. +/// +internal sealed class SendSubscriptionResults(List subscribeResults, List unsubscribeResults) +{ + /// + /// Gets the subscribe results. + /// + /// + /// The subscribe results. + /// + public List SubscribeResults { get; } = subscribeResults ?? throw new ArgumentNullException(nameof(subscribeResults)); + + /// + /// Gets the unsubscribe results. + /// + /// + /// The unsubscribe results. + /// + public List UnsubscribeResults { get; } = unsubscribeResults ?? throw new ArgumentNullException(nameof(unsubscribeResults)); +} diff --git a/src/MQTTnet.Rx.Client/ResilientClient/MqttPendingMessagesOverflowStrategy.cs b/src/MQTTnet.Rx.Client/ResilientClient/MqttPendingMessagesOverflowStrategy.cs new file mode 100644 index 0000000..8c8ce59 --- /dev/null +++ b/src/MQTTnet.Rx.Client/ResilientClient/MqttPendingMessagesOverflowStrategy.cs @@ -0,0 +1,20 @@ +// Copyright (c) Chris Pulman. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace MQTTnet.Rx.Client; + +/// +/// MqttPendingMessagesOverflowStrategy. +/// +public enum MqttPendingMessagesOverflowStrategy +{ + /// + /// The drop oldest queued message. + /// + DropOldestQueuedMessage, + + /// + /// The drop new message. + /// + DropNewMessage +} diff --git a/src/MQTTnet.Rx.Client/ResilientClient/ReconnectionResult.cs b/src/MQTTnet.Rx.Client/ResilientClient/ReconnectionResult.cs new file mode 100644 index 0000000..d223164 --- /dev/null +++ b/src/MQTTnet.Rx.Client/ResilientClient/ReconnectionResult.cs @@ -0,0 +1,27 @@ +// Copyright (c) Chris Pulman. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace MQTTnet.Rx.Client; + +/// +/// ReconnectionResult. +/// +public enum ReconnectionResult +{ + /// + /// The still connected. + /// + StillConnected, + /// + /// The reconnected. + /// + Reconnected, + /// + /// The recovered. + /// + Recovered, + /// + /// The not connected. + /// + NotConnected +} diff --git a/src/MQTTnet.Rx.Client/ResilientClient/ResilientMqttApplicationMessage.cs b/src/MQTTnet.Rx.Client/ResilientClient/ResilientMqttApplicationMessage.cs new file mode 100644 index 0000000..0832714 --- /dev/null +++ b/src/MQTTnet.Rx.Client/ResilientClient/ResilientMqttApplicationMessage.cs @@ -0,0 +1,26 @@ +// Copyright (c) Chris Pulman. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace MQTTnet.Rx.Client; + +/// +/// ResilientMqttApplicationMessage. +/// +public class ResilientMqttApplicationMessage +{ + /// + /// Gets or sets the identifier. + /// + /// + /// The identifier. + /// + public Guid Id { get; set; } = Guid.NewGuid(); + + /// + /// Gets or sets the application message. + /// + /// + /// The application message. + /// + public MqttApplicationMessage? ApplicationMessage { get; set; } +} diff --git a/src/MQTTnet.Rx.Client/ResilientClient/ResilientMqttClientOptions.cs b/src/MQTTnet.Rx.Client/ResilientClient/ResilientMqttClientOptions.cs new file mode 100644 index 0000000..bd8fe43 --- /dev/null +++ b/src/MQTTnet.Rx.Client/ResilientClient/ResilientMqttClientOptions.cs @@ -0,0 +1,64 @@ +// Copyright (c) Chris Pulman. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace MQTTnet.Rx.Client; + +/// +/// Resilient Mqtt Client Options. +/// +public sealed class ResilientMqttClientOptions +{ + /// + /// Gets or sets the client options. + /// + /// + /// The client options. + /// + public MqttClientOptions? ClientOptions { get; set; } + + /// + /// Gets or sets the automatic reconnect delay. + /// + /// + /// The automatic reconnect delay. + /// + public TimeSpan AutoReconnectDelay { get; set; } = TimeSpan.FromSeconds(5); + + /// + /// Gets or sets the connection check interval. + /// + /// + /// The connection check interval. + /// + public TimeSpan ConnectionCheckInterval { get; set; } = TimeSpan.FromSeconds(1); + + /// + /// Gets or sets the storage. + /// + /// + /// The storage. + /// + public IResilientMqttClientStorage? Storage { get; set; } + + /// + /// Gets or sets the maximum pending messages. + /// + /// + /// The maximum pending messages. + /// + public int MaxPendingMessages { get; set; } = int.MaxValue; + + /// + /// Gets or sets the pending messages overflow strategy. + /// + /// + /// The pending messages overflow strategy. + /// + public MqttPendingMessagesOverflowStrategy PendingMessagesOverflowStrategy { get; set; } = MqttPendingMessagesOverflowStrategy.DropNewMessage; + + /// + /// Gets or sets defines the maximum amount of topic filters which will be sent in a SUBSCRIBE/UNSUBSCRIBE packet. + /// Amazon Web Services (AWS) limits this number to 8. The default is int.MaxValue. + /// + public int MaxTopicFiltersInSubscribeUnsubscribePackets { get; set; } = int.MaxValue; +} diff --git a/src/MQTTnet.Rx.Client/ResilientClient/ResilientMqttClientOptionsBuilder.cs b/src/MQTTnet.Rx.Client/ResilientClient/ResilientMqttClientOptionsBuilder.cs new file mode 100644 index 0000000..1d64ac3 --- /dev/null +++ b/src/MQTTnet.Rx.Client/ResilientClient/ResilientMqttClientOptionsBuilder.cs @@ -0,0 +1,140 @@ +// Copyright (c) Chris Pulman. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace MQTTnet.Rx.Client; + +/// +/// Resilient Mqtt Client Options Builder. +/// +public class ResilientMqttClientOptionsBuilder +{ + private readonly ResilientMqttClientOptions _options = new(); + private MqttClientOptionsBuilder? _clientOptionsBuilder; + + /// + /// Withes the maximum pending messages. + /// + /// The value. + /// Resilient Mqtt Client Options Builder. + public ResilientMqttClientOptionsBuilder WithMaxPendingMessages(int value) + { + _options.MaxPendingMessages = value; + return this; + } + + /// + /// Withes the pending messages overflow strategy. + /// + /// The value. + /// Resilient Mqtt Client Options Builder. + public ResilientMqttClientOptionsBuilder WithPendingMessagesOverflowStrategy(MqttPendingMessagesOverflowStrategy value) + { + _options.PendingMessagesOverflowStrategy = value; + return this; + } + + /// + /// Withes the automatic reconnect delay. + /// + /// The value. + /// Resilient Mqtt Client Options Builder. + public ResilientMqttClientOptionsBuilder WithAutoReconnectDelay(in TimeSpan value) + { + _options.AutoReconnectDelay = value; + return this; + } + + /// + /// Withes the storage. + /// + /// The value. + /// Resilient Mqtt Client Options Builder. + public ResilientMqttClientOptionsBuilder WithStorage(IResilientMqttClientStorage value) + { + _options.Storage = value; + return this; + } + + /// + /// Withes the client options. + /// + /// The value. + /// Resilient Mqtt Client Options Builder. + /// Cannot use client options builder and client options at the same time. + /// value. + public ResilientMqttClientOptionsBuilder WithClientOptions(MqttClientOptions value) + { + if (_clientOptionsBuilder != null) + { + throw new InvalidOperationException("Cannot use client options builder and client options at the same time."); + } + + _options.ClientOptions = value ?? throw new ArgumentNullException(nameof(value)); + + return this; + } + + /// + /// Withes the client options. + /// + /// The builder. + /// Resilient Mqtt Client Options Builder. + /// Cannot use client options builder and client options at the same time. + public ResilientMqttClientOptionsBuilder WithClientOptions(MqttClientOptionsBuilder builder) + { + if (_options.ClientOptions != null) + { + throw new InvalidOperationException("Cannot use client options builder and client options at the same time."); + } + + _clientOptionsBuilder = builder; + return this; + } + + /// + /// Withes the client options. + /// + /// The options. + /// Resilient Mqtt Client Options Builder. + /// options. + public ResilientMqttClientOptionsBuilder WithClientOptions(Action options) + { + ArgumentNullException.ThrowIfNull(options); + + _clientOptionsBuilder ??= new MqttClientOptionsBuilder(); + + options(_clientOptionsBuilder); + return this; + } + + /// + /// Withes the maximum topic filters in subscribe unsubscribe packets. + /// + /// The value. + /// Resilient Mqtt Client Options Builder. + public ResilientMqttClientOptionsBuilder WithMaxTopicFiltersInSubscribeUnsubscribePackets(int value) + { + _options.MaxTopicFiltersInSubscribeUnsubscribePackets = value; + return this; + } + + /// + /// Builds this instance. + /// + /// Resilient Mqtt Client Options. + /// The Client Options cannot be null. + public ResilientMqttClientOptions Build() + { + if (_clientOptionsBuilder != null) + { + _options.ClientOptions = _clientOptionsBuilder.Build(); + } + + if (_options.ClientOptions == null) + { + throw new InvalidOperationException("The Client Options cannot be null."); + } + + return _options; + } +} diff --git a/src/TestSingleClient/Program.cs b/src/TestSingleClient/Program.cs index 32e71bf..5972af7 100644 --- a/src/TestSingleClient/Program.cs +++ b/src/TestSingleClient/Program.cs @@ -1,6 +1,7 @@ // Copyright (c) Chris Pulman. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. +using System; using System.Reactive.Linq; using System.Reactive.Subjects; using MQTTnet; @@ -17,20 +18,24 @@ sub.Disposable.Add(sub.Server.ClientConnected().Subscribe(args => Console.WriteLine($"SERVER: ClientConnectedAsync => clientId:{args.ClientId}"))); sub.Disposable.Add(sub.Server.ClientDisconnected().Subscribe(args => Console.WriteLine($"SERVER: ClientDisconnectedAsync => clientId:{args.ClientId}"))); - var obsClient1 = MQTTnet.Rx.Client.Create.MqttClient() - .WithClientOptions(options => - options.WithTcpServer("localhost", serverPort)); - - var obsClient2 = MQTTnet.Rx.Client.Create.MqttClient() - .WithClientOptions(options => - options.WithTcpServer("localhost", serverPort) - .WithClientId("Client02")); + var obsClient1 = MQTTnet.Rx.Client.Create.ResilientMqttClient() + .WithResilientClientOptions(options => + options.WithClientOptions(c => + c.WithTcpServer("localhost", serverPort)) + .WithAutoReconnectDelay(TimeSpan.FromSeconds(2))); + + var obsClient2 = MQTTnet.Rx.Client.Create.ResilientMqttClient() + .WithResilientClientOptions(options => + options.WithClientOptions(c => + c.WithTcpServer("localhost", serverPort) + .WithClientId("Client02")) + .WithAutoReconnectDelay(TimeSpan.FromSeconds(2))); sub.Disposable.Add( obsClient1.Subscribe(i => { - sub.Disposable.Add(i.Connected().Subscribe((_) => + sub.Disposable.Add(i.Connected.Subscribe((_) => Console.WriteLine($"{DateTime.Now.Dump()}\t CLIENT: Connected with server."))); - sub.Disposable.Add(i.Disconnected().Subscribe((_) => + sub.Disposable.Add(i.Disconnected.Subscribe((_) => Console.WriteLine($"{DateTime.Now.Dump()}\t CLIENT: Disconnected with server."))); }));