Skip to content

Commit

Permalink
Merge pull request #14 from ChrisPulman/AddMqttServerObservables
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisPulman authored Oct 16, 2023
2 parents 6940886 + 2ce22c0 commit 8aae5a0
Show file tree
Hide file tree
Showing 5 changed files with 322 additions and 61 deletions.
2 changes: 1 addition & 1 deletion Version.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"$schema": "https://raw.githubusercontent.com/dotnet/Nerdbank.GitVersioning/master/src/NerdBank.GitVersioning/version.schema.json",
"version": "1.2.1",
"version": "1.2.2",
"publicReleaseRefSpec": [
"^refs/heads/master$",
"^refs/heads/main$"
Expand Down
40 changes: 40 additions & 0 deletions src/MQTTnet.Rx.Client/Create.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Reactive.Linq;
using MQTTnet.Client;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Server;

namespace MQTTnet.Rx.Client;

Expand All @@ -27,6 +28,45 @@ public static class Create
/// <param name="mqttFactory">The MQTT factory.</param>
public static void NewMqttFactory(MqttFactory mqttFactory) => MqttFactory = mqttFactory;

/// <summary>
/// Creates a MQTTs server.
/// </summary>
/// <param name="builder">The builder.</param>
/// <returns>An MqttServer.</returns>
/// <exception cref="System.ArgumentNullException">builder.</exception>
public static IObservable<(MqttServer Server, CompositeDisposable Disposable)> MqttServer(Func<MqttServerOptionsBuilder, MqttServerOptions> builder)
{
if (builder == null)
{
throw new ArgumentNullException(nameof(builder));
}

var mqttServer = MqttFactory.CreateMqttServer(builder(MqttFactory.CreateServerOptionsBuilder()));
var serverCount = 0;
return Observable.Create<(MqttServer Server, CompositeDisposable Disposable)>(async observer =>
{
var disposable = new CompositeDisposable();
observer.OnNext((mqttServer, disposable));
Interlocked.Increment(ref serverCount);
if (serverCount == 1)
{
await mqttServer.StartAsync();
}

return Disposable.Create(async () =>
{
Interlocked.Decrement(ref serverCount);
if (serverCount == 0)
{
await mqttServer.StopAsync();
mqttServer.Dispose();
}

disposable.Dispose();
});
}).Retry();
}

/// <summary>
/// Created a mqtt Client.
/// </summary>
Expand Down
16 changes: 8 additions & 8 deletions src/MQTTnet.Rx.Client/MqttManagedClientExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace MQTTnet.Rx.Client;
public static class MqttManagedClientExtensions
{
/// <summary>
/// Applications the message processed.
/// Application messages processed.
/// </summary>
/// <param name="client">The client.</param>
/// <returns>A Application Message Processed Event Args.</returns>
Expand All @@ -22,7 +22,7 @@ public static IObservable<ApplicationMessageProcessedEventArgs> ApplicationMessa
handler => client.ApplicationMessageProcessedAsync -= handler);

/// <summary>
/// Connecteds the specified client.
/// Connected to the specified client.
/// </summary>
/// <param name="client">The client.</param>
/// <returns>A Mqtt Client Connected Event Args.</returns>
Expand All @@ -32,7 +32,7 @@ public static IObservable<MqttClientConnectedEventArgs> Connected(this IManagedM
handler => client.ConnectedAsync -= handler);

/// <summary>
/// Disconnecteds the specified client.
/// Disconnected from the specified client.
/// </summary>
/// <param name="client">The client.</param>
/// <returns>A Mqtt Client Disconnected Event Args.</returns>
Expand All @@ -42,7 +42,7 @@ public static IObservable<MqttClientDisconnectedEventArgs> Disconnected(this IMa
handler => client.DisconnectedAsync -= handler);

/// <summary>
/// Connectings the failed.
/// Connecting failed.
/// </summary>
/// <param name="client">The client.</param>
/// <returns>A Connecting Failed Event Args.</returns>
Expand All @@ -52,7 +52,7 @@ public static IObservable<ConnectingFailedEventArgs> ConnectingFailed(this IMana
handler => client.ConnectingFailedAsync -= handler);

/// <summary>
/// Connections the state changed.
/// Connection state changed.
/// </summary>
/// <param name="client">The client.</param>
/// <returns>Event Args.</returns>
Expand All @@ -62,7 +62,7 @@ public static IObservable<EventArgs> ConnectionStateChanged(this IManagedMqttCli
handler => client.ConnectionStateChangedAsync -= handler);

/// <summary>
/// Synchronizings the subscriptions failed.
/// Synchronizing subscriptions failed.
/// </summary>
/// <param name="client">The client.</param>
/// <returns>A Managed Process Failed Event Args.</returns>
Expand All @@ -72,7 +72,7 @@ public static IObservable<ManagedProcessFailedEventArgs> SynchronizingSubscripti
handler => client.SynchronizingSubscriptionsFailedAsync -= handler);

