Skip to content

Commit

Permalink
Merge pull request #73 from ChrisPulman/FixForMultiTopics
Browse files Browse the repository at this point in the history
Fix inability to Subscribe to multiple topics from the same client
  • Loading branch information
ChrisPulman authored Sep 5, 2024
2 parents d92dd25 + c6374f6 commit 511efb1
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 27 deletions.
2 changes: 1 addition & 1 deletion Version.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"$schema": "https://raw.githubusercontent.com/dotnet/Nerdbank.GitVersioning/master/src/NerdBank.GitVersioning/version.schema.json",
"version": "2.1.1",
"version": "2.2.0",
"publicReleaseRefSpec": [
"^refs/heads/master$",
"^refs/heads/main$"
Expand Down
6 changes: 0 additions & 6 deletions global.json

This file was deleted.

14 changes: 6 additions & 8 deletions src/MQTTnet.Rx.Client.TestApp/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ namespace MQTTnet.Rx.Client.TestApp
internal static class Program
{
private static readonly Subject<(string topic, string payload)> _message = new();
private static CompositeDisposable _disposables = new();
private static readonly JsonSerializerOptions _jsonOptions = new() { WriteIndented = true };
private static CompositeDisposable _disposables = [];

/// <summary>
/// Defines the entry point of the application.
Expand Down Expand Up @@ -128,9 +129,9 @@ private static void DiscoverTopicsManagedClient()
.Subscribe(r =>
{
Console.Clear();
foreach (var topic in r)
foreach (var (topic, lastSeen) in r)
{
Console.WriteLine($"{topic.Topic} Last Seen: {topic.LastSeen}");
Console.WriteLine($"{topic} Last Seen: {lastSeen}");
}
}));
WaitForExit();
Expand Down Expand Up @@ -161,18 +162,15 @@ private static void WaitForExit(string? message = null, bool clear = true)
}

_disposables.Dispose();
_disposables = new();
_disposables = [];
}

