diff --git a/libs/resources/RespCommandsDocs.json b/libs/resources/RespCommandsDocs.json index c7f576aad0..10fc7b6e57 100644 --- a/libs/resources/RespCommandsDocs.json +++ b/libs/resources/RespCommandsDocs.json @@ -638,6 +638,64 @@ } ] }, + { + "Command": "BLMPOP", + "Name": "BLMPOP", + "Summary": "Pops the first element from one of multiple lists. Blocks until an element is available otherwise. Deletes the list if the last element was popped.", + "Group": "List", + "Complexity": "O(N\u002BM) where N is the number of provided keys and M is the number of elements returned.", + "Arguments": [ + { + "TypeDiscriminator": "RespCommandBasicArgument", + "Name": "TIMEOUT", + "DisplayText": "timeout", + "Type": "Double" + }, + { + "TypeDiscriminator": "RespCommandBasicArgument", + "Name": "NUMKEYS", + "DisplayText": "numkeys", + "Type": "Integer" + }, + { + "TypeDiscriminator": "RespCommandKeyArgument", + "Name": "KEY", + "DisplayText": "key", + "Type": "Key", + "ArgumentFlags": "Multiple", + "KeySpecIndex": 0 + }, + { + "TypeDiscriminator": "RespCommandContainerArgument", + "Name": "WHERE", + "Type": "OneOf", + "Arguments": [ + { + "TypeDiscriminator": "RespCommandBasicArgument", + "Name": "LEFT", + "DisplayText": "left", + "Type": "PureToken", + "Token": "LEFT" + }, + { + "TypeDiscriminator": "RespCommandBasicArgument", + "Name": "RIGHT", + "DisplayText": "right", + "Type": "PureToken", + "Token": "RIGHT" + } + ] + }, + { + "TypeDiscriminator": "RespCommandBasicArgument", + "Name": "COUNT", + "DisplayText": "count", + "Type": "Integer", + "Token": "COUNT", + "ArgumentFlags": "Optional" + } + ] + }, { "Command": "BLPOP", "Name": "BLPOP", diff --git a/libs/resources/RespCommandsInfo.json b/libs/resources/RespCommandsInfo.json index 0216f7f582..dfaf6dcd5e 100644 --- a/libs/resources/RespCommandsInfo.json +++ b/libs/resources/RespCommandsInfo.json @@ -305,6 +305,28 @@ } ] }, + { + "Command": "BLMPOP", + "Name": "BLMPOP", + "Arity": -5, + "Flags": "Blocking, MovableKeys, Write", + "AclCategories": "Blocking, List, Slow, Write", + "KeySpecifications": [ + { + "BeginSearch": { + "TypeDiscriminator": "BeginSearchIndex", + "Index": 2 + }, + "FindKeys": { + "TypeDiscriminator": "FindKeysKeyNum", + "KeyNumIdx": 0, + "FirstKey": 1, + "KeyStep": 1 + }, + "Flags": "RW, Access, Delete" + } + ] + }, { "Command": "BLPOP", "Name": "BLPOP", diff --git a/libs/server/Objects/ItemBroker/CollectionItemBroker.cs b/libs/server/Objects/ItemBroker/CollectionItemBroker.cs index 065438e97f..ac3aba7a9d 100644 --- a/libs/server/Objects/ItemBroker/CollectionItemBroker.cs +++ b/libs/server/Objects/ItemBroker/CollectionItemBroker.cs @@ -5,7 +5,6 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; -using System.Linq; using System.Threading; using System.Threading.Tasks; using Tsavorite.core; @@ -54,11 +53,12 @@ public class CollectionItemBroker : IDisposable /// Keys of objects to observe /// Calling session instance /// Timeout of operation (in seconds, 0 for waiting indefinitely) + /// Additional arguments for command /// Result of operation internal async Task GetCollectionItemAsync(RespCommand command, byte[][] keys, - RespServerSession session, double timeoutInSeconds) + RespServerSession session, double timeoutInSeconds, ArgSlice[] cmdArgs = null) { - var observer = new CollectionItemObserver(session, command); + var observer = new CollectionItemObserver(session, command, cmdArgs); return await this.GetCollectionItemAsync(observer, keys, timeoutInSeconds); } @@ -223,12 +223,12 @@ private void InitializeObserver(CollectionItemObserver observer, byte[][] keys) // If the key already has a non-empty observer queue, it does not have an item to retrieve // Otherwise, try to retrieve next available item if ((KeysToObservers.ContainsKey(key) && KeysToObservers[key].Count > 0) || - !TryGetNextItem(key, observer.Session.storageSession, observer.Command, observer.CommandArgs, - out _, out var nextItem)) continue; + !TryGetResult(key, observer.Session.storageSession, observer.Command, observer.CommandArgs, + out _, out var result)) continue; // An item was found - set the observer result and return SessionIdToObserver.TryRemove(observer.Session.ObjectStoreSessionID, out _); - observer.HandleSetResult(new CollectionItemResult(key, nextItem)); + observer.HandleSetResult(result); return; } @@ -279,8 +279,8 @@ private bool TryAssignItemFromKey(byte[] key) } // Try to get next available item from object stored in key - if (!TryGetNextItem(key, observer.Session.storageSession, observer.Command, observer.CommandArgs, - out var currCount, out var nextItem)) + if (!TryGetResult(key, observer.Session.storageSession, observer.Command, observer.CommandArgs, + out var currCount, out var result)) { // If unsuccessful getting next item but there is at least one item in the collection, // continue to next observer in the queue, otherwise return @@ -292,7 +292,7 @@ private bool TryAssignItemFromKey(byte[] key) observers.TryDequeue(out observer); SessionIdToObserver.TryRemove(observer!.Session.ObjectStoreSessionID, out _); - observer.HandleSetResult(new CollectionItemResult(key, nextItem)); + observer.HandleSetResult(result); return true; } @@ -412,17 +412,17 @@ private static bool TryGetNextSetObject(SortedSetObject sortedSetObj, RespComman /// RESP command /// Additional command arguments /// Collection size - /// Retrieved item + /// Result of command /// True if found available item - private bool TryGetNextItem(byte[] key, StorageSession storageSession, RespCommand command, ArgSlice[] cmdArgs, out int currCount, out byte[] nextItem) + private unsafe bool TryGetResult(byte[] key, StorageSession storageSession, RespCommand command, ArgSlice[] cmdArgs, out int currCount, out CollectionItemResult result) { currCount = default; - nextItem = default; + result = default; var createTransaction = false; var objectType = command switch { - RespCommand.BLPOP or RespCommand.BRPOP or RespCommand.BLMOVE => GarnetObjectType.List, + RespCommand.BLPOP or RespCommand.BRPOP or RespCommand.BLMOVE or RespCommand.BLMPOP => GarnetObjectType.List, _ => throw new NotSupportedException() }; @@ -472,11 +472,15 @@ private bool TryGetNextItem(byte[] key, StorageSession storageSession, RespComma case ListObject listObj: currCount = listObj.LnkList.Count; if (objectType != GarnetObjectType.List) return false; + if (currCount == 0) return false; + switch (command) { case RespCommand.BLPOP: case RespCommand.BRPOP: - return TryGetNextListItem(listObj, command, out nextItem); + var isSuccessful = TryGetNextListItem(listObj, command, out var nextItem); + result = new CollectionItemResult(key, nextItem); + return isSuccessful; case RespCommand.BLMOVE: ListObject dstList; var newObj = false; @@ -491,8 +495,9 @@ private bool TryGetNextItem(byte[] key, StorageSession storageSession, RespComma } else return false; - var isSuccessful = TryMoveNextListItem(listObj, dstList, (OperationDirection)cmdArgs[1].ReadOnlySpan[0], + isSuccessful = TryMoveNextListItem(listObj, dstList, (OperationDirection)cmdArgs[1].ReadOnlySpan[0], (OperationDirection)cmdArgs[2].ReadOnlySpan[0], out nextItem); + result = new CollectionItemResult(key, nextItem); if (isSuccessful && newObj) { @@ -501,13 +506,29 @@ private bool TryGetNextItem(byte[] key, StorageSession storageSession, RespComma } return isSuccessful; + case RespCommand.BLMPOP: + var popDirection = (OperationDirection)cmdArgs[0].ReadOnlySpan[0]; + var popCount = *(int*)(cmdArgs[1].ptr); + popCount = Math.Min(popCount, listObj.LnkList.Count); + + var items = new byte[popCount][]; + for (var i = 0; i < popCount; i++) + { + var _ = TryGetNextListItem(listObj, popDirection == OperationDirection.Left ? RespCommand.BLPOP : RespCommand.BRPOP, out items[i]); // Return can be ignored because it is guaranteed to return true + } + + result = new CollectionItemResult(key, items); + return true; default: return false; } case SortedSetObject setObj: currCount = setObj.Dictionary.Count; if (objectType != GarnetObjectType.SortedSet) return false; - return TryGetNextSetObject(setObj, command, out nextItem); + + var hasValue = TryGetNextSetObject(setObj, command, out var sortedSetNextItem); + result = new CollectionItemResult(key, sortedSetNextItem); + return hasValue; default: return false; } diff --git a/libs/server/Objects/ItemBroker/CollectionItemResult.cs b/libs/server/Objects/ItemBroker/CollectionItemResult.cs index 99845a2ebd..a3fc8311d5 100644 --- a/libs/server/Objects/ItemBroker/CollectionItemResult.cs +++ b/libs/server/Objects/ItemBroker/CollectionItemResult.cs @@ -6,8 +6,20 @@ namespace Garnet.server /// /// Result of item retrieved from observed collection /// - internal readonly struct CollectionItemResult(byte[] key, byte[] item) + internal readonly struct CollectionItemResult { + public CollectionItemResult(byte[] key, byte[] item) + { + Key = key; + Item = item; + } + + public CollectionItemResult(byte[] key, byte[][] items) + { + Key = key; + Items = items; + } + /// /// True if item was found /// @@ -16,16 +28,21 @@ internal readonly struct CollectionItemResult(byte[] key, byte[] item) /// /// Key of collection from which item was retrieved /// - internal byte[] Key { get; } = key; + internal byte[] Key { get; } + + /// + /// Item retrieved from collection + /// + internal byte[] Item { get; } /// /// Item retrieved from collection /// - internal byte[] Item { get; } = item; + internal byte[][] Items { get; } /// /// Instance of empty result /// - internal static readonly CollectionItemResult Empty = new(null, null); + internal static readonly CollectionItemResult Empty = new(null, item: null); } } \ No newline at end of file diff --git a/libs/server/Resp/Objects/ListCommands.cs b/libs/server/Resp/Objects/ListCommands.cs index 9163d852d4..a1cf1087cc 100644 --- a/libs/server/Resp/Objects/ListCommands.cs +++ b/libs/server/Resp/Objects/ListCommands.cs @@ -901,5 +901,108 @@ public bool ListSet(ref TGarnetApi storageApi) return true; } + + /// + /// BLMPOP timeout numkeys key [key ...] LEFT|RIGHT [COUNT count] + /// + /// + private unsafe bool ListBlockingPopMultiple() + { + if (parseState.Count < 4) + { + return AbortWithWrongNumberOfArguments(nameof(RespCommand.BLMPOP)); + } + + var currTokenId = 0; + + // Read timeout + if (!parseState.TryGetDouble(currTokenId++, out var timeout)) + { + return AbortWithErrorMessage(CmdStrings.RESP_ERR_TIMEOUT_NOT_VALID_FLOAT); + } + + // Read count of keys + if (!parseState.TryGetInt(currTokenId++, out var numKeys)) + { + var err = string.Format(CmdStrings.GenericParamShouldBeGreaterThanZero, "numkeys"); + return AbortWithErrorMessage(Encoding.ASCII.GetBytes(err)); + } + + if (parseState.Count != numKeys + 3 && parseState.Count != numKeys + 5) + { + return AbortWithErrorMessage(CmdStrings.RESP_ERR_GENERIC_SYNTAX_ERROR); + } + + // Get the keys for Lists + var keysBytes = new byte[numKeys][]; + for (var i = 0; i < keysBytes.Length; i++) + { + keysBytes[i] = parseState.GetArgSliceByRef(currTokenId++).SpanByte.ToByteArray(); + } + + var cmdArgs = new ArgSlice[2]; + + // Get the direction + var dir = parseState.GetArgSliceByRef(currTokenId++); + var popDirection = GetOperationDirection(dir); + + if (popDirection == OperationDirection.Unknown) + { + return AbortWithErrorMessage(CmdStrings.RESP_ERR_GENERIC_SYNTAX_ERROR); + } + cmdArgs[0] = new ArgSlice((byte*)&popDirection, 1); + + var popCount = 1; + + // Get the COUNT keyword & parameter value, if specified + if (parseState.Count == numKeys + 5) + { + var countKeyword = parseState.GetArgSliceByRef(currTokenId++); + + if (!countKeyword.ReadOnlySpan.EqualsUpperCaseSpanIgnoringCase(CmdStrings.COUNT)) + { + return AbortWithErrorMessage(CmdStrings.RESP_ERR_GENERIC_SYNTAX_ERROR); + } + + // Read count + if (!parseState.TryGetInt(currTokenId, out popCount) || popCount < 1) + { + var err = string.Format(CmdStrings.GenericParamShouldBeGreaterThanZero, "count"); + return AbortWithErrorMessage(Encoding.ASCII.GetBytes(err)); + } + } + + cmdArgs[1] = new ArgSlice((byte*)&popCount, sizeof(int)); + + if (storeWrapper.itemBroker == null) + throw new GarnetException("Object store is disabled"); + + var result = storeWrapper.itemBroker.GetCollectionItemAsync(RespCommand.BLMPOP, keysBytes, this, timeout, cmdArgs).Result; + + if (!result.Found) + { + while (!RespWriteUtils.WriteNull(ref dcurr, dend)) + SendAndReset(); + return true; + } + + while (!RespWriteUtils.WriteArrayLength(2, ref dcurr, dend)) + SendAndReset(); + + while (!RespWriteUtils.WriteBulkString(result.Key, ref dcurr, dend)) + SendAndReset(); + + var elements = result.Items; + while (!RespWriteUtils.WriteArrayLength(elements.Length, ref dcurr, dend)) + SendAndReset(); + + foreach (var element in elements) + { + while (!RespWriteUtils.WriteBulkString(element, ref dcurr, dend)) + SendAndReset(); + } + + return true; + } } } \ No newline at end of file diff --git a/libs/server/Resp/Parser/RespCommand.cs b/libs/server/Resp/Parser/RespCommand.cs index a7916d157e..1473a014b1 100644 --- a/libs/server/Resp/Parser/RespCommand.cs +++ b/libs/server/Resp/Parser/RespCommand.cs @@ -125,6 +125,7 @@ public enum RespCommand : ushort BRPOP, BLMOVE, BRPOPLPUSH, + BLMPOP, MIGRATE, MSET, MSETNX, @@ -1061,6 +1062,10 @@ private RespCommand FastParseArrayCommand(ref int count, ref ReadOnlySpan { return RespCommand.BLMOVE; } + else if (*(ulong*)(ptr + 4) == MemoryMarshal.Read("BLMPOP\r\n"u8)) + { + return RespCommand.BLMPOP; + } break; case 'D': if (*(ulong*)(ptr + 4) == MemoryMarshal.Read("DBSIZE\r\n"u8)) diff --git a/libs/server/Resp/RespServerSession.cs b/libs/server/Resp/RespServerSession.cs index fe14d4b157..9ccd49a9f9 100644 --- a/libs/server/Resp/RespServerSession.cs +++ b/libs/server/Resp/RespServerSession.cs @@ -663,6 +663,7 @@ private bool ProcessArrayCommands(RespCommand cmd, ref TGarnetApi st RespCommand.BRPOP => ListBlockingPop(cmd), RespCommand.BLMOVE => ListBlockingMove(), RespCommand.BRPOPLPUSH => ListBlockingPopPush(), + RespCommand.BLMPOP => ListBlockingPopMultiple(), // Hash Commands RespCommand.HSET => HashSet(cmd, ref storageApi), RespCommand.HMSET => HashSet(cmd, ref storageApi), diff --git a/playground/CommandInfoUpdater/SupportedCommand.cs b/playground/CommandInfoUpdater/SupportedCommand.cs index 40d6ca496d..5e79dbe4d0 100644 --- a/playground/CommandInfoUpdater/SupportedCommand.cs +++ b/playground/CommandInfoUpdater/SupportedCommand.cs @@ -37,6 +37,7 @@ public class SupportedCommand new("BRPOP", RespCommand.BRPOP), new("BLMOVE", RespCommand.BLMOVE), new("BRPOPLPUSH", RespCommand.BRPOPLPUSH), + new("BLMPOP", RespCommand.BLMPOP), new("CLIENT", RespCommand.CLIENT, [ new("CLIENT|ID", RespCommand.CLIENT_ID), diff --git a/test/Garnet.test/Resp/ACL/RespCommandTests.cs b/test/Garnet.test/Resp/ACL/RespCommandTests.cs index 02c2aa37e7..63f38dac7c 100644 --- a/test/Garnet.test/Resp/ACL/RespCommandTests.cs +++ b/test/Garnet.test/Resp/ACL/RespCommandTests.cs @@ -3685,6 +3685,21 @@ static async Task DoBRPopLPushAsync(GarnetClient client) } } + [Test] + public async Task BLMPopACLsAsync() + { + await CheckCommandsAsync( + "BLMPOP", + [DoBLMPopAsync] + ); + + static async Task DoBLMPopAsync(GarnetClient client) + { + string val = await client.ExecuteForStringResultAsync("BLMPOP", ["1", "1", "foo", "RIGHT"]); + ClassicAssert.IsNull(val); + } + } + [Test] public async Task BLPopACLsAsync() { diff --git a/test/Garnet.test/RespBlockingListTests.cs b/test/Garnet.test/RespBlockingListTests.cs index ca2eee001e..2e92524161 100644 --- a/test/Garnet.test/RespBlockingListTests.cs +++ b/test/Garnet.test/RespBlockingListTests.cs @@ -303,5 +303,100 @@ public void BasicBlockingListPopPushTest() actualValue = Encoding.ASCII.GetString(response).Substring(0, expectedResponse.Length); ClassicAssert.AreEqual(expectedResponse, actualValue); } + + [Test] + [TestCase(OperationDirection.Left, "value1", Description = "Pop from left")] + [TestCase(OperationDirection.Right, "value3", Description = "Pop from right")] + public void BasicBlmpopTest(OperationDirection direction, string expectedValue) + { + var key = "mykey"; + using var lightClientRequest = TestUtils.CreateRequest(); + + lightClientRequest.SendCommand($"RPUSH {key} value1 value2 value3"); + var response = lightClientRequest.SendCommand($"BLMPOP 1 1 {key} {direction}"); + var expectedResponse = $"*2\r\n${key.Length}\r\n{key}\r\n*1\r\n${expectedValue.Length}\r\n{expectedValue}\r\n"; + var actualValue = Encoding.ASCII.GetString(response).Substring(0, expectedResponse.Length); + ClassicAssert.AreEqual(expectedResponse, actualValue); + } + + [Test] + [TestCase(1, "key1", "value1", Description = "First key has value")] + [TestCase(2, "key2", "value2", Description = "Second key has value")] + public void BlmpopMultipleKeysTest(int valueKeyIndex, string expectedKey, string expectedValue) + { + var keys = new[] { "key1", "key2", "key3" }; + using var lightClientRequest = TestUtils.CreateRequest(); + + lightClientRequest.SendCommand($"RPUSH {keys[valueKeyIndex - 1]} {expectedValue}"); + var response = lightClientRequest.SendCommand($"BLMPOP 1 {keys.Length} {string.Join(" ", keys)} LEFT"); + var expectedResponse = $"*2\r\n${expectedKey.Length}\r\n{expectedKey}\r\n*1\r\n${expectedValue.Length}\r\n{expectedValue}\r\n"; + var actualValue = Encoding.ASCII.GetString(response).Substring(0, expectedResponse.Length); + ClassicAssert.AreEqual(expectedResponse, actualValue); + } + + [Test] + public void BlmpopTimeoutTest() + { + using var lightClientRequest = TestUtils.CreateRequest(); + var response = lightClientRequest.SendCommand("BLMPOP 1 1 nonexistentkey LEFT"); + var expectedResponse = "$-1\r\n"; + var actualValue = Encoding.ASCII.GetString(response).Substring(0, expectedResponse.Length); + ClassicAssert.AreEqual(expectedResponse, actualValue); + } + + [Test] + public void BlmpopBlockingBehaviorTest() + { + var key = "blockingkey"; + var value = "testvalue"; + + var blockingTask = taskFactory.StartNew(() => + { + using var lcr = TestUtils.CreateRequest(); + var response = lcr.SendCommand($"BLMPOP 30 1 {key} LEFT"); + var expectedResponse = $"*2\r\n${key.Length}\r\n{key}\r\n*1\r\n${value.Length}\r\n{value}\r\n"; + var actualValue = Encoding.ASCII.GetString(response).Substring(0, expectedResponse.Length); + ClassicAssert.AreEqual(expectedResponse, actualValue); + }); + + var pushingTask = taskFactory.StartNew(() => + { + using var lcr = TestUtils.CreateRequest(); + Task.Delay(TimeSpan.FromSeconds(2)).Wait(); + return lcr.SendCommand($"LPUSH {key} {value}"); + }); + + Task.WaitAll([blockingTask, pushingTask], TimeSpan.FromSeconds(5)); + ClassicAssert.IsTrue(blockingTask.IsCompletedSuccessfully); + ClassicAssert.IsTrue(pushingTask.IsCompletedSuccessfully); + } + + [Test] + public void BlmpopBlockingWithCountTest() + { + var key = "countkey"; + var values = new[] { "value1", "value2", "value3", "value4" }; + + var blockingTask = taskFactory.StartNew(() => + { + using var lcr = TestUtils.CreateRequest(); + var response = lcr.SendCommand($"BLMPOP 30 1 {key} LEFT COUNT 3"); + var expectedResponse = $"*2\r\n${key.Length}\r\n{key}\r\n*3\r\n$6\r\nvalue1\r\n$6\r\nvalue2\r\n$6\r\nvalue3\r\n"; + var actualValue = Encoding.ASCII.GetString(response).Substring(0, expectedResponse.Length); + ClassicAssert.AreEqual(expectedResponse, actualValue); + }); + + var pushingTask = taskFactory.StartNew(() => + { + using var lcr = TestUtils.CreateRequest(); + Task.Delay(TimeSpan.FromSeconds(2)).Wait(); + return lcr.SendCommand($"RPUSH {key} {string.Join(" ", values)}"); + }); + + Task.WaitAll([blockingTask, pushingTask], TimeSpan.FromSeconds(5)); + ClassicAssert.IsTrue(blockingTask.IsCompletedSuccessfully); + ClassicAssert.IsTrue(pushingTask.IsCompletedSuccessfully); + } + } } \ No newline at end of file diff --git a/website/docs/commands/api-compatibility.md b/website/docs/commands/api-compatibility.md index af196f3838..3e9ba3f4bc 100644 --- a/website/docs/commands/api-compatibility.md +++ b/website/docs/commands/api-compatibility.md @@ -209,7 +209,7 @@ Note that this list is subject to change as we continue to expand our API comman | | LATEST | ➖ | | | | [RESET](server.md#latency-reset) | ➕ | | | **LIST** | [BLMOVE](data-structures.md#blmove) | ➕ | | -| | BLMPOP | ➖ | | +| | [BLMPOP](data-structures.md#blmpop) | ➕ | | | | [BLPOP](data-structures.md#blpop) | ➕ | | | | [BRPOP](data-structures.md#brpop) | ➕ | | | | [BRPOPLPUSH](data-structures.md#brpoplpush) | ➕ | (Deprecated) | diff --git a/website/docs/commands/data-structures.md b/website/docs/commands/data-structures.md index 35df98bbb8..fccfe83e2e 100644 --- a/website/docs/commands/data-structures.md +++ b/website/docs/commands/data-structures.md @@ -248,6 +248,18 @@ Bulk string reply: the element being popped and pushed. --- +### BLMPOP + +#### Syntax + +```bash + BLMPOP timeout numkeys key [key ...] [COUNT count] +``` + +BLMPOP is the blocking variant of [LMPOP](#lmpop). When any of the lists contains elements, this command behaves exactly like LMPOP. When used inside a MULTI/EXEC block, this command behaves exactly like LMPOP. When all lists are empty, Garnet will block the connection until another client pushes to it or until timeout (a double value specifying the maximum number of seconds to block) is reached. A timeout of zero can be used to block indefinitely. + +--- + ### BLPOP #### Syntax