/// <summary>
/// Applications the message processed.
/// Application messages processed.
/// </summary>
/// <param name="client">The client.</param>
/// <returns>A Application Message Skipped Event Args.</returns>
Expand All @@ -82,7 +82,7 @@ public static IObservable<ApplicationMessageSkippedEventArgs> ApplicationMessage
handler => client.ApplicationMessageSkippedAsync -= handler);

/// <summary>
/// Applications the message received.
/// Application messages received.
/// </summary>
/// <param name="client">The client.</param>
/// <returns>A Mqtt Application Message Received Event Args.</returns>
Expand Down
212 changes: 212 additions & 0 deletions src/MQTTnet.Rx.Client/MqttServerExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
// Copyright (c) Chris Pulman. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using MQTTnet.Server;

namespace MQTTnet.Rx.Client;

/// <summary>
/// MqttServerExtensions.
/// </summary>
public static class MqttServerExtensions
{
/// <summary>
/// Applications the message not consumed.
/// </summary>
/// <param name="server">The server.</param>
/// <returns>An Observable of ApplicationMessageNotConsumedEventArgs.</returns>
public static IObservable<ApplicationMessageNotConsumedEventArgs> ApplicationMessageNotConsumed(this MqttServer server) =>
CreateObservable.FromAsyncEvent<ApplicationMessageNotConsumedEventArgs>(
handler => server.ApplicationMessageNotConsumedAsync += handler,
handler => server.ApplicationMessageNotConsumedAsync -= handler);

/// <summary>
/// Clients the acknowledged publish packet.
/// </summary>
/// <param name="server">The server.</param>
/// <returns>An Observable of ClientAcknowledgedPublishPacketEventArgs.</returns>
public static IObservable<ClientAcknowledgedPublishPacketEventArgs> ClientAcknowledgedPublishPacket(this MqttServer server) =>
CreateObservable.FromAsyncEvent<ClientAcknowledgedPublishPacketEventArgs>(
handler => server.ClientAcknowledgedPublishPacketAsync += handler,
handler => server.ClientAcknowledgedPublishPacketAsync -= handler);

/// <summary>
/// Client connected.
/// </summary>
/// <param name="server">The server.</param>
/// <returns>An Observable of ClientConnectedEventArgs.</returns>
public static IObservable<ClientConnectedEventArgs> ClientConnected(this MqttServer server) =>
CreateObservable.FromAsyncEvent<ClientConnectedEventArgs>(
handler => server.ClientConnectedAsync += handler,
handler => server.ClientConnectedAsync -= handler);

/// <summary>
/// Clients the disconnected.
/// </summary>
/// <param name="server">The server.</param>
/// <returns>An Observable of ClientDisconnectedEventArgs.</returns>
public static IObservable<ClientDisconnectedEventArgs> ClientDisconnected(this MqttServer server) =>
CreateObservable.FromAsyncEvent<ClientDisconnectedEventArgs>(
handler => server.ClientDisconnectedAsync += handler,
handler => server.ClientDisconnectedAsync -= handler);

/// <summary>
/// Clients the subscribed topic.
/// </summary>
/// <param name="server">The server.</param>
/// <returns>An Observable of ClientSubscribedTopicEventArgs.</returns>
public static IObservable<ClientSubscribedTopicEventArgs> ClientSubscribedTopic(this MqttServer server) =>
CreateObservable.FromAsyncEvent<ClientSubscribedTopicEventArgs>(
handler => server.ClientSubscribedTopicAsync += handler,
handler => server.ClientSubscribedTopicAsync -= handler);

/// <summary>
/// Clients the unsubscribed topic.
/// </summary>
/// <param name="server">The server.</param>
/// <returns>An Observable of ClientUnsubscribedTopicEventArgs.</returns>
public static IObservable<ClientUnsubscribedTopicEventArgs> ClientUnsubscribedTopic(this MqttServer server) =>
CreateObservable.FromAsyncEvent<ClientUnsubscribedTopicEventArgs>(
handler => server.ClientUnsubscribedTopicAsync += handler,
handler => server.ClientUnsubscribedTopicAsync -= handler);

/// <summary>
/// Interceptings the client enqueue.
/// </summary>
/// <param name="server">The server.</param>
/// <returns>An Observable of InterceptingClientApplicationMessageEnqueueEventArgs.</returns>
public static IObservable<InterceptingClientApplicationMessageEnqueueEventArgs> InterceptingClientEnqueue(this MqttServer server) =>
CreateObservable.FromAsyncEvent<InterceptingClientApplicationMessageEnqueueEventArgs>(
handler => server.InterceptingClientEnqueueAsync += handler,
handler => server.InterceptingClientEnqueueAsync -= handler);

/// <summary>
/// Interceptings the inbound packet.
/// </summary>
/// <param name="server">The server.</param>
/// <returns>An Observable of InterceptingPacketEventArgs.</returns>
public static IObservable<InterceptingPacketEventArgs> InterceptingInboundPacket(this MqttServer server) =>
CreateObservable.FromAsyncEvent<InterceptingPacketEventArgs>(
handler => server.InterceptingInboundPacketAsync += handler,
handler => server.InterceptingInboundPacketAsync -= handler);

/// <summary>
/// Interceptings the outbound packet.
/// </summary>
/// <param name="server">The server.</param>
/// <returns>An Observable of InterceptingPacketEventArgs.</returns>
public static IObservable<InterceptingPacketEventArgs> InterceptingOutboundPacket(this MqttServer server) =>
CreateObservable.FromAsyncEvent<InterceptingPacketEventArgs>(
handler => server.InterceptingOutboundPacketAsync += handler,
handler => server.InterceptingOutboundPacketAsync -= handler);

/// <summary>
/// Interceptings the publish.
/// </summary>
/// <param name="server">The server.</param>
/// <returns>An Observable of InterceptingPublishEventArgs.</returns>
public static IObservable<InterceptingPublishEventArgs> InterceptingPublish(this MqttServer server) =>
CreateObservable.FromAsyncEvent<InterceptingPublishEventArgs>(
handler => server.InterceptingPublishAsync += handler,
handler => server.InterceptingPublishAsync -= handler);

/// <summary>
/// Interceptings the subscription.
/// </summary>
/// <param name="server">The server.</param>
/// <returns>An Observable of InterceptingSubscriptionEventArgs.</returns>
public static IObservable<InterceptingSubscriptionEventArgs> InterceptingSubscription(this MqttServer server) =>
CreateObservable.FromAsyncEvent<InterceptingSubscriptionEventArgs>(
handler => server.InterceptingSubscriptionAsync += handler,
handler => server.InterceptingSubscriptionAsync -= handler);

/// <summary>
/// Interceptings the unsubscription.
/// </summary>
/// <param name="server">The server.</param>
/// <returns>An Observable of InterceptingUnsubscriptionEventArgs.</returns>
public static IObservable<InterceptingUnsubscriptionEventArgs> InterceptingUnsubscription(this MqttServer server) =>
CreateObservable.FromAsyncEvent<InterceptingUnsubscriptionEventArgs>(
handler => server.InterceptingUnsubscriptionAsync += handler,
handler => server.InterceptingUnsubscriptionAsync -= handler);

/// <summary>
/// Loadings the retained message.
/// </summary>
/// <param name="server">The server.</param>
/// <returns>An Observable of LoadingRetainedMessagesEventArgs.</returns>
public static IObservable<LoadingRetainedMessagesEventArgs> LoadingRetainedMessage(this MqttServer server) =>
CreateObservable.FromAsyncEvent<LoadingRetainedMessagesEventArgs>(
handler => server.LoadingRetainedMessageAsync += handler,
handler => server.LoadingRetainedMessageAsync -= handler);

/// <summary>
/// Preparings the session.
/// </summary>
/// <param name="server">The server.</param>
/// <returns>An Observable of EventArgs.</returns>
public static IObservable<EventArgs> PreparingSession(this MqttServer server) =>
CreateObservable.FromAsyncEvent<EventArgs>(
handler => server.PreparingSessionAsync += handler,
handler => server.PreparingSessionAsync -= handler);

/// <summary>
/// Retaineds the message changed.
/// </summary>
/// <param name="server">The server.</param>
/// <returns>An Observable of RetainedMessageChangedEventArgs.</returns>
public static IObservable<RetainedMessageChangedEventArgs> RetainedMessageChanged(this MqttServer server) =>
CreateObservable.FromAsyncEvent<RetainedMessageChangedEventArgs>(
handler => server.RetainedMessageChangedAsync += handler,
handler => server.RetainedMessageChangedAsync -= handler);

/// <summary>
/// Retaineds the messages cleared.
/// </summary>
/// <param name="server">The server.</param>
/// <returns>An Observable of EventArgs.</returns>
public static IObservable<EventArgs> RetainedMessagesCleared(this MqttServer server) =>
CreateObservable.FromAsyncEvent<EventArgs>(
handler => server.RetainedMessagesClearedAsync += handler,
handler => server.RetainedMessagesClearedAsync -= handler);

/// <summary>
/// Sessions the deleted.
/// </summary>
/// <param name="server">The server.</param>
/// <returns>An Observable of SessionDeletedEventArgs.</returns>
public static IObservable<SessionDeletedEventArgs> SessionDeleted(this MqttServer server) =>
CreateObservable.FromAsyncEvent<SessionDeletedEventArgs>(
handler => server.SessionDeletedAsync += handler,
handler => server.SessionDeletedAsync -= handler);

/// <summary>
/// Starteds the specified server.
/// </summary>
/// <param name="server">The server.</param>
/// <returns>An Observable of EventArgs.</returns>
public static IObservable<EventArgs> Started(this MqttServer server) =>
CreateObservable.FromAsyncEvent<EventArgs>(
handler => server.StartedAsync += handler,
handler => server.StartedAsync -= handler);

/// <summary>
/// Stoppeds the specified server.
/// </summary>
/// <param name="server">The server.</param>
/// <returns>An Observable of EventArgs.</returns>
public static IObservable<EventArgs> Stopped(this MqttServer server) =>
CreateObservable.FromAsyncEvent<EventArgs>(
handler => server.StoppedAsync += handler,
handler => server.StoppedAsync -= handler);

/// <summary>
/// Validatings the connection.
/// </summary>
/// <param name="server">The server.</param>
/// <returns>An Observable of ValidatingConnectionEventArgs.</returns>
public static IObservable<ValidatingConnectionEventArgs> ValidatingConnection(this MqttServer server) =>
CreateObservable.FromAsyncEvent<ValidatingConnectionEventArgs>(
handler => server.ValidatingConnectionAsync += handler,
handler => server.ValidatingConnectionAsync -= handler);
}
Loading

0 comments on commit 8aae5a0

Please sign in to comment.