From 62e5d4730b98f8f56ceacc42b886e09b00cb949b Mon Sep 17 00:00:00 2001 From: dengliming Date: Thu, 11 Mar 2021 01:05:32 +0800 Subject: [PATCH] Add support for getting Summary info by XPENDING --- .../redis/clients/jedis/BinaryClient.java | 68 +++---- .../java/redis/clients/jedis/BinaryJedis.java | 75 ++++---- .../clients/jedis/BinaryJedisCluster.java | 42 +++-- .../clients/jedis/BinaryShardedJedis.java | 8 +- .../redis/clients/jedis/BuilderFactory.java | 38 +++- src/main/java/redis/clients/jedis/Client.java | 49 +++--- src/main/java/redis/clients/jedis/Jedis.java | 83 +++++---- .../redis/clients/jedis/JedisCluster.java | 34 ++-- .../redis/clients/jedis/ShardedJedis.java | 12 +- .../clients/jedis/StreamPendingSummary.java | 38 ++++ .../commands/BinaryJedisClusterCommands.java | 20 ++- .../jedis/commands/BinaryJedisCommands.java | 18 +- .../clients/jedis/commands/Commands.java | 20 ++- .../jedis/commands/JedisClusterCommands.java | 70 ++++---- .../clients/jedis/commands/JedisCommands.java | 72 ++++---- .../tests/commands/StreamsCommandsTest.java | 166 +++++++++--------- 16 files changed, 482 insertions(+), 331 deletions(-) create mode 100644 src/main/java/redis/clients/jedis/StreamPendingSummary.java diff --git a/src/main/java/redis/clients/jedis/BinaryClient.java b/src/main/java/redis/clients/jedis/BinaryClient.java index 993be834da..d2d9ce677e 100644 --- a/src/main/java/redis/clients/jedis/BinaryClient.java +++ b/src/main/java/redis/clients/jedis/BinaryClient.java @@ -121,10 +121,10 @@ public void setPassword(final String password) { this.password = password; } - + /** * This method should be called only after a successful SELECT command. - * @param db + * @param db */ public void setDb(int db) { this.db = db; @@ -190,7 +190,7 @@ public void set(final byte[] key, final byte[] value, final SetParams params) { public void get(final byte[] key) { sendCommand(GET, key); } - + public void getDel(final byte[] key) { sendCommand(GETDEL, key); } @@ -1155,11 +1155,11 @@ public void srandmember(final byte[] key, final int count) { public void memoryDoctor() { sendCommand(MEMORY, Keyword.DOCTOR.getRaw()); } - + public void memoryUsage(final byte[] key) { sendCommand(MEMORY, Keyword.USAGE.getRaw(), key); } - + public void memoryUsage(final byte[] key, final int samples) { sendCommand(MEMORY, Keyword.USAGE.getRaw(), key, Keyword.SAMPLES.getRaw(), toByteArray(samples)); } @@ -1464,17 +1464,17 @@ public void bitfieldReadonly(final byte[] key, final byte[]... arguments) { public void hstrlen(final byte[] key, final byte[] field) { sendCommand(HSTRLEN, key, field); } - + public void xadd(final byte[] key, final byte[] id, final Map hash, long maxLen, boolean approximateLength) { int maxLexArgs = 0; if(maxLen < Long.MAX_VALUE) { // optional arguments if(approximateLength) { - maxLexArgs = 3; // e.g. MAXLEN ~ 1000 + maxLexArgs = 3; // e.g. MAXLEN ~ 1000 } else { maxLexArgs = 2; // e.g. MAXLEN 1000 } } - + final byte[][] params = new byte[2 + maxLexArgs + hash.size() * 2][]; int index = 0; params[index++] = key; @@ -1485,7 +1485,7 @@ public void xadd(final byte[] key, final byte[] id, final Map ha } params[index++] = toByteArray(maxLen); } - + params[index++] = id; for (final Entry entry : hash.entrySet()) { params[index++] = entry.getKey(); @@ -1493,7 +1493,7 @@ public void xadd(final byte[] key, final byte[] id, final Map ha } sendCommand(XADD, params); } - + public void xlen(final byte[] key) { sendCommand(XLEN, key); } @@ -1524,7 +1524,7 @@ public void xread(final int count, final long block, final Map s params[streamsIndex++] = Keyword.BLOCK.getRaw(); params[streamsIndex++] = toByteArray(block); } - + params[streamsIndex++] = Keyword.STREAMS.getRaw(); int idsIndex = streamsIndex + streams.size(); @@ -1532,10 +1532,10 @@ public void xread(final int count, final long block, final Map s params[streamsIndex++] = entry.getKey(); params[idsIndex++] = entry.getValue(); } - + sendCommand(XREAD, params); } - + public void xack(final byte[] key, final byte[] group, final byte[]... ids) { final byte[][] params = new byte[2 + ids.length][]; int index = 0; @@ -1546,27 +1546,27 @@ public void xack(final byte[] key, final byte[] group, final byte[]... ids) { } sendCommand(XACK, params); } - + public void xgroupCreate(final byte[] key, final byte[] groupname, final byte[] id, boolean makeStream) { if(makeStream) { - sendCommand(XGROUP, Keyword.CREATE.getRaw(), key, groupname, id, Keyword.MKSTREAM.getRaw()); + sendCommand(XGROUP, Keyword.CREATE.getRaw(), key, groupname, id, Keyword.MKSTREAM.getRaw()); } else { - sendCommand(XGROUP, Keyword.CREATE.getRaw(), key, groupname, id); + sendCommand(XGROUP, Keyword.CREATE.getRaw(), key, groupname, id); } } public void xgroupSetID(final byte[] key, final byte[] groupname, final byte[] id) { - sendCommand(XGROUP, Keyword.SETID.getRaw(), key, groupname, id); + sendCommand(XGROUP, Keyword.SETID.getRaw(), key, groupname, id); } public void xgroupDestroy(final byte[] key, final byte[] groupname) { - sendCommand(XGROUP, Keyword.DESTROY.getRaw(), key, groupname); + sendCommand(XGROUP, Keyword.DESTROY.getRaw(), key, groupname); } public void xgroupDelConsumer(final byte[] key, final byte[] groupname, final byte[] consumerName) { - sendCommand(XGROUP, Keyword.DELCONSUMER.getRaw(), key, groupname, consumerName); + sendCommand(XGROUP, Keyword.DELCONSUMER.getRaw(), key, groupname, consumerName); } - + public void xdel(final byte[] key, final byte[]... ids) { final byte[][] params = new byte[1 + ids.length][]; int index = 0; @@ -1576,7 +1576,7 @@ public void xdel(final byte[] key, final byte[]... ids) { } sendCommand(XDEL, params); } - + public void xtrim(byte[] key, long maxLen, boolean approximateLength) { if(approximateLength) { sendCommand(XTRIM, key, Keyword.MAXLEN.getRaw(), Protocol.BYTES_TILDE ,toByteArray(maxLen)); @@ -1584,9 +1584,9 @@ public void xtrim(byte[] key, long maxLen, boolean approximateLength) { sendCommand(XTRIM, key, Keyword.MAXLEN.getRaw(), toByteArray(maxLen)); } } - + public void xreadGroup(byte[] groupname, byte[] consumer, int count, long block, boolean noAck, Map streams) { - + int optional = 0; if(count>0) { optional += 2; @@ -1597,8 +1597,8 @@ public void xreadGroup(byte[] groupname, byte[] consumer, int count, long block, if(noAck) { optional += 1; } - - + + final byte[][] params = new byte[4 + optional + streams.size() * 2][]; int streamsIndex = 0; @@ -1617,17 +1617,17 @@ public void xreadGroup(byte[] groupname, byte[] consumer, int count, long block, params[streamsIndex++] = Keyword.NOACK.getRaw(); } params[streamsIndex++] = Keyword.STREAMS.getRaw(); - + int idsIndex = streamsIndex + streams.size(); for (final Entry entry : streams.entrySet()) { params[streamsIndex++] = entry.getKey(); params[idsIndex++] = entry.getValue(); } - + sendCommand(XREADGROUP, params); } - + public void xpending(byte[] key, byte[] groupname, byte[] start, byte[] end, int count, byte[] consumername) { if(consumername == null) { sendCommand(XPENDING, key, groupname, start, end, toByteArray(count)); @@ -1636,15 +1636,19 @@ public void xpending(byte[] key, byte[] groupname, byte[] start, byte[] end, int } } + public void xpendingSummary(final byte[] key, final byte[] groupname) { + sendCommand(XPENDING, key, groupname); + } + public void xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime, long newIdleTime, int retries, boolean force, byte[][] ids) { - + ArrayList arguments = new ArrayList<>(10 + ids.length); arguments.add(key); arguments.add(groupname); arguments.add(consumername); arguments.add(toByteArray(minIdleTime)); - + Collections.addAll(arguments, ids); if(newIdleTime > 0) { @@ -1653,10 +1657,10 @@ public void xclaim(byte[] key, byte[] groupname, byte[] consumername, long minId } if(retries > 0) { arguments.add(Keyword.RETRYCOUNT.getRaw()); - arguments.add(toByteArray(retries)); + arguments.add(toByteArray(retries)); } if(force) { - arguments.add(Keyword.FORCE.getRaw()); + arguments.add(Keyword.FORCE.getRaw()); } sendCommand(XCLAIM, arguments.toArray(new byte[arguments.size()][])); } diff --git a/src/main/java/redis/clients/jedis/BinaryJedis.java b/src/main/java/redis/clients/jedis/BinaryJedis.java index e00fae0723..c7af638c08 100644 --- a/src/main/java/redis/clients/jedis/BinaryJedis.java +++ b/src/main/java/redis/clients/jedis/BinaryJedis.java @@ -392,7 +392,7 @@ public byte[] get(final byte[] key) { client.get(key); return client.getBinaryBulkReply(); } - + /** * Get the value of key and delete the key. This command is similar to GET, except for the fact * that it also deletes the key on success (if and only if the key's value type is a string). @@ -1835,14 +1835,14 @@ public Long sunionstore(final byte[] dstkey, final byte[]... keys) { * Return the difference between the Set stored at key1 and all the Sets key2, ..., keyN *

* Example: - * + * *

    * key1 = [x, a, b, c]
    * key2 = [c]
    * key3 = [a, d]
    * SDIFF key1,key2,key3 => [x, b]
    * 
- * + * * Non existing keys are considered like empty sets. *

* Time complexity: @@ -2198,65 +2198,65 @@ public List sort(final byte[] key) { * examples: *

* Given are the following sets and key/values: - * + * *

    * x = [1, 2, 3]
    * y = [a, b, c]
-   * 
+   *
    * k1 = z
    * k2 = y
    * k3 = x
-   * 
+   *
    * w1 = 9
    * w2 = 8
    * w3 = 7
    * 
- * + * * Sort Order: - * + * *
    * sort(x) or sort(x, sp.asc())
    * -> [1, 2, 3]
-   * 
+   *
    * sort(x, sp.desc())
    * -> [3, 2, 1]
-   * 
+   *
    * sort(y)
    * -> [c, a, b]
-   * 
+   *
    * sort(y, sp.alpha())
    * -> [a, b, c]
-   * 
+   *
    * sort(y, sp.alpha().desc())
    * -> [c, a, b]
    * 
- * + * * Limit (e.g. for Pagination): - * + * *
    * sort(x, sp.limit(0, 2))
    * -> [1, 2]
-   * 
+   *
    * sort(y, sp.alpha().desc().limit(1, 2))
    * -> [b, a]
    * 
- * + * * Sorting by external keys: - * + * *
    * sort(x, sb.by(w*))
    * -> [3, 2, 1]
-   * 
+   *
    * sort(x, sb.by(w*).desc())
    * -> [1, 2, 3]
    * 
- * + * * Getting external keys: - * + * *
    * sort(x, sp.by(w*).get(k*))
    * -> [x, y, z]
-   * 
+   *
    * sort(x, sp.by(w*).get(#).get(k*))
    * -> [3, x, 2, y, 1, z]
    * 
@@ -3190,7 +3190,7 @@ public String shutdown() { * Format of the returned String: *

* All the fields are in the form field:value - * + * *

    * edis_version:0.07
    * connected_clients:1
@@ -3203,7 +3203,7 @@ public String shutdown() {
    * uptime_in_seconds:25
    * uptime_in_days:0
    * 
- * + * * Notes *

* used_memory is returned in bytes, and is the total number of bytes allocated by the program @@ -3285,7 +3285,7 @@ public String slaveofNoOne() { * are reported as a list of key-value pairs. *

* Example: - * + * *

    * $ redis-cli config get '*'
    * 1. "dbfilename"
@@ -3300,7 +3300,7 @@ public String slaveofNoOne() {
    * 10. "everysec"
    * 11. "save"
    * 12. "3600 1 300 100 60 10000"
-   * 
+   *
    * $ redis-cli config get 'm*'
    * 1. "masterauth"
    * 2. (nil)
@@ -3332,7 +3332,7 @@ public String configResetStat() {
    * The CONFIG REWRITE command rewrites the redis.conf file the server was started with, applying
    * the minimal changes needed to make it reflect the configuration currently used by the server,
    * which may be different compared to the original one because of the use of the CONFIG SET command.
-   * 
+   *
    * The rewrite is performed in a very conservative way:
    * 
    *
  • Comments and the overall structure of the original redis.conf are preserved as much as possible.
  • @@ -3342,7 +3342,7 @@ public String configResetStat() { *
  • Non used lines are blanked. For instance if you used to have multiple save directives, but * the current configuration has fewer or none as you disabled RDB persistence, all the lines will be blanked.
  • *
- * + * * CONFIG REWRITE is also able to rewrite the configuration file from scratch if the original one * no longer exists for some reason. However if the server was started without a configuration * file at all, the CONFIG REWRITE will just return an error. @@ -3820,14 +3820,14 @@ public byte[] memoryDoctorBinary() { client.memoryDoctor(); return client.getBinaryBulkReply(); } - + @Override public Long memoryUsage(final byte[] key) { checkIsInMultiOrPipeline(); client.memoryUsage(key); return client.getIntegerReply(); } - + @Override public Long memoryUsage(final byte[] key, final int samples) { checkIsInMultiOrPipeline(); @@ -4192,7 +4192,7 @@ public List georadiusReadonly(final byte[] key, final double client.georadiusReadonly(key, longitude, latitude, radius, unit, param); return BuilderFactory.GEORADIUS_WITH_PARAMS_RESULT.build(client.getObjectMultiBulkReply()); } - + @Override public List georadiusByMember(final byte[] key, final byte[] member, final double radius, final GeoUnit unit) { @@ -4390,28 +4390,28 @@ public List xreadGroup(byte[] groupname, byte[] consumer, int count, lon public byte[] xadd(byte[] key, byte[] id, Map hash, long maxLen, boolean approximateLength) { checkIsInMultiOrPipeline(); client.xadd(key, id, hash, maxLen, approximateLength); - return client.getBinaryBulkReply(); + return client.getBinaryBulkReply(); } @Override public Long xlen(byte[] key) { checkIsInMultiOrPipeline(); client.xlen(key); - return client.getIntegerReply(); + return client.getIntegerReply(); } @Override public List xrange(byte[] key, byte[] start, byte[] end, int count) { checkIsInMultiOrPipeline(); client.xrange(key, start, end, count); - return client.getBinaryMultiBulkReply(); + return client.getBinaryMultiBulkReply(); } @Override public List xrevrange(byte[] key, byte[] end, byte[] start, int count) { checkIsInMultiOrPipeline(); client.xrevrange(key, end, start, count); - return client.getBinaryMultiBulkReply(); + return client.getBinaryMultiBulkReply(); } @Override @@ -4470,6 +4470,13 @@ public List xpending(byte[] key, byte[] groupname, byte[] start, byte[] return client.getObjectMultiBulkReply(); } + @Override + public Object xpendingSummary(final byte[] key, final byte[] groupname) { + checkIsInMultiOrPipeline(); + client.xpendingSummary(key, groupname); + return client.getOne(); + } + @Override public List xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime, long newIdleTime, int retries, boolean force, byte[]... ids) { checkIsInMultiOrPipeline(); diff --git a/src/main/java/redis/clients/jedis/BinaryJedisCluster.java b/src/main/java/redis/clients/jedis/BinaryJedisCluster.java index f4814ff996..2e54b0aef8 100644 --- a/src/main/java/redis/clients/jedis/BinaryJedisCluster.java +++ b/src/main/java/redis/clients/jedis/BinaryJedisCluster.java @@ -168,7 +168,7 @@ public byte[] execute(Jedis connection) { } }.runBinary(key); } - + @Override public byte[] getDel(final byte[] key) { return new JedisClusterCommand(connectionHandler, maxAttempts) { @@ -2139,7 +2139,7 @@ public ScanResult execute(Jedis connection) { } }.runBinary(matchPattern); } - + @Override public ScanResult> hscan(final byte[] key, final byte[] cursor) { return new JedisClusterCommand>>(connectionHandler, @@ -2232,7 +2232,7 @@ public Long execute(Jedis connection) { } }.runBinary(key); } - + @Override public Long memoryUsage(final byte[] key) { return new JedisClusterCommand(connectionHandler, maxAttempts) { @@ -2242,7 +2242,7 @@ public Long execute(Jedis connection) { } }.runBinary(key); } - + @Override public Long memoryUsage(final byte[] key, final int samples) { return new JedisClusterCommand(connectionHandler, maxAttempts) { @@ -2252,7 +2252,7 @@ public Long execute(Jedis connection) { } }.runBinary(key); } - + @Override public byte[] xadd(final byte[] key, final byte[] id, final Map hash, final long maxLen, final boolean approximateLength){ return new JedisClusterCommand(connectionHandler, maxAttempts) { @@ -2300,19 +2300,19 @@ public List xrevrange(final byte[] key, final byte[] end, final byte[] s public List execute(Jedis connection) { return connection.xrevrange(key, end, start, count); } - }.runBinary(key); + }.runBinary(key); } @Override public List xread(final int count, final long block, final Map streams) { byte[][] keys = streams.keySet().toArray(new byte[streams.size()][]); - + return new JedisClusterCommand>(connectionHandler, maxAttempts) { @Override public List execute(Jedis connection) { return connection.xread(count, block, streams); } - }.runBinary(keys.length, keys); + }.runBinary(keys.length, keys); } @Override @@ -2322,7 +2322,7 @@ public Long xack(final byte[] key, final byte[] group, final byte[]... ids) { public Long execute(Jedis connection) { return connection.xack(key, group, ids); } - }.runBinary(key); + }.runBinary(key); } @Override @@ -2332,7 +2332,7 @@ public String xgroupCreate(final byte[] key, final byte[] consumer, final byte[] public String execute(Jedis connection) { return connection.xgroupCreate(key, consumer, id, makeStream); } - }.runBinary(key); + }.runBinary(key); } @Override @@ -2366,11 +2366,11 @@ public Long execute(Jedis connection) { } @Override - public List xreadGroup(final byte[] groupname, final byte[] consumer, final int count, final long block, + public List xreadGroup(final byte[] groupname, final byte[] consumer, final int count, final long block, final boolean noAck, final Map streams){ - + byte[][] keys = streams.keySet().toArray(new byte[streams.size()][]); - + return new JedisClusterCommand>(connectionHandler, maxAttempts) { @Override public List execute(Jedis connection) { @@ -2398,9 +2398,9 @@ public Long execute(Jedis connection) { } }.runBinary(key); } - + @Override - public List xpending(final byte[] key, final byte[] groupname, final byte[] start, final byte[] end, + public List xpending(final byte[] key, final byte[] groupname, final byte[] start, final byte[] end, final int count, final byte[] consumername) { return new JedisClusterCommand>(connectionHandler, maxAttempts) { @Override @@ -2411,7 +2411,17 @@ public List execute(Jedis connection) { } @Override - public List xclaim(final byte[] key, final byte[] groupname, final byte[] consumername, + public Object xpendingSummary(final byte[] key, final byte[] groupname) { + return new JedisClusterCommand(connectionHandler, maxAttempts) { + @Override + public Object execute(Jedis connection) { + return connection.xpendingSummary(key, groupname); + } + }.runBinary(key); + } + + @Override + public List xclaim(final byte[] key, final byte[] groupname, final byte[] consumername, final long minIdleTime, final long newIdleTime, final int retries, final boolean force, final byte[][] ids) { return new JedisClusterCommand>(connectionHandler, maxAttempts) { @Override diff --git a/src/main/java/redis/clients/jedis/BinaryShardedJedis.java b/src/main/java/redis/clients/jedis/BinaryShardedJedis.java index aedc646fda..6d4b551ded 100644 --- a/src/main/java/redis/clients/jedis/BinaryShardedJedis.java +++ b/src/main/java/redis/clients/jedis/BinaryShardedJedis.java @@ -90,7 +90,7 @@ public byte[] getDel(final byte[] key) { Jedis j = getShard(key); return j.getDel(key); } - + @Override public Boolean exists(final byte[] key) { Jedis j = getShard(key); @@ -1130,6 +1130,12 @@ public List xpending(byte[] key, byte[] groupname, byte[] start, byte[] return j.xpending(key, groupname, start, end, count, consumername); } + @Override + public Object xpendingSummary(final byte[] key, final byte[] groupname) { + Jedis j = getShard(key); + return j.xpendingSummary(key, groupname); + } + @Override public List xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime, long newIdleTime, int retries, boolean force, byte[]... ids) { diff --git a/src/main/java/redis/clients/jedis/BuilderFactory.java b/src/main/java/redis/clients/jedis/BuilderFactory.java index de45c28f5a..e969e95e63 100644 --- a/src/main/java/redis/clients/jedis/BuilderFactory.java +++ b/src/main/java/redis/clients/jedis/BuilderFactory.java @@ -323,7 +323,7 @@ public String toString() { } }; - + public static final Builder EVAL_RESULT = new Builder() { @Override @@ -665,7 +665,7 @@ public String toString() { return "StreamEntryID"; } }; - + public static final Builder> STREAM_ENTRY_LIST = new Builder>() { @Override @@ -689,7 +689,7 @@ public List build(Object data) { String entryIdString = SafeEncoder.encode((byte[])res.get(0)); StreamEntryID entryID = new StreamEntryID(entryIdString); List hash = (List)res.get(1); - + Iterator hashIterator = hash.iterator(); Map map = new HashMap<>(hash.size()/2); while(hashIterator.hasNext()) { @@ -738,7 +738,7 @@ public String toString() { return "StreamEntry"; } }; - + public static final Builder> STREAM_PENDING_ENTRY_LIST = new Builder>() { @Override @SuppressWarnings("unchecked") @@ -746,14 +746,14 @@ public List build(Object data) { if (null == data) { return null; } - + List streamsEntries = (List)data; List result = new ArrayList<>(streamsEntries.size()); for(Object streamObj : streamsEntries) { List stream = (List)streamObj; String id = SafeEncoder.encode((byte[])stream.get(0)); String consumerName = SafeEncoder.encode((byte[])stream.get(1)); - long idleTime = BuilderFactory.LONG.build(stream.get(2)); + long idleTime = BuilderFactory.LONG.build(stream.get(2)); long deliveredTimes = BuilderFactory.LONG.build(stream.get(3)); result.add(new StreamPendingEntry(new StreamEntryID(id), consumerName, idleTime, deliveredTimes)); } @@ -898,6 +898,32 @@ public String toString() { } }; + public static final Builder STREAM_PENDING_SUMMARY = new Builder() { + @Override + @SuppressWarnings("unchecked") + public StreamPendingSummary build(Object data) { + if (null == data) { + return null; + } + + List objectList = (List) data; + long total = BuilderFactory.LONG.build(objectList.get(0)); + String minId = SafeEncoder.encode((byte[]) objectList.get(1)); + String maxId = SafeEncoder.encode((byte[]) objectList.get(2)); + List> consumerObjList = (List>) objectList.get(3); + Map map = new HashMap<>(consumerObjList.size()); + for (List consumerObj : consumerObjList) { + map.put(SafeEncoder.encode((byte[]) consumerObj.get(0)), Long.parseLong(SafeEncoder.encode((byte[]) consumerObj.get(1)))); + } + return new StreamPendingSummary(total, new StreamEntryID(minId), new StreamEntryID(maxId), map); + } + + @Override + public String toString() { + return "StreamPendingSummary"; + } + }; + private static Map createMapFromDecodingFunctions( Iterator iterator, Map mappingFunctions) { Map resultMap = new HashMap<>(); diff --git a/src/main/java/redis/clients/jedis/Client.java b/src/main/java/redis/clients/jedis/Client.java index 3343562e3b..f8409467b2 100644 --- a/src/main/java/redis/clients/jedis/Client.java +++ b/src/main/java/redis/clients/jedis/Client.java @@ -73,7 +73,7 @@ public Client(final JedisSocketFactory jedisSocketFactory) { public void ping(final String message) { ping(SafeEncoder.encode(message)); } - + @Override public void set(final String key, final String value) { set(SafeEncoder.encode(key), SafeEncoder.encode(value)); @@ -88,7 +88,7 @@ public void set(final String key, final String value, final SetParams params) { public void get(final String key) { get(SafeEncoder.encode(key)); } - + @Override public void getDel(final String key) { getDel(SafeEncoder.encode(key)); @@ -973,11 +973,11 @@ public void psetex(final String key, final long milliseconds, final String value public void srandmember(final String key, final int count) { srandmember(SafeEncoder.encode(key), count); } - + public void memoryUsage(final String key) { memoryUsage(SafeEncoder.encode(key)); } - + public void memoryUsage(final String key, final int samples) { memoryUsage(SafeEncoder.encode(key), samples); } @@ -1301,22 +1301,22 @@ public void xadd(final String key, final StreamEntryID id, final Map... streams) { final Map bhash = new HashMap<>(streams.length); @@ -1325,17 +1325,17 @@ public void xread(final int count, final long block, final Entry entry : streams) { bhash.put(SafeEncoder.encode(entry.getKey()), SafeEncoder.encode(entry.getValue()==null ? ">" : entry.getValue().toString())); } - xreadGroup(SafeEncoder.encode(groupname), SafeEncoder.encode(consumer), count, block, noAck, bhash); + xreadGroup(SafeEncoder.encode(groupname), SafeEncoder.encode(consumer), count, block, noAck, bhash); } @Override public void xpending(String key, String groupname, StreamEntryID start, StreamEntryID end, int count, String consumername) { xpending(SafeEncoder.encode(key), SafeEncoder.encode(groupname), SafeEncoder.encode(start==null ? "-" : start.toString()), - SafeEncoder.encode(end==null ? "+" : end.toString()), count, consumername == null? null : SafeEncoder.encode(consumername)); + SafeEncoder.encode(end==null ? "+" : end.toString()), count, consumername == null? null : SafeEncoder.encode(consumername)); + } + + @Override + public void xpendingSummary(String key, String groupname) { + xpendingSummary(SafeEncoder.encode(key), SafeEncoder.encode(groupname)); } @Override public void xclaim(String key, String group, String consumername, long minIdleTime, long newIdleTime, int retries, boolean force, StreamEntryID... ids) { - + final byte[][] bids = new byte[ids.length][]; for (int i = 0; i < ids.length; i++) { bids[i] = SafeEncoder.encode(ids[i].toString()); } - xclaim(SafeEncoder.encode(key), SafeEncoder.encode(group), SafeEncoder.encode(consumername), minIdleTime, newIdleTime, retries, force, bids); + xclaim(SafeEncoder.encode(key), SafeEncoder.encode(group), SafeEncoder.encode(consumername), minIdleTime, newIdleTime, retries, force, bids); } @Override @@ -1417,5 +1422,5 @@ public void xinfoConsumers(String key, String group) { xinfoConsumers(SafeEncoder.encode(key),SafeEncoder.encode(group)); } - + } diff --git a/src/main/java/redis/clients/jedis/Jedis.java b/src/main/java/redis/clients/jedis/Jedis.java index 1c47aad8c8..05f1d4c8a4 100644 --- a/src/main/java/redis/clients/jedis/Jedis.java +++ b/src/main/java/redis/clients/jedis/Jedis.java @@ -241,7 +241,7 @@ public String getDel(final String key) { client.getDel(key); return client.getBulkReply(); } - + /** * Test if the specified keys exist. The command returns the number of keys exist. * Time complexity: O(N) @@ -1519,14 +1519,14 @@ public Long sunionstore(final String dstkey, final String... keys) { * Return the difference between the Set stored at key1 and all the Sets key2, ..., keyN *

* Example: - * + * *

    * key1 = [x, a, b, c]
    * key2 = [c]
    * key3 = [a, d]
    * SDIFF key1,key2,key3 => [x, b]
    * 
- * + * * Non existing keys are considered like empty sets. *

* Time complexity: @@ -1861,65 +1861,65 @@ public List sort(final String key) { * examples: *

* Given are the following sets and key/values: - * + * *

    * x = [1, 2, 3]
    * y = [a, b, c]
-   * 
+   *
    * k1 = z
    * k2 = y
    * k3 = x
-   * 
+   *
    * w1 = 9
    * w2 = 8
    * w3 = 7
    * 
- * + * * Sort Order: - * + * *
    * sort(x) or sort(x, sp.asc())
    * -> [1, 2, 3]
-   * 
+   *
    * sort(x, sp.desc())
    * -> [3, 2, 1]
-   * 
+   *
    * sort(y)
    * -> [c, a, b]
-   * 
+   *
    * sort(y, sp.alpha())
    * -> [a, b, c]
-   * 
+   *
    * sort(y, sp.alpha().desc())
    * -> [c, a, b]
    * 
- * + * * Limit (e.g. for Pagination): - * + * *
    * sort(x, sp.limit(0, 2))
    * -> [1, 2]
-   * 
+   *
    * sort(y, sp.alpha().desc().limit(1, 2))
    * -> [b, a]
    * 
- * + * * Sorting by external keys: - * + * *
    * sort(x, sb.by(w*))
    * -> [3, 2, 1]
-   * 
+   *
    * sort(x, sb.by(w*).desc())
    * -> [1, 2, 3]
    * 
- * + * * Getting external keys: - * + * *
    * sort(x, sp.by(w*).get(k*))
    * -> [x, y, z]
-   * 
+   *
    * sort(x, sp.by(w*).get(#).get(k*))
    * -> [3, x, 2, y, 1, z]
    * 
@@ -2006,8 +2006,8 @@ public List blpop(final int timeout, final String... keys) { private String[] getArgsAddTimeout(int timeout, String[] keys) { final int keyCount = keys.length; final String[] args = new String[keyCount + 1]; - - System.arraycopy(keys, 0, args, 0, keyCount); + + System.arraycopy(keys, 0, args, 0, keyCount); args[keyCount] = String.valueOf(timeout); return args; @@ -2488,7 +2488,7 @@ public Set zrevrangeByScoreWithScores(final String key, final String max, * @param key * @param start * @param stop - * @return + * @return */ @Override public Long zremrangeByRank(final String key, final long start, final long stop) { @@ -2860,7 +2860,7 @@ public Long bitpos(final String key, final boolean value, final BitPosParams par * are reported as a list of key-value pairs. *

* Example: - * + * *

    * $ redis-cli config get '*'
    * 1. "dbfilename"
@@ -2875,7 +2875,7 @@ public Long bitpos(final String key, final boolean value, final BitPosParams par
    * 10. "everysec"
    * 11. "save"
    * 12. "3600 1 300 100 60 10000"
-   * 
+   *
    * $ redis-cli config get 'm*'
    * 1. "masterauth"
    * 2. (nil)
@@ -3125,7 +3125,7 @@ public Long bitop(final BitOP op, final String destKey, final String... srcKeys)
    *    22) "2"
    *    23) "quorum"
    *    24) "2"
-   * 
+   *
    * 
* @return */ @@ -3919,26 +3919,26 @@ public String memoryDoctor() { client.memoryDoctor(); return client.getBulkReply(); } - + @Override public Long memoryUsage(final String key) { checkIsInMultiOrPipeline(); client.memoryUsage(key); return client.getIntegerReply(); } - + @Override public Long memoryUsage(final String key, final int samples) { checkIsInMultiOrPipeline(); client.memoryUsage(key, samples); return client.getIntegerReply(); } - + @Override public StreamEntryID xadd(final String key, final StreamEntryID id, final Map hash) { return xadd(key, id, hash, Long.MAX_VALUE, false); } - + @Override public StreamEntryID xadd(final String key, StreamEntryID id, final Map hash, final long maxLen, final boolean approximateLength) { checkIsInMultiOrPipeline(); @@ -3963,7 +3963,7 @@ public List xrange(final String key, final StreamEntryID start, fin client.xrange(key, start, end, count); return BuilderFactory.STREAM_ENTRY_LIST.build(client.getObjectMultiBulkReply()); } - + /** * {@inheritDoc} */ @@ -3983,13 +3983,13 @@ public List>> xread(final int count, final long checkIsInMultiOrPipeline(); client.xread(count, block, streams); client.setTimeoutInfinite(); - + try { List streamsEntries = client.getObjectMultiBulkReply(); if(streamsEntries == null) { return new ArrayList<>(); } - + List>> result = new ArrayList<>(streamsEntries.size()); for(Object streamObj : streamsEntries) { List stream = (List)streamObj; @@ -3997,7 +3997,7 @@ public List>> xread(final int count, final long List streamEntries = BuilderFactory.STREAM_ENTRY_LIST.build(stream.get(1)); result.add(new AbstractMap.SimpleEntry<>(streamId, streamEntries)); } - + return result; } finally { client.rollbackTimeout(); @@ -4071,7 +4071,7 @@ public List>> xreadGroup(final String groupname, if(streamsEntries == null) { return null; } - + List>> result = new ArrayList<>(streamsEntries.size()); for(Object streamObj : streamsEntries) { List stream = (List)streamObj; @@ -4092,16 +4092,23 @@ public List xpending(final String key, final String groupnam client.xpending(key, groupname, start, end, count, consumername); // TODO handle consumername == NULL case - + return BuilderFactory.STREAM_PENDING_ENTRY_LIST.build(client.getObjectMultiBulkReply()); } + @Override + public StreamPendingSummary xpendingSummary(final String key, final String groupname) { + checkIsInMultiOrPipeline(); + client.xpendingSummary(key, groupname); + return BuilderFactory.STREAM_PENDING_SUMMARY.build(client.getObjectMultiBulkReply()); + } + @Override public List xclaim(String key, String group, String consumername, long minIdleTime, long newIdleTime, int retries, boolean force, StreamEntryID... ids) { checkIsInMultiOrPipeline(); client.xclaim( key, group, consumername, minIdleTime, newIdleTime, retries, force, ids); - + return BuilderFactory.STREAM_ENTRY_LIST.build(client.getObjectMultiBulkReply()); } diff --git a/src/main/java/redis/clients/jedis/JedisCluster.java b/src/main/java/redis/clients/jedis/JedisCluster.java index ee2142e90d..1a8acb45cf 100644 --- a/src/main/java/redis/clients/jedis/JedisCluster.java +++ b/src/main/java/redis/clients/jedis/JedisCluster.java @@ -256,7 +256,7 @@ public String execute(Jedis connection) { } }.run(key); } - + @Override public String getDel(final String key) { return new JedisClusterCommand(connectionHandler, maxAttempts) { @@ -1601,7 +1601,7 @@ public ScanResult execute(Jedis connection) { } }.run(matchPattern); } - + @Override public ScanResult> hscan(final String key, final String cursor) { return new JedisClusterCommand>>(connectionHandler, @@ -2310,7 +2310,7 @@ public Long execute(Jedis connection) { }.run(key); } - + @Override public Long memoryUsage(final String key) { return new JedisClusterCommand(connectionHandler, maxAttempts) { @@ -2320,7 +2320,7 @@ public Long execute(Jedis connection) { } }.run(key); } - + @Override public Long memoryUsage(final String key, final int samples) { return new JedisClusterCommand(connectionHandler, maxAttempts) { @@ -2330,7 +2330,7 @@ public Long execute(Jedis connection) { } }.run(key); } - + @Override public StreamEntryID xadd(final String key, final StreamEntryID id, final Map hash) { return new JedisClusterCommand(connectionHandler, maxAttempts) { @@ -2378,7 +2378,7 @@ public List xrevrange(final String key, final StreamEntryID end, fi public List execute(Jedis connection) { return connection.xrevrange(key, end, start, count); } - }.run(key); + }.run(key); } @Override @@ -2387,13 +2387,13 @@ public List>> xread(final int count, final long for(int i=0; i>>>(connectionHandler, maxAttempts) { @Override public List>> execute(Jedis connection) { return connection.xread(count, block, streams); } - }.run(keys.length, keys); + }.run(keys.length, keys); } @Override @@ -2403,7 +2403,7 @@ public Long xack(final String key, final String group, final StreamEntryID... id public Long execute(Jedis connection) { return connection.xack(key, group, ids); } - }.run(key); + }.run(key); } @Override @@ -2413,7 +2413,7 @@ public String xgroupCreate(final String key, final String groupname, final Strea public String execute(Jedis connection) { return connection.xgroupCreate(key, groupname, id, makeStream); } - }.run(key); + }.run(key); } @Override @@ -2449,12 +2449,12 @@ public Long execute(Jedis connection) { @Override public List>> xreadGroup(final String groupname, final String consumer, final int count, final long block, final boolean noAck, final Entry... streams) { - + String[] keys = new String[streams.length]; for(int i=0; i>>>(connectionHandler, maxAttempts) { @Override public List>> execute(Jedis connection) { @@ -2474,6 +2474,16 @@ public List execute(Jedis connection) { }.run(key); } + @Override + public StreamPendingSummary xpendingSummary(final String key, final String groupname) { + return new JedisClusterCommand(connectionHandler, maxAttempts) { + @Override + public StreamPendingSummary execute(Jedis connection) { + return connection.xpendingSummary(key, groupname); + } + }.run(key); + } + @Override public Long xdel(final String key, final StreamEntryID... ids) { return new JedisClusterCommand(connectionHandler, maxAttempts) { diff --git a/src/main/java/redis/clients/jedis/ShardedJedis.java b/src/main/java/redis/clients/jedis/ShardedJedis.java index 3a407cc36d..268e99cc18 100644 --- a/src/main/java/redis/clients/jedis/ShardedJedis.java +++ b/src/main/java/redis/clients/jedis/ShardedJedis.java @@ -53,7 +53,7 @@ public String get(final String key) { Jedis j = getShard(key); return j.get(key); } - + @Override public String getDel(final String key) { Jedis j = getShard(key); @@ -1052,7 +1052,7 @@ public StreamEntryID xadd(String key, StreamEntryID id, Map hash Jedis j = getShard(key); return j.xadd(key, id, hash); } - + @Override public StreamEntryID xadd(String key, StreamEntryID id, Map hash, long maxLen, boolean approximateLength) { Jedis j = getShard(key); @@ -1064,7 +1064,7 @@ public Long xlen(String key) { Jedis j = getShard(key); return j.xlen(key); } - + @Override public List xrange(String key, StreamEntryID start, StreamEntryID end, int count) { Jedis j = getShard(key); @@ -1127,6 +1127,12 @@ public List xpending(String key, String groupname, StreamEnt return j.xpending(key, groupname, start, end, count, consumername); } + @Override + public StreamPendingSummary xpendingSummary(String key, String groupname) { + Jedis j = getShard(key); + return j.xpendingSummary(key, groupname); + } + @Override public List xclaim(String key, String group, String consumername, long minIdleTime, long newIdleTime, int retries, boolean force, StreamEntryID... ids) { diff --git a/src/main/java/redis/clients/jedis/StreamPendingSummary.java b/src/main/java/redis/clients/jedis/StreamPendingSummary.java new file mode 100644 index 0000000000..46a670df6a --- /dev/null +++ b/src/main/java/redis/clients/jedis/StreamPendingSummary.java @@ -0,0 +1,38 @@ +package redis.clients.jedis; + +import java.io.Serializable; +import java.util.Map; + +public class StreamPendingSummary implements Serializable { + + private static final long serialVersionUID = 1L; + + private final long total; + private final StreamEntryID minId; + private final StreamEntryID maxId; + private final Map consumerMessageCount; + + public StreamPendingSummary(long total, StreamEntryID minId, StreamEntryID maxId, + Map consumerMessageCount) { + this.total = total; + this.minId = minId; + this.maxId = maxId; + this.consumerMessageCount = consumerMessageCount; + } + + public long getTotal() { + return total; + } + + public StreamEntryID getMinId() { + return minId; + } + + public StreamEntryID getMaxId() { + return maxId; + } + + public Map getConsumerMessageCount() { + return consumerMessageCount; + } +} diff --git a/src/main/java/redis/clients/jedis/commands/BinaryJedisClusterCommands.java b/src/main/java/redis/clients/jedis/commands/BinaryJedisClusterCommands.java index d155fee5e1..13715eb21f 100644 --- a/src/main/java/redis/clients/jedis/commands/BinaryJedisClusterCommands.java +++ b/src/main/java/redis/clients/jedis/commands/BinaryJedisClusterCommands.java @@ -348,20 +348,20 @@ List georadiusByMemberReadonly(byte[] key, byte[] member, dou * Executes BITFIELD Redis command * @param key * @param arguments - * @return + * @return */ List bitfield(byte[] key, byte[]... arguments); List bitfieldReadonly(byte[] key, byte[]... arguments); - + /** * Used for HSTRLEN Redis command - * @param key + * @param key * @param field - * @return + * @return */ Long hstrlen(byte[] key, byte[] field); - + byte[] xadd(final byte[] key, final byte[] id, final Map hash, long maxLen, boolean approximateLength); Long xlen(final byte[] key); @@ -377,7 +377,7 @@ List georadiusByMemberReadonly(byte[] key, byte[] member, dou List xrevrange(final byte[] key, final byte[] end, final byte[] start, final int count); Long xack(final byte[] key, final byte[] group, final byte[]... ids); - + String xgroupCreate(final byte[] key, final byte[] consumer, final byte[] id, boolean makeStream); String xgroupSetID(final byte[] key, final byte[] consumer, final byte[] id); @@ -385,18 +385,20 @@ List georadiusByMemberReadonly(byte[] key, byte[] member, dou Long xgroupDestroy(final byte[] key, final byte[] consumer); Long xgroupDelConsumer(final byte[] key, final byte[] consumer, final byte[] consumerName); - + Long xdel(final byte[] key, final byte[]... ids); Long xtrim(byte[] key, long maxLen, boolean approximateLength); List xpending(byte[] key, byte[] groupname, byte[] start, byte[] end, int count, byte[] consumername); + Object xpendingSummary(final byte[] key, final byte[] groupname); + List xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime, long newIdleTime, int retries, boolean force, byte[][] ids); Long waitReplicas(byte[] key, final int replicas, final long timeout); - + Long memoryUsage(final byte[] key); - + Long memoryUsage(final byte[] key, final int samples); } diff --git a/src/main/java/redis/clients/jedis/commands/BinaryJedisCommands.java b/src/main/java/redis/clients/jedis/commands/BinaryJedisCommands.java index 632a78cd10..dc71a96246 100644 --- a/src/main/java/redis/clients/jedis/commands/BinaryJedisCommands.java +++ b/src/main/java/redis/clients/jedis/commands/BinaryJedisCommands.java @@ -30,7 +30,7 @@ public interface BinaryJedisCommands { String set(byte[] key, byte[] value, SetParams params); byte[] get(byte[] key); - + byte[] getDel(byte[] key); Boolean exists(byte[] key); @@ -375,21 +375,21 @@ List georadiusByMemberReadonly(byte[] key, byte[] member, dou * Executes BITFIELD Redis command * @param key * @param arguments - * @return + * @return */ List bitfield(byte[] key, byte[]... arguments); List bitfieldReadonly(byte[] key, byte[]... arguments); - + /** * Used for HSTRLEN Redis command - * @param key + * @param key * @param field * @return lenth of the value for key */ Long hstrlen(byte[] key, byte[] field); - - + + byte[] xadd(final byte[] key, final byte[] id, final Map hash, long maxLen, boolean approximateLength); Long xlen(final byte[] key); @@ -407,7 +407,7 @@ default List xrange(final byte[] key, final byte[] start, final byte[] e List xrevrange(final byte[] key, final byte[] end, final byte[] start, final int count); Long xack(final byte[] key, final byte[] group, final byte[]... ids); - + String xgroupCreate(final byte[] key, final byte[] consumer, final byte[] id, boolean makeStream); String xgroupSetID(final byte[] key, final byte[] consumer, final byte[] id); @@ -415,13 +415,15 @@ default List xrange(final byte[] key, final byte[] start, final byte[] e Long xgroupDestroy(final byte[] key, final byte[] consumer); Long xgroupDelConsumer(final byte[] key, final byte[] consumer, final byte[] consumerName); - + Long xdel(final byte[] key, final byte[]... ids); Long xtrim(byte[] key, long maxLen, boolean approximateLength); List xpending(byte[] key, byte[] groupname, byte[] start, byte[] end, int count, byte[] consumername); + Object xpendingSummary(final byte[] key, final byte[] groupname); + List xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime, long newIdleTime, int retries, boolean force, byte[]... ids); /** diff --git a/src/main/java/redis/clients/jedis/commands/Commands.java b/src/main/java/redis/clients/jedis/commands/Commands.java index 19b3338d92..17cf2d7bc5 100644 --- a/src/main/java/redis/clients/jedis/commands/Commands.java +++ b/src/main/java/redis/clients/jedis/commands/Commands.java @@ -19,13 +19,13 @@ public interface Commands { void ping(String message); - + void set(String key, String value); void set(String key, String value, SetParams params); void get(String key); - + void getDel(String key); void exists(String... keys); @@ -229,9 +229,9 @@ default void setex(String key, int seconds, String value) { void zscore(String key, String member); void zmscore(String key, String... members); - + void zpopmax(String key); - + void zpopmax(String key, int count); void zpopmin(String key); @@ -421,17 +421,17 @@ default void restoreReplace(String key, int ttl, byte[] serializedValue) { void memoryDoctor(); void xadd(String key, StreamEntryID id, Map hash, long maxLen, boolean approximateLength); - + void xlen(String key); void xrange(String key, StreamEntryID start, StreamEntryID end, long count); - + void xrevrange(String key, StreamEntryID end, StreamEntryID start, int count); - + void xread(int count, long block, Entry... streams); - + void xack(String key, String group, StreamEntryID... ids); - + void xgroupCreate(String key, String consumer, StreamEntryID id, boolean makeStream); void xgroupSetID(String key, String consumer, StreamEntryID id); @@ -448,6 +448,8 @@ default void restoreReplace(String key, int ttl, byte[] serializedValue) { void xpending(String key, String groupname, StreamEntryID start, StreamEntryID end, int count, String consumername); + void xpendingSummary(String key, String groupname); + void xclaim(String key, String group, String consumername, long minIdleTime, long newIdleTime, int retries, boolean force, StreamEntryID... ids); diff --git a/src/main/java/redis/clients/jedis/commands/JedisClusterCommands.java b/src/main/java/redis/clients/jedis/commands/JedisClusterCommands.java index 4d8b83edea..9daf3ba355 100644 --- a/src/main/java/redis/clients/jedis/commands/JedisClusterCommands.java +++ b/src/main/java/redis/clients/jedis/commands/JedisClusterCommands.java @@ -9,6 +9,7 @@ import redis.clients.jedis.ScanResult; import redis.clients.jedis.SortingParams; import redis.clients.jedis.StreamEntry; +import redis.clients.jedis.StreamPendingSummary; import redis.clients.jedis.Tuple; import redis.clients.jedis.params.GeoRadiusParam; import redis.clients.jedis.params.SetParams; @@ -26,7 +27,7 @@ public interface JedisClusterCommands { String set(String key, String value, SetParams params); String get(String key); - + String getDel(String key); Boolean exists(String key); @@ -357,15 +358,15 @@ List georadiusByMemberReadonly(String key, String member, dou * Executes BITFIELD Redis command * @param key * @param arguments - * @return + * @return */ List bitfield(String key, String...arguments); List bitfieldReadonly(String key, String...arguments); - + /** * Used for HSTRLEN Redis command - * @param key + * @param key * @param field * @return lenth of the value for key */ @@ -373,25 +374,25 @@ List georadiusByMemberReadonly(String key, String member, dou /** * MEMORY USAGE key - * + * * @param key * @return the memory usage */ Long memoryUsage(String key); /** - * MEMORY USAGE key [SAMPLES count] - * + * MEMORY USAGE key [SAMPLES count] + * * @param key * @param samples * @return the memory usage */ Long memoryUsage(String key, int samples); - + /** * XADD key ID field string [field string ...] - * + * * @param key * @param id * @param hash @@ -401,7 +402,7 @@ List georadiusByMemberReadonly(String key, String member, dou /** * XADD key MAXLEN ~ LEN ID field string [field string ...] - * + * * @param key * @param id * @param hash @@ -410,10 +411,10 @@ List georadiusByMemberReadonly(String key, String member, dou * @return */ StreamEntryID xadd(String key, StreamEntryID id, Map hash, long maxLen, boolean approximateLength); - + /** * XLEN key - * + * * @param key * @return */ @@ -421,7 +422,7 @@ List georadiusByMemberReadonly(String key, String member, dou /** * XRANGE key start end [COUNT count] - * + * * @param key * @param start * @param end @@ -439,14 +440,14 @@ List georadiusByMemberReadonly(String key, String member, dou * @return */ List xrevrange(String key, StreamEntryID end, StreamEntryID start, int count); - + /** * @deprecated Will be removed in future version. Use * {@link MultiKeyJedisClusterCommands#xread(int, long, java.util.Map.Entry...)}. */ @Deprecated List>> xread(int count, long block, Map.Entry... streams); - + /** * XACK key group ID [ID ...] * @param key @@ -455,38 +456,38 @@ List georadiusByMemberReadonly(String key, String member, dou * @return */ Long xack(String key, String group, StreamEntryID... ids); - + /** * XGROUP CREATE - * + * * @param key * @param groupname * @param id * @return */ String xgroupCreate( String key, String groupname, StreamEntryID id, boolean makeStream); - + /** * XGROUP SETID - * + * * @param key * @param groupname * @param id * @return */ String xgroupSetID( String key, String groupname, StreamEntryID id); - + /** * XGROUP DESTROY - * + * * @param key * @param groupname * @return */ Long xgroupDestroy( String key, String groupname); - + /** - * XGROUP DELCONSUMER + * XGROUP DELCONSUMER * @param key * @param groupname * @param consumername @@ -501,10 +502,10 @@ List georadiusByMemberReadonly(String key, String member, dou @Deprecated List>> xreadGroup(String groupname, String consumer, int count, long block, boolean noAck, Map.Entry... streams); - + /** * XPENDING key group [start end count] [consumer] - * + * * @param key * @param groupname * @param start @@ -514,7 +515,16 @@ List georadiusByMemberReadonly(String key, String member, dou * @return */ List xpending(String key, String groupname, StreamEntryID start, StreamEntryID end, int count, String consumername); - + + /** + * XPENDING key group + * + * @param key + * @param groupname + * @return + */ + StreamPendingSummary xpendingSummary(String key, String groupname); + /** * XDEL key ID [ID ...] * @param key @@ -522,7 +532,7 @@ List georadiusByMemberReadonly(String key, String member, dou * @return */ Long xdel( String key, StreamEntryID... ids); - + /** * XTRIM key MAXLEN [~] count * @param key @@ -531,13 +541,13 @@ List georadiusByMemberReadonly(String key, String member, dou * @return */ Long xtrim( String key, long maxLen, boolean approximateLength); - + /** * XCLAIM * [IDLE ] [TIME ] [RETRYCOUNT ] * [FORCE] [JUSTID] - */ - List xclaim( String key, String group, String consumername, long minIdleTime, + */ + List xclaim( String key, String group, String consumername, long minIdleTime, long newIdleTime, int retries, boolean force, StreamEntryID... ids); Long waitReplicas(final String key, final int replicas, final long timeout); diff --git a/src/main/java/redis/clients/jedis/commands/JedisCommands.java b/src/main/java/redis/clients/jedis/commands/JedisCommands.java index cb50b08ae5..07bdd33a71 100644 --- a/src/main/java/redis/clients/jedis/commands/JedisCommands.java +++ b/src/main/java/redis/clients/jedis/commands/JedisCommands.java @@ -18,6 +18,7 @@ import redis.clients.jedis.ScanResult; import redis.clients.jedis.SortingParams; import redis.clients.jedis.StreamEntry; +import redis.clients.jedis.StreamPendingSummary; import redis.clients.jedis.Tuple; import redis.clients.jedis.params.GeoRadiusParam; import redis.clients.jedis.params.SetParams; @@ -34,7 +35,7 @@ public interface JedisCommands { String set(String key, String value, SetParams params); String get(String key); - + String getDel(String key); Boolean exists(String key); @@ -65,7 +66,7 @@ default String restoreReplace(String key, int ttl, byte[] serializedValue) { String restoreReplace(String key, long ttl, byte[] serializedValue); - + /** * @deprecated Use {@link #expire(java.lang.String, long)}. */ @@ -388,7 +389,7 @@ List georadiusByMemberReadonly(String key, String member, dou * Executes BITFIELD Redis command * @param key * @param arguments - * @return + * @return */ List bitfield(String key, String...arguments); @@ -396,7 +397,7 @@ List georadiusByMemberReadonly(String key, String member, dou /** * Used for HSTRLEN Redis command - * @param key + * @param key * @param field * @return length of the value for key */ @@ -404,7 +405,7 @@ List georadiusByMemberReadonly(String key, String member, dou /** * XADD key ID field string [field string ...] - * + * * @param key * @param id * @param hash @@ -414,7 +415,7 @@ List georadiusByMemberReadonly(String key, String member, dou /** * XADD key MAXLEN ~ LEN ID field string [field string ...] - * + * * @param key * @param id * @param hash @@ -423,10 +424,10 @@ List georadiusByMemberReadonly(String key, String member, dou * @return */ StreamEntryID xadd(String key, StreamEntryID id, Map hash, long maxLen, boolean approximateLength); - + /** * XLEN key - * + * * @param key * @return */ @@ -434,39 +435,39 @@ List georadiusByMemberReadonly(String key, String member, dou /** * XRANGE key start end [COUNT count] - * + * * @param key - * @param start minimum {@link StreamEntryID} for the retrieved range, passing null will indicate minimum ID possible in the stream + * @param start minimum {@link StreamEntryID} for the retrieved range, passing null will indicate minimum ID possible in the stream * @param end maximum {@link StreamEntryID} for the retrieved range, passing null will indicate maximum ID possible in the stream - * @param count maximum number of entries returned - * @return The entries with IDs matching the specified range. + * @param count maximum number of entries returned + * @return The entries with IDs matching the specified range. */ List xrange(String key, StreamEntryID start, StreamEntryID end, int count); /** * XREVRANGE key end start [COUNT ] - * + * * @param key - * @param start minimum {@link StreamEntryID} for the retrieved range, passing null will indicate minimum ID possible in the stream + * @param start minimum {@link StreamEntryID} for the retrieved range, passing null will indicate minimum ID possible in the stream * @param end maximum {@link StreamEntryID} for the retrieved range, passing null will indicate maximum ID possible in the stream - * @param count The entries with IDs matching the specified range. + * @param count The entries with IDs matching the specified range. * @return the entries with IDs matching the specified range, from the higher ID to the lower ID matching. */ List xrevrange(String key, StreamEntryID end, StreamEntryID start, int count); - + /** * XACK key group ID [ID ...] - * + * * @param key * @param group * @param ids * @return */ long xack(String key, String group, StreamEntryID... ids); - + /** * XGROUP CREATE - * + * * @param key * @param groupname * @param id @@ -474,28 +475,28 @@ List georadiusByMemberReadonly(String key, String member, dou * @return */ String xgroupCreate( String key, String groupname, StreamEntryID id, boolean makeStream); - + /** * XGROUP SETID - * + * * @param key * @param groupname * @param id * @return */ String xgroupSetID( String key, String groupname, StreamEntryID id); - + /** * XGROUP DESTROY - * + * * @param key * @param groupname * @return */ long xgroupDestroy( String key, String groupname); - + /** - * XGROUP DELCONSUMER + * XGROUP DELCONSUMER * @param key * @param groupname * @param consumername @@ -505,7 +506,7 @@ List georadiusByMemberReadonly(String key, String member, dou /** * XPENDING key group [start end count] [consumer] - * + * * @param key * @param groupname * @param start @@ -515,7 +516,16 @@ List georadiusByMemberReadonly(String key, String member, dou * @return */ List xpending(String key, String groupname, StreamEntryID start, StreamEntryID end, int count, String consumername); - + + /** + * XPENDING key group + * + * @param key + * @param groupname + * @return + */ + StreamPendingSummary xpendingSummary(String key, String groupname); + /** * XDEL key ID [ID ...] * @param key @@ -523,7 +533,7 @@ List georadiusByMemberReadonly(String key, String member, dou * @return */ long xdel( String key, StreamEntryID... ids); - + /** * XTRIM key MAXLEN [~] count * @param key @@ -532,13 +542,13 @@ List georadiusByMemberReadonly(String key, String member, dou * @return */ long xtrim( String key, long maxLen, boolean approximate); - + /** * XCLAIM * [IDLE ] [TIME ] [RETRYCOUNT ] * [FORCE] [JUSTID] - */ - List xclaim( String key, String group, String consumername, long minIdleTime, + */ + List xclaim( String key, String group, String consumername, long minIdleTime, long newIdleTime, int retries, boolean force, StreamEntryID... ids); /** diff --git a/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java b/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java index d206c2d38c..037d8004af 100644 --- a/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java +++ b/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java @@ -37,7 +37,7 @@ public class StreamsCommandsTest extends JedisCommandTestBase { @Test public void xadd() { - + try { Map map1 = new HashMap<>(); jedis.xadd("stream1", null, map1); @@ -45,11 +45,11 @@ public void xadd() { } catch (JedisDataException expected) { assertEquals("ERR wrong number of arguments for 'xadd' command", expected.getMessage()); } - + Map map1 = new HashMap<>(); map1.put("f1", "v1"); StreamEntryID id1 = jedis.xadd("xadd-stream1", null, map1); - assertNotNull(id1); + assertNotNull(id1); Map map2 = new HashMap<>(); map2.put("f1", "v1"); @@ -69,13 +69,13 @@ public void xadd() { StreamEntryID id4 = jedis.xadd("xadd-stream2", idIn, map4); assertEquals(idIn, id4); assertTrue(id4.compareTo(id3) > 0); - + Map map5 = new HashMap<>(); map5.put("f4", "v4"); map5.put("f5", "v5"); StreamEntryID id5 = jedis.xadd("xadd-stream2", null, map5); - assertTrue(id5.compareTo(id4) > 0); - + assertTrue(id5.compareTo(id4) > 0); + Map map6 = new HashMap<>(); map6.put("f4", "v4"); map6.put("f5", "v5"); @@ -83,15 +83,15 @@ public void xadd() { assertTrue(id6.compareTo(id5) > 0); assertEquals(3L, jedis.xlen("xadd-stream2").longValue()); } - + @Test public void xdel() { Map map1 = new HashMap<>(); map1.put("f1", "v1"); - + StreamEntryID id1 = jedis.xadd("xdel-stream", null, map1); - assertNotNull(id1); - + assertNotNull(id1); + StreamEntryID id2 = jedis.xadd("xdel-stream", null, map1); assertNotNull(id2); assertEquals(2L, jedis.xlen("xdel-stream").longValue()); @@ -104,76 +104,76 @@ public void xdel() { @Test public void xlen() { assertEquals(0L, jedis.xlen("xlen-stream").longValue()); - + Map map = new HashMap<>(); map.put("f1", "v1"); jedis.xadd("xlen-stream", null, map); assertEquals(1L, jedis.xlen("xlen-stream").longValue()); - + jedis.xadd("xlen-stream", null, map); assertEquals(2L, jedis.xlen("xlen-stream").longValue()); } @Test public void xrange() { - List range = jedis.xrange("xrange-stream", (StreamEntryID)null, (StreamEntryID)null, Integer.MAX_VALUE); + List range = jedis.xrange("xrange-stream", (StreamEntryID)null, (StreamEntryID)null, Integer.MAX_VALUE); assertEquals(0, range.size()); - + Map map = new HashMap<>(); map.put("f1", "v1"); StreamEntryID id1 = jedis.xadd("xrange-stream", null, map); StreamEntryID id2 = jedis.xadd("xrange-stream", null, map); - List range2 = jedis.xrange("xrange-stream", (StreamEntryID)null, (StreamEntryID)null, 3); + List range2 = jedis.xrange("xrange-stream", (StreamEntryID)null, (StreamEntryID)null, 3); assertEquals(2, range2.size()); - - List range3 = jedis.xrange("xrange-stream", id1, null, 2); + + List range3 = jedis.xrange("xrange-stream", id1, null, 2); assertEquals(2, range3.size()); - - List range4 = jedis.xrange("xrange-stream", id1, id2, 2); + + List range4 = jedis.xrange("xrange-stream", id1, id2, 2); assertEquals(2, range4.size()); - List range5 = jedis.xrange("xrange-stream", id1, id2, 1); + List range5 = jedis.xrange("xrange-stream", id1, id2, 1); assertEquals(1, range5.size()); - - List range6 = jedis.xrange("xrange-stream", id2, null, 4); + + List range6 = jedis.xrange("xrange-stream", id2, null, 4); assertEquals(1, range6.size()); - + StreamEntryID id3 = jedis.xadd("xrange-stream", null, map); - List range7 = jedis.xrange("xrange-stream", id2, id2, 4); + List range7 = jedis.xrange("xrange-stream", id2, id2, 4); assertEquals(1, range7.size()); } - + @Test public void xread() { - + Entry streamQeury1 = new AbstractMap.SimpleImmutableEntry<>("xread-stream1", new StreamEntryID()); // Empty Stream - List>> range = jedis.xread(1, 1L, streamQeury1); + List>> range = jedis.xread(1, 1L, streamQeury1); assertEquals(0, range.size()); - + Map map = new HashMap<>(); map.put("f1", "v1"); StreamEntryID id1 = jedis.xadd("xread-stream1", null, map); StreamEntryID id2 = jedis.xadd("xread-stream2", null, map); - + // Read only a single Stream - List>> streams1 = jedis.xread(1, 1L, streamQeury1); + List>> streams1 = jedis.xread(1, 1L, streamQeury1); assertEquals(1, streams1.size()); // Read from two Streams Entry streamQuery2 = new AbstractMap.SimpleImmutableEntry<>("xread-stream1", new StreamEntryID()); Entry streamQuery3 = new AbstractMap.SimpleImmutableEntry<>("xread-stream2", new StreamEntryID()); - List>> streams2 = jedis.xread(2, 1L, streamQuery2, streamQuery3); + List>> streams2 = jedis.xread(2, 1L, streamQuery2, streamQuery3); assertEquals(2, streams2.size()); } - + @Test public void xtrim() { Map map1 = new HashMap(); map1.put("f1", "v1"); - + jedis.xadd("xtrim-stream", null, map1); jedis.xadd("xtrim-stream", null, map1); jedis.xadd("xtrim-stream", null, map1); @@ -184,44 +184,44 @@ public void xtrim() { jedis.xtrim("xtrim-stream", 3, false); assertEquals(3L, jedis.xlen("xtrim-stream").longValue()); } - + @Test public void xrevrange() { - List range = jedis.xrevrange("xrevrange-stream", (StreamEntryID)null, (StreamEntryID)null, Integer.MAX_VALUE); + List range = jedis.xrevrange("xrevrange-stream", (StreamEntryID)null, (StreamEntryID)null, Integer.MAX_VALUE); assertEquals(0, range.size()); - + Map map = new HashMap<>(); map.put("f1", "v1"); StreamEntryID id1 = jedis.xadd("xrevrange-stream", null, map); StreamEntryID id2 = jedis.xadd("xrevrange-stream", null, map); - List range2 = jedis.xrange("xrevrange-stream", (StreamEntryID)null, (StreamEntryID)null, 3); + List range2 = jedis.xrange("xrevrange-stream", (StreamEntryID)null, (StreamEntryID)null, 3); assertEquals(2, range2.size()); - - List range3 = jedis.xrevrange("xrevrange-stream", null, id1, 2); + + List range3 = jedis.xrevrange("xrevrange-stream", null, id1, 2); assertEquals(2, range3.size()); - - List range4 = jedis.xrevrange("xrevrange-stream", id2, id1, 2); + + List range4 = jedis.xrevrange("xrevrange-stream", id2, id1, 2); assertEquals(2, range4.size()); - List range5 = jedis.xrevrange("xrevrange-stream", id2, id1, 1); + List range5 = jedis.xrevrange("xrevrange-stream", id2, id1, 1); assertEquals(1, range5.size()); - - List range6 = jedis.xrevrange("xrevrange-stream", null, id2, 4); + + List range6 = jedis.xrevrange("xrevrange-stream", null, id2, 4); assertEquals(1, range6.size()); - + StreamEntryID id3 = jedis.xadd("xrevrange-stream", null, map); - List range7 = jedis.xrevrange("xrevrange-stream", id2, id2, 4); + List range7 = jedis.xrevrange("xrevrange-stream", id2, id2, 4); assertEquals(1, range7.size()); } - + @Test public void xgroup() { - + Map map = new HashMap(); map.put("f1", "v1"); StreamEntryID id1 = jedis.xadd("xgroup-stream", null, map); - + String status = jedis.xgroupCreate("xgroup-stream", "consumer-group-name", null, false); assertTrue(Keyword.OK.name().equalsIgnoreCase(status)); @@ -231,112 +231,118 @@ public void xgroup() { status = jedis.xgroupCreate("xgroup-stream", "consumer-group-name1", StreamEntryID.LAST_ENTRY, false); assertTrue(Keyword.OK.name().equalsIgnoreCase(status)); - + jedis.xgroupDestroy("xgroup-stream", "consumer-group-name"); Long pendingMessageNum = jedis.xgroupDelConsumer("xgroup-stream", "consumer-group-name1", "myconsumer1"); - assertEquals(0L, pendingMessageNum.longValue()); + assertEquals(0L, pendingMessageNum.longValue()); } - + @Test public void xreadGroup() { - + // Simple xreadGroup with NOACK Map map = new HashMap<>(); map.put("f1", "v1"); StreamEntryID id1 = jedis.xadd("xreadGroup-stream1", null, map); String status1 = jedis.xgroupCreate("xreadGroup-stream1", "xreadGroup-group", null, false); Entry streamQeury1 = new AbstractMap.SimpleImmutableEntry<>("xreadGroup-stream1", StreamEntryID.UNRECEIVED_ENTRY); - List>> range = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer", 1, 0, true, streamQeury1); + List>> range = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer", 1, 0, true, streamQeury1); assertEquals(1, range.size()); assertEquals(1, range.get(0).getValue().size()); - + StreamEntryID id2 = jedis.xadd("xreadGroup-stream1", null, map); StreamEntryID id3 = jedis.xadd("xreadGroup-stream2", null, map); String status2 = jedis.xgroupCreate("xreadGroup-stream2", "xreadGroup-group", null, false); - + // Read only a single Stream Entry streamQeury11 = new AbstractMap.SimpleImmutableEntry<>("xreadGroup-stream1", StreamEntryID.UNRECEIVED_ENTRY); - List>> streams1 = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer", 1, 1L, true, streamQeury11); + List>> streams1 = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer", 1, 1L, true, streamQeury11); assertEquals(1, streams1.size()); assertEquals(1, streams1.get(0).getValue().size()); // Read from two Streams Entry streamQuery2 = new AbstractMap.SimpleImmutableEntry("xreadGroup-stream1", new StreamEntryID()); Entry streamQuery3 = new AbstractMap.SimpleImmutableEntry("xreadGroup-stream2", new StreamEntryID()); - List>> streams2 = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer", 1, 1L, true, streamQuery2, streamQuery3); + List>> streams2 = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer", 1, 1L, true, streamQuery2, streamQuery3); assertEquals(2, streams2.size()); // Read only fresh messages StreamEntryID id4 = jedis.xadd("xreadGroup-stream1", null, map); Entry streamQeuryFresh = new AbstractMap.SimpleImmutableEntry("xreadGroup-stream1", StreamEntryID.UNRECEIVED_ENTRY); - List>> streams3 = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer", 4, 100L, true, streamQeuryFresh); - assertEquals(1, streams3.size()); + List>> streams3 = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer", 4, 100L, true, streamQeuryFresh); + assertEquals(1, streams3.size()); assertEquals(id4, streams3.get(0).getValue().get(0).getID()); } - - + + @Test public void xack() { - + Map map = new HashMap(); map.put("f1", "v1"); StreamEntryID id1 = jedis.xadd("xack-stream", null, map); - + String status = jedis.xgroupCreate("xack-stream", "xack-group", null, false); - + Entry streamQeury1 = new AbstractMap.SimpleImmutableEntry<>("xack-stream", StreamEntryID.UNRECEIVED_ENTRY); // Empty Stream - List>> range = jedis.xreadGroup("xack-group", "xack-consumer", 1, 1L, false, streamQeury1); + List>> range = jedis.xreadGroup("xack-group", "xack-consumer", 1, 1L, false, streamQeury1); assertEquals(1, range.size()); assertEquals(1L, jedis.xack("xack-stream", "xack-group", range.get(0).getValue().get(0).getID())); } - + @Test - public void xpendeing() { + public void xpendeing() { Map map = new HashMap(); map.put("f1", "v1"); StreamEntryID id1 = jedis.xadd("xpendeing-stream", null, map); - + assertEquals("OK", jedis.xgroupCreate("xpendeing-stream", "xpendeing-group", null, false)); - - + + Entry streamQeury1 = new AbstractMap.SimpleImmutableEntry<>("xpendeing-stream", StreamEntryID.UNRECEIVED_ENTRY); - // Read the event from Stream put it on pending - List>> range = jedis.xreadGroup("xpendeing-group", "xpendeing-consumer", 1, 1L, false, streamQeury1); + // Read the event from Stream put it on pending + List>> range = jedis.xreadGroup("xpendeing-group", "xpendeing-consumer", 1, 1L, false, streamQeury1); assertEquals(1, range.size()); assertEquals(1, range.get(0).getValue().size()); assertEquals(map, range.get(0).getValue().get(0).getFields()); - + + // Get the summary about the pending messages + StreamPendingSummary pendingSummary = jedis.xpendingSummary("xpendeing-stream", "xpendeing-group"); + assertEquals(1, pendingSummary.getTotal()); + assertEquals(id1, pendingSummary.getMinId()); + assertEquals(1l, pendingSummary.getConsumerMessageCount().get("xpendeing-consumer").longValue()); + // Get the pending event List pendingRange = jedis.xpending("xpendeing-stream", "xpendeing-group", null, null, 3, "xpendeing-consumer"); assertEquals(1, pendingRange.size()); assertEquals(id1, pendingRange.get(0).getID()); assertEquals(1, pendingRange.get(0).getDeliveredTimes()); assertEquals("xpendeing-consumer", pendingRange.get(0).getConsumerName()); - + // Sleep for 1000ms so we can claim events pending for more than 500ms try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } - + List claimRange = jedis.xclaim("xpendeing-stream", "xpendeing-group", "xpendeing-consumer2", 500, 0, 0, false, id1); assertEquals(1, claimRange.size()); - // Deleted events should return as null on XClaim - assertEquals(1, jedis.xdel("xpendeing-stream", id1)); + // Deleted events should return as null on XClaim + assertEquals(1, jedis.xdel("xpendeing-stream", id1)); List claimRangeDel = jedis.xclaim("xpendeing-stream", "xpendeing-group", "xpendeing-consumer2", 0, 0, 0, false, id1); assertEquals(1, claimRangeDel.size()); assertNull(claimRangeDel.get(0)); - + Long pendingMessageNum = jedis.xgroupDelConsumer("xpendeing-stream", "xpendeing-group", "xpendeing-consumer2"); - assertEquals(1L, pendingMessageNum.longValue()); + assertEquals(1L, pendingMessageNum.longValue()); } @Test