diff --git a/libs/resources/RespCommandsDocs.json b/libs/resources/RespCommandsDocs.json
index 8be78e9a83..426502ee71 100644
--- a/libs/resources/RespCommandsDocs.json
+++ b/libs/resources/RespCommandsDocs.json
@@ -3858,6 +3858,54 @@
}
]
},
+ {
+ "Command": "PUBSUB",
+ "Name": "PUBSUB",
+ "Summary": "A container for Pub/Sub commands.",
+ "Group": "PubSub",
+ "Complexity": "Depends on subcommand.",
+ "SubCommands": [
+ {
+ "Command": "PUBSUB_NUMPAT",
+ "Name": "PUBSUB|NUMPAT",
+ "Summary": "Returns a count of unique pattern subscriptions.",
+ "Group": "PubSub",
+ "Complexity": "O(1)"
+ },
+ {
+ "Command": "PUBSUB_CHANNELS",
+ "Name": "PUBSUB|CHANNELS",
+ "Summary": "Returns the active channels.",
+ "Group": "PubSub",
+ "Complexity": "O(N) where N is the number of active channels, and assuming constant time pattern matching (relatively short channels and patterns)",
+ "Arguments": [
+ {
+ "TypeDiscriminator": "RespCommandBasicArgument",
+ "Name": "PATTERN",
+ "DisplayText": "pattern",
+ "Type": "Pattern",
+ "ArgumentFlags": "Optional"
+ }
+ ]
+ },
+ {
+ "Command": "PUBSUB_NUMSUB",
+ "Name": "PUBSUB|NUMSUB",
+ "Summary": "Returns a count of subscribers to channels.",
+ "Group": "PubSub",
+ "Complexity": "O(N) for the NUMSUB subcommand, where N is the number of requested channels",
+ "Arguments": [
+ {
+ "TypeDiscriminator": "RespCommandBasicArgument",
+ "Name": "CHANNEL",
+ "DisplayText": "channel",
+ "Type": "String",
+ "ArgumentFlags": "Optional, Multiple"
+ }
+ ]
+ }
+ ]
+ },
{
"Command": "PUNSUBSCRIBE",
"Name": "PUNSUBSCRIBE",
diff --git a/libs/resources/RespCommandsInfo.json b/libs/resources/RespCommandsInfo.json
index 978ca9be1b..2e7732a60e 100644
--- a/libs/resources/RespCommandsInfo.json
+++ b/libs/resources/RespCommandsInfo.json
@@ -2805,6 +2805,35 @@
"Flags": "Fast, Loading, PubSub, Stale",
"AclCategories": "Fast, PubSub"
},
+ {
+ "Command": "PUBSUB",
+ "Name": "PUBSUB",
+ "Arity": -2,
+ "AclCategories": "Slow",
+ "SubCommands": [
+ {
+ "Command": "PUBSUB_CHANNELS",
+ "Name": "PUBSUB|CHANNELS",
+ "Arity": -2,
+ "Flags": "Loading, PubSub, Stale",
+ "AclCategories": "PubSub, Slow"
+ },
+ {
+ "Command": "PUBSUB_NUMPAT",
+ "Name": "PUBSUB|NUMPAT",
+ "Arity": 2,
+ "Flags": "Loading, PubSub, Stale",
+ "AclCategories": "PubSub, Slow"
+ },
+ {
+ "Command": "PUBSUB_NUMSUB",
+ "Name": "PUBSUB|NUMSUB",
+ "Arity": -2,
+ "Flags": "Loading, PubSub, Stale",
+ "AclCategories": "PubSub, Slow"
+ }
+ ]
+ },
{
"Command": "PUNSUBSCRIBE",
"Name": "PUNSUBSCRIBE",
diff --git a/libs/server/PubSub/SubscribeBroker.cs b/libs/server/PubSub/SubscribeBroker.cs
index a3985a5eb5..53d073b922 100644
--- a/libs/server/PubSub/SubscribeBroker.cs
+++ b/libs/server/PubSub/SubscribeBroker.cs
@@ -2,8 +2,10 @@
// Licensed under the MIT license.
using System;
+using System.Buffers;
using System.Collections.Concurrent;
using System.Collections.Generic;
+using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Garnet.common;
@@ -408,6 +410,162 @@ public unsafe void Publish(byte* key, byte* value, int valueLength, bool ascii =
log.Enqueue(logEntryBytes);
}
+ ///
+ /// Retrieves the collection of channels that have active subscriptions.
+ ///
+ /// The collection of channels.
+ public unsafe void Channels(ref ObjectInput input, ref SpanByteAndMemory output)
+ {
+ var isMemory = false;
+ MemoryHandle ptrHandle = default;
+ var ptr = output.SpanByte.ToPointer();
+
+ var curr = ptr;
+ var end = curr + output.Length;
+
+ try
+ {
+ if (subscriptions is null || subscriptions.Count == 0)
+ {
+ while (!RespWriteUtils.WriteEmptyArray(ref curr, end))
+ ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
+ return;
+ }
+
+ if (input.parseState.Count == 0)
+ {
+ while (!RespWriteUtils.WriteArrayLength(subscriptions.Count, ref curr, end))
+ ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
+
+ foreach (var key in subscriptions.Keys)
+ {
+ while (!RespWriteUtils.WriteBulkString(key.AsSpan().Slice(sizeof(int)), ref curr, end))
+ ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
+ }
+ return;
+ }
+
+ // Below WriteArrayLength is primarily to move the start of the buffer to the max length that is required to write the array length. The actual length is written in the below line.
+ // This is done to avoid multiple two passes over the subscriptions or new array allocation if we use single pass over the subscriptions
+ var totalArrayHeaderLen = 0;
+ while (!RespWriteUtils.WriteArrayLength(subscriptions.Count, ref curr, end, out var _, out totalArrayHeaderLen))
+ ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
+
+ var noOfFoundChannels = 0;
+ var pattern = input.parseState.GetArgSliceByRef(0).SpanByte;
+ var patternPtr = pattern.ToPointer() - sizeof(int);
+ *(int*)patternPtr = pattern.Length;
+
+ foreach (var key in subscriptions.Keys)
+ {
+ fixed (byte* keyPtr = key)
+ {
+ var endKeyPtr = keyPtr;
+ var _patternPtr = patternPtr;
+ if (keySerializer.Match(ref keySerializer.ReadKeyByRef(ref endKeyPtr), true, ref keySerializer.ReadKeyByRef(ref _patternPtr), true))
+ {
+ while (!RespWriteUtils.WriteSimpleString(key.AsSpan().Slice(sizeof(int)), ref curr, end))
+ ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
+ noOfFoundChannels++;
+ }
+ }
+ }
+
+ if (noOfFoundChannels == 0)
+ {
+ curr = ptr;
+ while (!RespWriteUtils.WriteEmptyArray(ref curr, end))
+ ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
+ return;
+ }
+
+ // Below code is to write the actual array length in the buffer
+ // And move the array elements to the start of the new array length if new array length is less than the max array length that we orginally write in the above line
+ var newTotalArrayHeaderLen = 0;
+ var _ptr = ptr;
+ // ReallocateOutput is not needed here as there should be always be available space in the output buffer as we have already written the max array length
+ _ = RespWriteUtils.WriteArrayLength(noOfFoundChannels, ref _ptr, end, out var _, out newTotalArrayHeaderLen);
+
+ Debug.Assert(totalArrayHeaderLen >= newTotalArrayHeaderLen, "newTotalArrayHeaderLen can't be bigger than totalArrayHeaderLen as we have already written max array lenght in the buffer");
+ if (totalArrayHeaderLen != newTotalArrayHeaderLen)
+ {
+ var remainingLength = (curr - ptr) - totalArrayHeaderLen;
+ Buffer.MemoryCopy(ptr + totalArrayHeaderLen, ptr + newTotalArrayHeaderLen, remainingLength, remainingLength);
+ curr = curr - (totalArrayHeaderLen - newTotalArrayHeaderLen);
+ }
+ }
+ finally
+ {
+ if (isMemory)
+ ptrHandle.Dispose();
+ output.Length = (int)(curr - ptr);
+ }
+ }
+
+ ///
+ /// Retrieves the number of pattern subscriptions.
+ ///
+ /// The number of pattern subscriptions.
+ public int NumPatternSubscriptions()
+ {
+ return prefixSubscriptions?.Count ?? 0;
+ }
+
+ ///
+ /// PUBSUB NUMSUB
+ ///
+ ///
+ ///
+ public unsafe void NumSubscriptions(ref ObjectInput input, ref SpanByteAndMemory output)
+ {
+ var isMemory = false;
+ MemoryHandle ptrHandle = default;
+ var ptr = output.SpanByte.ToPointer();
+
+ var curr = ptr;
+ var end = curr + output.Length;
+
+ try
+ {
+ var numOfChannels = input.parseState.Count;
+ if (subscriptions is null || numOfChannels == 0)
+ {
+ while (!RespWriteUtils.WriteEmptyArray(ref curr, end))
+ ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
+ return;
+ }
+
+ while (!RespWriteUtils.WriteArrayLength(numOfChannels * 2, ref curr, end))
+ ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
+
+ var currChannelIdx = 0;
+ while (currChannelIdx < numOfChannels)
+ {
+ var channelArg = input.parseState.GetArgSliceByRef(currChannelIdx);
+ var channelSpan = channelArg.SpanByte;
+ var channelPtr = channelSpan.ToPointer() - sizeof(int); // Memory would have been already pinned
+ *(int*)channelPtr = channelSpan.Length;
+
+ while (!RespWriteUtils.WriteBulkString(channelArg.ReadOnlySpan, ref curr, end))
+ ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
+
+ var channel = new Span(channelPtr, channelSpan.Length + sizeof(int)).ToArray();
+
+ subscriptions.TryGetValue(channel, out var subscriptionDict);
+ while (!RespWriteUtils.WriteInteger(subscriptionDict is null ? 0 : subscriptionDict.Count, ref curr, end))
+ ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
+
+ currChannelIdx++;
+ }
+ }
+ finally
+ {
+ if (isMemory)
+ ptrHandle.Dispose();
+ output.Length = (int)(curr - ptr);
+ }
+ }
+
///
public void Dispose()
{
diff --git a/libs/server/Resp/CmdStrings.cs b/libs/server/Resp/CmdStrings.cs
index 4815c1c8e9..4dc7121f11 100644
--- a/libs/server/Resp/CmdStrings.cs
+++ b/libs/server/Resp/CmdStrings.cs
@@ -104,6 +104,10 @@ static partial class CmdStrings
public static ReadOnlySpan rank => "rank"u8;
public static ReadOnlySpan MAXLEN => "MAXLEN"u8;
public static ReadOnlySpan maxlen => "maxlen"u8;
+ public static ReadOnlySpan PUBSUB => "PUBSUB"u8;
+ public static ReadOnlySpan CHANNELS => "CHANNELS"u8;
+ public static ReadOnlySpan NUMPAT => "NUMPAT"u8;
+ public static ReadOnlySpan NUMSUB => "NUMSUB"u8;
///
/// Response strings
@@ -207,6 +211,7 @@ static partial class CmdStrings
public const string GenericParamShouldBeGreaterThanZero = "ERR {0} should be greater than 0";
public const string GenericUnknownClientType = "ERR Unknown client type '{0}'";
public const string GenericErrDuplicateFilter = "ERR Filter '{0}' defined multiple times";
+ public const string GenericPubSubCommandDisabled = "ERR {0} is disabled, enable it with --pubsub option.";
///
/// Response errors while scripting
diff --git a/libs/server/Resp/Parser/RespCommand.cs b/libs/server/Resp/Parser/RespCommand.cs
index 2036d29a0a..e9c912ff5f 100644
--- a/libs/server/Resp/Parser/RespCommand.cs
+++ b/libs/server/Resp/Parser/RespCommand.cs
@@ -179,6 +179,11 @@ public enum RespCommand : ushort
PING,
+ // Pub/Sub commands
+ PUBSUB,
+ PUBSUB_CHANNELS,
+ PUBSUB_NUMPAT,
+ PUBSUB_NUMSUB,
PUBLISH,
SUBSCRIBE,
PSUBSCRIBE,
@@ -1967,6 +1972,35 @@ private RespCommand SlowParseCommand(ref int count, ref ReadOnlySpan speci
return RespCommand.MODULE_LOADCS;
}
}
+ else if (command.SequenceEqual(CmdStrings.PUBSUB))
+ {
+ Span subCommand = GetCommand(out var gotSubCommand);
+ if (!gotSubCommand)
+ {
+ success = false;
+ return RespCommand.NONE;
+ }
+
+ count--;
+ AsciiUtils.ToUpperInPlace(subCommand);
+ if (subCommand.SequenceEqual(CmdStrings.CHANNELS))
+ {
+ return RespCommand.PUBSUB_CHANNELS;
+ }
+ else if (subCommand.SequenceEqual(CmdStrings.NUMSUB))
+ {
+ return RespCommand.PUBSUB_NUMSUB;
+ }
+ else if (subCommand.SequenceEqual(CmdStrings.NUMPAT))
+ {
+ return RespCommand.PUBSUB_NUMPAT;
+ }
+ else
+ {
+ success = false;
+ return RespCommand.NONE;
+ }
+ }
else
{
// Custom commands should have never been set when we reach this point
diff --git a/libs/server/Resp/PubSubCommands.cs b/libs/server/Resp/PubSubCommands.cs
index 2bd16051f5..0c1c666d12 100644
--- a/libs/server/Resp/PubSubCommands.cs
+++ b/libs/server/Resp/PubSubCommands.cs
@@ -370,5 +370,80 @@ private bool NetworkPUNSUBSCRIBE()
}
return true;
}
+
+ private bool NetworkPUBSUB_CHANNELS()
+ {
+ if (parseState.Count > 1)
+ {
+ return AbortWithWrongNumberOfArguments(nameof(RespCommand.PUBSUB_CHANNELS));
+ }
+
+ if (subscribeBroker is null)
+ {
+ while (!RespWriteUtils.WriteError(string.Format(CmdStrings.GenericPubSubCommandDisabled, "PUBSUB CHANNELS"), ref dcurr, dend))
+ SendAndReset();
+ return true;
+ }
+
+ var input = new ObjectInput()
+ {
+ parseState = parseState
+ };
+ var output = new SpanByteAndMemory(dcurr, (int)(dend - dcurr));
+ subscribeBroker.Channels(ref input, ref output);
+
+ if (!output.IsSpanByte)
+ SendAndReset(output.Memory, output.Length);
+ else
+ dcurr += output.Length;
+
+ return true;
+ }
+
+ private bool NetworkPUBSUB_NUMPAT()
+ {
+ if (parseState.Count > 0)
+ {
+ return AbortWithWrongNumberOfArguments(nameof(RespCommand.PUBSUB_NUMPAT));
+ }
+
+ if (subscribeBroker is null)
+ {
+ while (!RespWriteUtils.WriteError(string.Format(CmdStrings.GenericPubSubCommandDisabled, "PUBSUB NUMPAT"), ref dcurr, dend))
+ SendAndReset();
+ return true;
+ }
+
+ var numPatSubs = subscribeBroker.NumPatternSubscriptions();
+
+ while (!RespWriteUtils.WriteInteger(numPatSubs, ref dcurr, dend))
+ SendAndReset();
+
+ return true;
+ }
+
+ private bool NetworkPUBSUB_NUMSUB()
+ {
+ if (subscribeBroker is null)
+ {
+ while (!RespWriteUtils.WriteError(string.Format(CmdStrings.GenericPubSubCommandDisabled, "PUBSUB NUMSUB"), ref dcurr, dend))
+ SendAndReset();
+ return true;
+ }
+
+ var input = new ObjectInput
+ {
+ parseState = parseState
+ };
+ var output = new SpanByteAndMemory(dcurr, (int)(dend - dcurr));
+ subscribeBroker.NumSubscriptions(ref input, ref output);
+
+ if (!output.IsSpanByte)
+ SendAndReset(output.Memory, output.Length);
+ else
+ dcurr += output.Length;
+
+ return true;
+ }
}
}
\ No newline at end of file
diff --git a/libs/server/Resp/RespServerSession.cs b/libs/server/Resp/RespServerSession.cs
index c97f1b9c9f..89e921c51d 100644
--- a/libs/server/Resp/RespServerSession.cs
+++ b/libs/server/Resp/RespServerSession.cs
@@ -580,6 +580,9 @@ private bool ProcessArrayCommands(RespCommand cmd, ref TGarnetApi st
RespCommand.PSUBSCRIBE => NetworkPSUBSCRIBE(),
RespCommand.UNSUBSCRIBE => NetworkUNSUBSCRIBE(),
RespCommand.PUNSUBSCRIBE => NetworkPUNSUBSCRIBE(),
+ RespCommand.PUBSUB_CHANNELS => NetworkPUBSUB_CHANNELS(),
+ RespCommand.PUBSUB_NUMSUB => NetworkPUBSUB_NUMSUB(),
+ RespCommand.PUBSUB_NUMPAT => NetworkPUBSUB_NUMPAT(),
// Custom Object Commands
RespCommand.COSCAN => ObjectScan(GarnetObjectType.All, ref storageApi),
// Sorted Set commands
diff --git a/playground/CommandInfoUpdater/SupportedCommand.cs b/playground/CommandInfoUpdater/SupportedCommand.cs
index 33a3d82b23..2aaf0ad74b 100644
--- a/playground/CommandInfoUpdater/SupportedCommand.cs
+++ b/playground/CommandInfoUpdater/SupportedCommand.cs
@@ -199,6 +199,12 @@ public class SupportedCommand
new("PSUBSCRIBE", RespCommand.PSUBSCRIBE),
new("PTTL", RespCommand.PTTL),
new("PUBLISH", RespCommand.PUBLISH),
+ new("PUBSUB", RespCommand.PUBSUB,
+ [
+ new("PUBSUB|CHANNELS", RespCommand.PUBSUB_CHANNELS),
+ new("PUBSUB|NUMPAT", RespCommand.PUBSUB_NUMPAT),
+ new("PUBSUB|NUMSUB", RespCommand.PUBSUB_NUMSUB),
+ ]),
new("PUNSUBSCRIBE", RespCommand.PUNSUBSCRIBE),
new("REGISTERCS", RespCommand.REGISTERCS),
new("QUIT", RespCommand.QUIT),
diff --git a/test/Garnet.test/Resp/ACL/RespCommandTests.cs b/test/Garnet.test/Resp/ACL/RespCommandTests.cs
index 45679ef550..99f4616dc8 100644
--- a/test/Garnet.test/Resp/ACL/RespCommandTests.cs
+++ b/test/Garnet.test/Resp/ACL/RespCommandTests.cs
@@ -83,7 +83,7 @@ public void AllCommandsCovered()
ClassicAssert.IsTrue(RespCommandsInfo.TryGetRespCommandNames(out IReadOnlySet advertisedCommands), "Couldn't get advertised RESP commands");
// TODO: See if these commands could be identified programmatically
- IEnumerable withOnlySubCommands = ["ACL", "CLIENT", "CLUSTER", "CONFIG", "LATENCY", "MEMORY", "MODULE"];
+ IEnumerable withOnlySubCommands = ["ACL", "CLIENT", "CLUSTER", "CONFIG", "LATENCY", "MEMORY", "MODULE", "PUBSUB"];
IEnumerable notCoveredByACLs = allInfo.Where(static x => x.Value.Flags.HasFlag(RespCommandFlags.NoAuth)).Select(static kv => kv.Key);
// Check tests against RespCommandsInfo
@@ -4460,6 +4460,51 @@ static async Task DoPublishAsync(GarnetClient client)
}
}
+ [Test]
+ public async Task PubSubChannelsACLsAsync()
+ {
+ await CheckCommandsAsync(
+ "PUBSUB CHANNELS",
+ [DoPubSubChannelsAsync]
+ );
+
+ static async Task DoPubSubChannelsAsync(GarnetClient client)
+ {
+ var count = await client.ExecuteForStringArrayResultAsync("PUBSUB", ["CHANNELS"]);
+ CollectionAssert.IsEmpty(count);
+ }
+ }
+
+ [Test]
+ public async Task PubSubNumPatACLsAsync()
+ {
+ await CheckCommandsAsync(
+ "PUBSUB NUMPAT",
+ [DoPubSubNumPatAsync]
+ );
+
+ static async Task DoPubSubNumPatAsync(GarnetClient client)
+ {
+ var count = await client.ExecuteForLongResultAsync("PUBSUB", ["NUMPAT"]);
+ ClassicAssert.AreEqual(0, count);
+ }
+ }
+
+ [Test]
+ public async Task PubSubNumSubACLsAsync()
+ {
+ await CheckCommandsAsync(
+ "PUBSUB NUMSUB",
+ [DoPubSubNumSubAsync]
+ );
+
+ static async Task DoPubSubNumSubAsync(GarnetClient client)
+ {
+ var count = await client.ExecuteForStringArrayResultAsync("PUBSUB", ["NUMSUB"]);
+ CollectionAssert.IsEmpty(count);
+ }
+ }
+
[Test]
public async Task ReadOnlyACLsAsync()
{
diff --git a/test/Garnet.test/RespPubSubTests.cs b/test/Garnet.test/RespPubSubTests.cs
index d54cabd527..43d851a72a 100644
--- a/test/Garnet.test/RespPubSubTests.cs
+++ b/test/Garnet.test/RespPubSubTests.cs
@@ -2,7 +2,9 @@
// Licensed under the MIT license.
using System;
+using System.Linq;
using System.Threading;
+using System.Threading.Channels;
using NUnit.Framework;
using NUnit.Framework.Legacy;
using StackExchange.Redis;
@@ -36,28 +38,18 @@ public void BasicSUBSCRIBE()
using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig());
var sub = subRedis.GetSubscriber();
var db = redis.GetDatabase(0);
+ string value = "published message";
ManualResetEvent evt = new(false);
- sub.Subscribe(RedisChannel.Pattern("messages"), (channel, message) =>
+ SubscribeAndPublish(sub, db, RedisChannel.Literal("messages"), RedisChannel.Literal("messages"), value, onSubscribe: (channel, message) =>
{
ClassicAssert.AreEqual("messages", (string)channel);
- ClassicAssert.AreEqual("published message", (string)message);
+ ClassicAssert.AreEqual(value, (string)message);
evt.Set();
});
- // Repeat to work-around bug in StackExchange.Redis subscribe behavior
- // where it returns before the SUBSCRIBE call is processed.
- int repeat = 5;
- while (true)
- {
- db.Publish(RedisChannel.Pattern("messages"), "published message");
- var ret = evt.WaitOne(TimeSpan.FromSeconds(1));
- if (ret) break;
- repeat--;
- ClassicAssert.IsTrue(repeat != 0, "Timeout waiting for subsciption receive");
- }
- sub.Unsubscribe(RedisChannel.Pattern("messages"));
+ sub.Unsubscribe(RedisChannel.Literal("messages"));
}
[Test]
@@ -68,15 +60,15 @@ public void BasicPSUBSCRIBE()
var sub = subRedis.GetSubscriber();
var db = redis.GetDatabase(0);
- string glob = "com.messages.*";
- string actual = "com.messages.testmessage";
+ string glob = "messagesA*";
+ string actual = "messagesAtest";
string value = "published message";
var channel = new RedisChannel(glob, RedisChannel.PatternMode.Pattern);
ManualResetEvent evt = new(false);
- sub.Subscribe(channel, (receivedChannel, message) =>
+ SubscribeAndPublish(sub, db, channel, RedisChannel.Pattern(actual), value, (receivedChannel, message) =>
{
ClassicAssert.AreEqual(glob, (string)channel);
ClassicAssert.AreEqual(actual, (string)receivedChannel);
@@ -84,18 +76,133 @@ public void BasicPSUBSCRIBE()
evt.Set();
});
+ sub.Unsubscribe(channel);
+ }
+
+ [Test]
+ public void BasicPUBSUB_CHANNELS()
+ {
+ using var subRedis = ConnectionMultiplexer.Connect(TestUtils.GetConfig());
+ using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig());
+ var sub = subRedis.GetSubscriber();
+ var db = redis.GetDatabase(0);
+ var server = redis.GetServers()[0];
+
+ var channelA = "messagesAtest";
+ var channelB = "messagesB";
+
+ SubscribeAndPublish(sub, db, RedisChannel.Literal(channelA), RedisChannel.Pattern(channelA));
+ SubscribeAndPublish(sub, db, RedisChannel.Literal(channelB), RedisChannel.Pattern(channelB));
+
+ var result = server.SubscriptionChannels();
+ string[] expectedResult = [channelA, channelB];
+ CollectionAssert.IsSubsetOf(expectedResult, result.Select(x => x.ToString()));
+
+ result = server.SubscriptionChannels(RedisChannel.Pattern("messages*"));
+ expectedResult = [channelA, channelB];
+ CollectionAssert.AreEquivalent(expectedResult, result.Select(x => x.ToString()));
+
+ result = server.SubscriptionChannels(RedisChannel.Pattern("messages?test"));
+ expectedResult = [channelA];
+ CollectionAssert.AreEquivalent(expectedResult, result.Select(x => x.ToString()));
+
+ result = server.SubscriptionChannels(RedisChannel.Pattern("messagesC*"));
+ ClassicAssert.AreEqual(0, result.Length);
+
+ sub.Unsubscribe(RedisChannel.Literal(channelA));
+ sub.Unsubscribe(RedisChannel.Literal(channelB));
+ }
+
+ [Test]
+ public void BasicPUBSUB_NUMPAT()
+ {
+ using var subRedis = ConnectionMultiplexer.Connect(TestUtils.GetConfig());
+ using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig());
+ var sub = subRedis.GetSubscriber();
+ var db = redis.GetDatabase(0);
+ var server = redis.GetServers()[0];
+
+ string glob = "com.messages.*";
+ string globB = "com.messagesB.*";
+ string actual = "com.messages.testmessage";
+ string actualB = "com.messagesB.testmessage";
+ string value = "published message";
+
+ var channel = new RedisChannel(glob, RedisChannel.PatternMode.Pattern);
+ var channelB = new RedisChannel(globB, RedisChannel.PatternMode.Pattern);
+
+ var result = server.SubscriptionPatternCount();
+ ClassicAssert.AreEqual(0, result);
+
+ SubscribeAndPublish(sub, db, channel, RedisChannel.Literal(actual), value);
+ SubscribeAndPublish(sub, db, channelB, RedisChannel.Literal(actualB), value);
+
+ result = server.SubscriptionPatternCount();
+ ClassicAssert.AreEqual(2, result);
+
+ sub.Unsubscribe(channel);
+ sub.Unsubscribe(channelB);
+ }
+
+ [Test]
+ public void BasicPUBSUB_NUMSUB()
+ {
+ using var subRedis = ConnectionMultiplexer.Connect(TestUtils.GetConfig());
+ using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig());
+ var sub = subRedis.GetSubscriber();
+ var db = redis.GetDatabase(0);
+ var server = redis.GetServers()[0];
+
+ var multiChannelResult = server.Execute("PUBSUB", ["NUMSUB"]);
+ ClassicAssert.AreEqual(0, multiChannelResult.Length);
+
+ multiChannelResult = server.Execute("PUBSUB", ["NUMSUB", "messagesA", "messagesB"]);
+ ClassicAssert.AreEqual(4, multiChannelResult.Length);
+ ClassicAssert.AreEqual("messagesA", multiChannelResult[0].ToString());
+ ClassicAssert.AreEqual("0", multiChannelResult[1].ToString());
+ ClassicAssert.AreEqual("messagesB", multiChannelResult[2].ToString());
+ ClassicAssert.AreEqual("0", multiChannelResult[3].ToString());
+
+ SubscribeAndPublish(sub, db, RedisChannel.Literal("messagesA"));
+ SubscribeAndPublish(sub, db, RedisChannel.Literal("messagesB"));
+
+ var result = server.SubscriptionSubscriberCount(RedisChannel.Literal("messagesA"));
+ ClassicAssert.AreEqual(1, result);
+
+ multiChannelResult = server.Execute("PUBSUB", ["NUMSUB", "messagesA", "messagesB"]);
+ ClassicAssert.AreEqual(4, multiChannelResult.Length);
+ ClassicAssert.AreEqual("messagesA", multiChannelResult[0].ToString());
+ ClassicAssert.AreEqual("1", multiChannelResult[1].ToString());
+ ClassicAssert.AreEqual("messagesB", multiChannelResult[2].ToString());
+ ClassicAssert.AreEqual("1", multiChannelResult[3].ToString());
+
+ sub.Unsubscribe(RedisChannel.Literal("messagesA"));
+ sub.Unsubscribe(RedisChannel.Literal("messagesB"));
+ }
+
+ private void SubscribeAndPublish(ISubscriber sub, IDatabase db, RedisChannel channel, RedisChannel? publishChannel = null, string message = null, Action onSubscribe = null)
+ {
+ message ??= "published message";
+ publishChannel ??= channel;
+ ManualResetEvent evt = new(false);
+ sub.Subscribe(channel, (receivedChannel, receivedMessage) =>
+ {
+ onSubscribe?.Invoke(receivedChannel, receivedMessage);
+ evt.Set();
+ });
+
+ // Doing publish to make sure the channel is subscribed
// Repeat to work-around bug in StackExchange.Redis subscribe behavior
// where it returns before the SUBSCRIBE call is processed.
int repeat = 5;
while (true)
{
- db.Publish(RedisChannel.Pattern(actual), value);
+ db.Publish(publishChannel.Value, message);
var ret = evt.WaitOne(TimeSpan.FromSeconds(1));
if (ret) break;
repeat--;
- ClassicAssert.IsTrue(repeat != 0, "Timeout waiting for subsciption receive");
+ ClassicAssert.IsTrue(repeat != 0, "Timeout waiting for subscription receive");
}
- sub.Unsubscribe(channel);
}
}
}
\ No newline at end of file
diff --git a/website/docs/commands/analytics.md b/website/docs/commands/analytics.md
index 7ab10a7c4d..69a1c43851 100644
--- a/website/docs/commands/analytics.md
+++ b/website/docs/commands/analytics.md
@@ -218,6 +218,52 @@ Posts a message to the given channel.
Integer Reply: the number of clients that received the message.
---
+
+### PUBSUB CHANNELS
+#### Syntax
+
+```bash
+PUBSUB CHANNELS [pattern]
+```
+
+Lists the currently active channels. An active channel is a Pub/Sub channel with one or more subscribers (excluding clients subscribed to patterns).
+
+#### Resp Reply
+
+Array reply: a list of active channels, optionally matching the specified pattern.
+
+---
+
+### PUBSUB NUMPAT
+#### Syntax
+
+```bash
+PUBSUB NUMPAT
+```
+
+Returns the number of unique patterns that are subscribed to by clients (that are performed using the PSUBSCRIBE command).
+
+#### Resp Reply
+
+Integer reply: the number of patterns all the clients are subscribed to.
+
+---
+
+### PUBSUB NUMSUB
+#### Syntax
+
+```bash
+PUBSUB NUMSUB [channel [channel ...]]
+```
+
+Returns the number of subscribers (exclusive of clients subscribed to patterns) for the specified channels.
+
+#### Resp Reply
+
+Array reply: the number of subscribers per channel, each even element (including the 0th) is channel name, each odd element is the number of subscribers
+
+---
+
### PUNSUBSCRIBE
#### Syntax
diff --git a/website/docs/commands/api-compatibility.md b/website/docs/commands/api-compatibility.md
index 1e398e0525..e44e169500 100644
--- a/website/docs/commands/api-compatibility.md
+++ b/website/docs/commands/api-compatibility.md
@@ -248,10 +248,10 @@ Note that this list is subject to change as we continue to expand our API comman
| | REFCOUNT | ➖ | |
| **PUB/SUB** | [PSUBSCRIBE](analytics.md#psubscribe) | ➕ | |
| | [PUBLISH](analytics.md#publish) | ➕ | |
-| | PUBSUB CHANNELS | ➖ | |
+| | [PUBSUB CHANNELS](analytics.md#pubsub-channels) | ➖ | |
| | PUBSUB HELP | ➖ | |
-| | PUBSUB NUMPAT | ➖ | |
-| | PUBSUB NUMSUB | ➖ | |
+| | [PUBSUB NUMPAT](analytics.md#pubsub-numpat) | ➖ | |
+| | [PUBSUB NUMSUB](analytics.md#pubsub-numsub) | ➖ | |
| | PUBSUB SHARDCHANNELS | ➖ | |
| | PUBSUB SHARDNUMSUB | ➖ | |
| | [PUNSUBSCRIBE](analytics.md#punsubscribe) | ➕ | |