private static TObject DumpToConsole<TObject>(this TObject @object)
{
var output = "NULL";
if (@object != null)
{
output = JsonSerializer.Serialize(@object, new JsonSerializerOptions
{
WriteIndented = true
});
output = JsonSerializer.Serialize(@object, _jsonOptions);
}

Console.WriteLine($"[{@object?.GetType().Name}]:\r\n{output}");
Expand Down
2 changes: 1 addition & 1 deletion src/MQTTnet.Rx.Client/CreateObservable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace MQTTnet.Rx.Client;
/// <summary>
/// Mqtt Extensions.
/// </summary>
public static class CreateObservable
internal static class CreateObservable
{
internal static IObservable<T> FromAsyncEvent<T>(Action<Func<T, Task>> addHandler, Action<Func<T, Task>> removeHandler) =>
Observable.Create<T>(observer =>
Expand Down
26 changes: 17 additions & 9 deletions src/MQTTnet.Rx.Client/MqttdSubscribeExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ namespace MQTTnet.Rx.Client;
/// </summary>
public static class MqttdSubscribeExtensions
{
private static readonly Dictionary<string, IObservable<object?>> _dictJsonValues = new();
private static readonly Dictionary<IManagedMqttClient, List<(string topic, int count)>> _managedSubscribeToTopicClients = new();
private static readonly Dictionary<IMqttClient, List<(string topic, int count)>> _unmanagedSubscribeToTopicClients = new();
private static readonly Dictionary<string, IObservable<object?>> _dictJsonValues = [];
private static readonly Dictionary<IManagedMqttClient, List<(string topic, int count)>> _managedSubscribeToTopicClients = [];
private static readonly Dictionary<IMqttClient, List<(string topic, int count)>> _unmanagedSubscribeToTopicClients = [];

/// <summary>
/// Converts to dictionary.
Expand Down Expand Up @@ -126,16 +126,20 @@ public static IObservable<MqttApplicationMessageReceivedEventArgs> SubscribeToTo
disposable.Add(client.Subscribe(async c =>
{
mqttClient = c;
disposable.Add(mqttClient.ApplicationMessageReceived().Subscribe(observer));
if (!_unmanagedSubscribeToTopicClients.TryGetValue(mqttClient, out var value))
{
value = new List<(string topic, int count)>(new[] { (topic, 0) });
value = new([(topic, 0)]);
_unmanagedSubscribeToTopicClients.Add(mqttClient, value);
}
else if (!value.Any(x => x.topic == topic))
{
value.Add((topic, 0));
}

var check = value.Find(x => x.topic == topic);
if (!EqualityComparer<(string topic, int count)>.Default.Equals(check, default))
{
disposable.Add(mqttClient.ApplicationMessageReceived().Where(x => x.ApplicationMessage.Topic == topic).Subscribe(observer));
check.count++;
if (check.count == 1)
{
Expand Down Expand Up @@ -309,24 +313,28 @@ public static IObservable<MqttApplicationMessageReceivedEventArgs> SubscribeToTo
disposable.Add(client.Subscribe(async c =>
{
mqttClient = c;
disposable.Add(mqttClient.ApplicationMessageReceived().Subscribe(observer));
if (!_managedSubscribeToTopicClients.TryGetValue(mqttClient, out var value))
{
value = new List<(string topic, int count)>(new[] { (topic, 0) });
value = new([(topic, 0)]);
_managedSubscribeToTopicClients.Add(mqttClient, value);
}
else if (!value.Any(x => x.topic == topic))
{
value.Add((topic, 0));
}

var check = value.Find(x => x.topic == topic);
if (!EqualityComparer<(string topic, int count)>.Default.Equals(check, default))
{
disposable.Add(mqttClient.ApplicationMessageReceived().Where(x => x.ApplicationMessage.Topic == topic).Subscribe(observer));
check.count++;
if (check.count == 1)
{
var mqttSubscribeOptions = Create.MqttFactory.CreateTopicFilterBuilder()
.WithTopic(topic)
.Build();

await mqttClient.SubscribeAsync(new[] { mqttSubscribeOptions });
await mqttClient.SubscribeAsync([mqttSubscribeOptions]);
}
}
}));
Expand All @@ -342,7 +350,7 @@ public static IObservable<MqttApplicationMessageReceivedEventArgs> SubscribeToTo
check.count--;
if (check.count == 0)
{
await mqttClient.UnsubscribeAsync(new[] { topic }).ConfigureAwait(false);
await mqttClient.UnsubscribeAsync([topic]).ConfigureAwait(false);
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions src/MQTTnet.Rx.sln
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
# 17
VisualStudioVersion = 17.7.34003.232
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Rx.ABPlc", "MQTTnet.Rx.ABPlc\MQTTnet.Rx.ABPlc.csproj", "{5DB1DAEB-5A4E-436C-8D23-AC185DD67276}"
Expand All @@ -9,7 +9,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "SolutionConfig", "SolutionC
..\.editorconfig = ..\.editorconfig
..\.gitignore = ..\.gitignore
..\Directory.Build.props = ..\Directory.Build.props
..\global.json = ..\global.json
..\LICENSE = ..\LICENSE
..\README.md = ..\README.md
..\stylecop.json = ..\stylecop.json
Expand Down
11 changes: 11 additions & 0 deletions src/TestSingleClient/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,19 @@
var s4 = obsClient2.SubscribeToTopic("FromMilliseconds")
.Subscribe(r => Console.WriteLine($"\tCLIENT S4: {r.ReasonCode} [{r.ApplicationMessage.Topic}] value : {r.ApplicationMessage.ConvertPayloadToString()}"));

var s5 = obsClient1.SubscribeToTopic("FromMilliseconds1")
.Subscribe(r => Console.WriteLine($"\tCLIENT S5: {r.ReasonCode} [{r.ApplicationMessage.Topic}] value : {r.ApplicationMessage.ConvertPayloadToString()}"));

Subject<(string topic, string payload)> message = new();

sub.Disposable.Add(Observable.Interval(TimeSpan.FromMilliseconds(1000)).Subscribe(i => message.OnNext(("FromMilliseconds", "{" + $"payload: {i}" + "}"))));
sub.Disposable.Add(obsClient1.PublishMessage(message).Subscribe());

Subject<(string topic, string payload)> message1 = new();

sub.Disposable.Add(Observable.Interval(TimeSpan.FromMilliseconds(1000)).Subscribe(i => message.OnNext(("FromMilliseconds1", "{" + $"payload: {i}" + "}"))));
sub.Disposable.Add(obsClient1.PublishMessage(message1).Subscribe());

await Task.Delay(3000);
s2.Dispose();
Console.WriteLine("Dispose S2 ---------------");
Expand All @@ -68,6 +76,9 @@
Console.Read();
s3.Dispose();
Console.WriteLine("Dispose S3 ---------------");
Console.Read();
s5.Dispose();
Console.WriteLine("Dispose S5 ---------------");
});
Console.WriteLine("Press 'Escape' or 'Q' to exit.");

Expand Down

0 comments on commit 511efb1

Please sign in to comment.