diff --git a/src/main/java/redis/clients/jedis/BinaryClient.java b/src/main/java/redis/clients/jedis/BinaryClient.java index 7e511d543a..a4e9999911 100644 --- a/src/main/java/redis/clients/jedis/BinaryClient.java +++ b/src/main/java/redis/clients/jedis/BinaryClient.java @@ -1666,6 +1666,10 @@ public void xpending(byte[] key, byte[] groupname, byte[] start, byte[] end, int } } + public void xpendingSummary(final byte[] key, final byte[] groupname) { + sendCommand(XPENDING, key, groupname); + } + public void xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime, long newIdleTime, int retries, boolean force, byte[][] ids) { diff --git a/src/main/java/redis/clients/jedis/BinaryJedis.java b/src/main/java/redis/clients/jedis/BinaryJedis.java index 7367c3ff00..584bf63bed 100644 --- a/src/main/java/redis/clients/jedis/BinaryJedis.java +++ b/src/main/java/redis/clients/jedis/BinaryJedis.java @@ -4497,6 +4497,13 @@ public List xpending(byte[] key, byte[] groupname, byte[] start, byte[] return client.getObjectMultiBulkReply(); } + @Override + public Object xpendingSummary(final byte[] key, final byte[] groupname) { + checkIsInMultiOrPipeline(); + client.xpendingSummary(key, groupname); + return client.getOne(); + } + @Override public List xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime, long newIdleTime, int retries, boolean force, byte[]... ids) { diff --git a/src/main/java/redis/clients/jedis/BinaryJedisCluster.java b/src/main/java/redis/clients/jedis/BinaryJedisCluster.java index d8174e40f4..77e428d238 100644 --- a/src/main/java/redis/clients/jedis/BinaryJedisCluster.java +++ b/src/main/java/redis/clients/jedis/BinaryJedisCluster.java @@ -2450,6 +2450,16 @@ public List execute(Jedis connection) { }.runBinary(key); } + @Override + public Object xpendingSummary(final byte[] key, final byte[] groupname) { + return new JedisClusterCommand(connectionHandler, maxAttempts) { + @Override + public Object execute(Jedis connection) { + return connection.xpendingSummary(key, groupname); + } + }.runBinary(key); + } + @Override public List xclaim(final byte[] key, final byte[] groupname, final byte[] consumername, final long minIdleTime, final long newIdleTime, final int retries, final boolean force, diff --git a/src/main/java/redis/clients/jedis/BinaryShardedJedis.java b/src/main/java/redis/clients/jedis/BinaryShardedJedis.java index bb8849acdd..489e6542f6 100644 --- a/src/main/java/redis/clients/jedis/BinaryShardedJedis.java +++ b/src/main/java/redis/clients/jedis/BinaryShardedJedis.java @@ -1149,6 +1149,12 @@ public List xpending(byte[] key, byte[] groupname, byte[] start, byte[] return j.xpending(key, groupname, start, end, count, consumername); } + @Override + public Object xpendingSummary(final byte[] key, final byte[] groupname) { + Jedis j = getShard(key); + return j.xpendingSummary(key, groupname); + } + @Override public List xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime, long newIdleTime, int retries, boolean force, byte[]... ids) { diff --git a/src/main/java/redis/clients/jedis/BuilderFactory.java b/src/main/java/redis/clients/jedis/BuilderFactory.java index 0a5340ab37..8a202ec062 100644 --- a/src/main/java/redis/clients/jedis/BuilderFactory.java +++ b/src/main/java/redis/clients/jedis/BuilderFactory.java @@ -898,6 +898,32 @@ public String toString() { } }; + public static final Builder STREAM_PENDING_SUMMARY = new Builder() { + @Override + @SuppressWarnings("unchecked") + public StreamPendingSummary build(Object data) { + if (null == data) { + return null; + } + + List objectList = (List) data; + long total = BuilderFactory.LONG.build(objectList.get(0)); + String minId = SafeEncoder.encode((byte[]) objectList.get(1)); + String maxId = SafeEncoder.encode((byte[]) objectList.get(2)); + List> consumerObjList = (List>) objectList.get(3); + Map map = new HashMap<>(consumerObjList.size()); + for (List consumerObj : consumerObjList) { + map.put(SafeEncoder.encode((byte[]) consumerObj.get(0)), Long.parseLong(SafeEncoder.encode((byte[]) consumerObj.get(1)))); + } + return new StreamPendingSummary(total, new StreamEntryID(minId), new StreamEntryID(maxId), map); + } + + @Override + public String toString() { + return "StreamPendingSummary"; + } + }; + private static Map createMapFromDecodingFunctions(Iterator iterator, Map mappingFunctions) { diff --git a/src/main/java/redis/clients/jedis/Client.java b/src/main/java/redis/clients/jedis/Client.java index b15295f3bf..16659e209b 100644 --- a/src/main/java/redis/clients/jedis/Client.java +++ b/src/main/java/redis/clients/jedis/Client.java @@ -1414,6 +1414,11 @@ public void xpending(String key, String groupname, StreamEntryID start, StreamEn SafeEncoder.encode(end==null ? "+" : end.toString()), count, consumername == null? null : SafeEncoder.encode(consumername)); } + @Override + public void xpendingSummary(String key, String groupname) { + xpendingSummary(SafeEncoder.encode(key), SafeEncoder.encode(groupname)); + } + @Override public void xclaim(String key, String group, String consumername, long minIdleTime, long newIdleTime, int retries, boolean force, StreamEntryID... ids) { diff --git a/src/main/java/redis/clients/jedis/Jedis.java b/src/main/java/redis/clients/jedis/Jedis.java index 76e93fcdbc..dcb706c55f 100644 --- a/src/main/java/redis/clients/jedis/Jedis.java +++ b/src/main/java/redis/clients/jedis/Jedis.java @@ -4117,6 +4117,13 @@ public List xpending(final String key, final String groupnam return BuilderFactory.STREAM_PENDING_ENTRY_LIST.build(client.getObjectMultiBulkReply()); } + @Override + public StreamPendingSummary xpendingSummary(final String key, final String groupname) { + checkIsInMultiOrPipeline(); + client.xpendingSummary(key, groupname); + return BuilderFactory.STREAM_PENDING_SUMMARY.build(client.getObjectMultiBulkReply()); + } + @Override public List xclaim(String key, String group, String consumername, long minIdleTime, long newIdleTime, int retries, boolean force, StreamEntryID... ids) { diff --git a/src/main/java/redis/clients/jedis/JedisCluster.java b/src/main/java/redis/clients/jedis/JedisCluster.java index feb5d75f16..4eacca2060 100644 --- a/src/main/java/redis/clients/jedis/JedisCluster.java +++ b/src/main/java/redis/clients/jedis/JedisCluster.java @@ -2522,6 +2522,16 @@ public List execute(Jedis connection) { }.run(key); } + @Override + public StreamPendingSummary xpendingSummary(final String key, final String groupname) { + return new JedisClusterCommand(connectionHandler, maxAttempts) { + @Override + public StreamPendingSummary execute(Jedis connection) { + return connection.xpendingSummary(key, groupname); + } + }.run(key); + } + @Override public Long xdel(final String key, final StreamEntryID... ids) { return new JedisClusterCommand(connectionHandler, maxAttempts) { diff --git a/src/main/java/redis/clients/jedis/ShardedJedis.java b/src/main/java/redis/clients/jedis/ShardedJedis.java index 54c948b6cc..c2b826bf94 100644 --- a/src/main/java/redis/clients/jedis/ShardedJedis.java +++ b/src/main/java/redis/clients/jedis/ShardedJedis.java @@ -1145,6 +1145,12 @@ public List xpending(String key, String groupname, StreamEnt return j.xpending(key, groupname, start, end, count, consumername); } + @Override + public StreamPendingSummary xpendingSummary(String key, String groupname) { + Jedis j = getShard(key); + return j.xpendingSummary(key, groupname); + } + @Override public List xclaim(String key, String group, String consumername, long minIdleTime, long newIdleTime, int retries, boolean force, StreamEntryID... ids) { diff --git a/src/main/java/redis/clients/jedis/StreamPendingSummary.java b/src/main/java/redis/clients/jedis/StreamPendingSummary.java new file mode 100644 index 0000000000..46a670df6a --- /dev/null +++ b/src/main/java/redis/clients/jedis/StreamPendingSummary.java @@ -0,0 +1,38 @@ +package redis.clients.jedis; + +import java.io.Serializable; +import java.util.Map; + +public class StreamPendingSummary implements Serializable { + + private static final long serialVersionUID = 1L; + + private final long total; + private final StreamEntryID minId; + private final StreamEntryID maxId; + private final Map consumerMessageCount; + + public StreamPendingSummary(long total, StreamEntryID minId, StreamEntryID maxId, + Map consumerMessageCount) { + this.total = total; + this.minId = minId; + this.maxId = maxId; + this.consumerMessageCount = consumerMessageCount; + } + + public long getTotal() { + return total; + } + + public StreamEntryID getMinId() { + return minId; + } + + public StreamEntryID getMaxId() { + return maxId; + } + + public Map getConsumerMessageCount() { + return consumerMessageCount; + } +} diff --git a/src/main/java/redis/clients/jedis/commands/BinaryJedisClusterCommands.java b/src/main/java/redis/clients/jedis/commands/BinaryJedisClusterCommands.java index 4d6d2aeb69..3e11e19543 100644 --- a/src/main/java/redis/clients/jedis/commands/BinaryJedisClusterCommands.java +++ b/src/main/java/redis/clients/jedis/commands/BinaryJedisClusterCommands.java @@ -394,6 +394,8 @@ List georadiusByMemberReadonly(byte[] key, byte[] member, dou List xpending(byte[] key, byte[] groupname, byte[] start, byte[] end, int count, byte[] consumername); + Object xpendingSummary(final byte[] key, final byte[] groupname); + List xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime, long newIdleTime, int retries, boolean force, byte[][] ids); Long waitReplicas(byte[] key, int replicas, long timeout); diff --git a/src/main/java/redis/clients/jedis/commands/BinaryJedisCommands.java b/src/main/java/redis/clients/jedis/commands/BinaryJedisCommands.java index bf1683aeed..2a6befa482 100644 --- a/src/main/java/redis/clients/jedis/commands/BinaryJedisCommands.java +++ b/src/main/java/redis/clients/jedis/commands/BinaryJedisCommands.java @@ -390,8 +390,8 @@ List georadiusByMemberReadonly(byte[] key, byte[] member, dou * @return lenth of the value for key */ Long hstrlen(byte[] key, byte[] field); - - + + byte[] xadd(byte[] key, byte[] id, Map hash, long maxLen, boolean approximateLength); Long xlen(byte[] key); @@ -409,7 +409,7 @@ default List xrange(byte[] key, byte[] start, byte[] end, long count) { List xrevrange(byte[] key, byte[] end, byte[] start, int count); Long xack(byte[] key, byte[] group, byte[]... ids); - + String xgroupCreate(byte[] key, byte[] consumer, byte[] id, boolean makeStream); String xgroupSetID(byte[] key, byte[] consumer, byte[] id); @@ -417,13 +417,15 @@ default List xrange(byte[] key, byte[] start, byte[] end, long count) { Long xgroupDestroy(byte[] key, byte[] consumer); Long xgroupDelConsumer(byte[] key, byte[] consumer, byte[] consumerName); - + Long xdel(byte[] key, byte[]... ids); Long xtrim(byte[] key, long maxLen, boolean approximateLength); List xpending(byte[] key, byte[] groupname, byte[] start, byte[] end, int count, byte[] consumername); + Object xpendingSummary(byte[] key, byte[] groupname); + List xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime, long newIdleTime, int retries, boolean force, byte[]... ids); /** diff --git a/src/main/java/redis/clients/jedis/commands/Commands.java b/src/main/java/redis/clients/jedis/commands/Commands.java index 0a09b2ef5c..c51f8ce931 100644 --- a/src/main/java/redis/clients/jedis/commands/Commands.java +++ b/src/main/java/redis/clients/jedis/commands/Commands.java @@ -450,6 +450,8 @@ default void restoreReplace(String key, int ttl, byte[] serializedValue) { void xpending(String key, String groupname, StreamEntryID start, StreamEntryID end, int count, String consumername); + void xpendingSummary(String key, String groupname); + void xclaim(String key, String group, String consumername, long minIdleTime, long newIdleTime, int retries, boolean force, StreamEntryID... ids); diff --git a/src/main/java/redis/clients/jedis/commands/JedisClusterCommands.java b/src/main/java/redis/clients/jedis/commands/JedisClusterCommands.java index 9bc758538d..23ae96d143 100644 --- a/src/main/java/redis/clients/jedis/commands/JedisClusterCommands.java +++ b/src/main/java/redis/clients/jedis/commands/JedisClusterCommands.java @@ -9,6 +9,7 @@ import redis.clients.jedis.ScanResult; import redis.clients.jedis.SortingParams; import redis.clients.jedis.StreamEntry; +import redis.clients.jedis.StreamPendingSummary; import redis.clients.jedis.Tuple; import redis.clients.jedis.params.GeoRadiusParam; import redis.clients.jedis.params.SetParams; @@ -517,6 +518,15 @@ List georadiusByMemberReadonly(String key, String member, dou */ List xpending(String key, String groupname, StreamEntryID start, StreamEntryID end, int count, String consumername); + /** + * XPENDING key group + * + * @param key + * @param groupname + * @return + */ + StreamPendingSummary xpendingSummary(String key, String groupname); + /** * XDEL key ID [ID ...] * @param key diff --git a/src/main/java/redis/clients/jedis/commands/JedisCommands.java b/src/main/java/redis/clients/jedis/commands/JedisCommands.java index 8c0c1cf127..5e53e63c50 100644 --- a/src/main/java/redis/clients/jedis/commands/JedisCommands.java +++ b/src/main/java/redis/clients/jedis/commands/JedisCommands.java @@ -18,6 +18,7 @@ import redis.clients.jedis.ScanResult; import redis.clients.jedis.SortingParams; import redis.clients.jedis.StreamEntry; +import redis.clients.jedis.StreamPendingSummary; import redis.clients.jedis.Tuple; import redis.clients.jedis.params.GeoRadiusParam; import redis.clients.jedis.params.SetParams; @@ -518,6 +519,15 @@ List georadiusByMemberReadonly(String key, String member, dou */ List xpending(String key, String groupname, StreamEntryID start, StreamEntryID end, int count, String consumername); + /** + * XPENDING key group + * + * @param key + * @param groupname + * @return + */ + StreamPendingSummary xpendingSummary(String key, String groupname); + /** * XDEL key ID [ID ...] * @param key diff --git a/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java b/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java index ec4bca48a7..6e5f6ecf1d 100644 --- a/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java +++ b/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java @@ -329,6 +329,12 @@ public void xpendeing() { assertEquals(1, range.get(0).getValue().size()); assertEquals(map, range.get(0).getValue().get(0).getFields()); + // Get the summary about the pending messages + StreamPendingSummary pendingSummary = jedis.xpendingSummary("xpendeing-stream", "xpendeing-group"); + assertEquals(1, pendingSummary.getTotal()); + assertEquals(id1, pendingSummary.getMinId()); + assertEquals(1l, pendingSummary.getConsumerMessageCount().get("xpendeing-consumer").longValue()); + // Get the pending event List pendingRange = jedis.xpending("xpendeing-stream", "xpendeing-group", null, null, 3, "xpendeing-consumer");