diff --git a/src/main/java/redis/clients/jedis/BinaryClient.java b/src/main/java/redis/clients/jedis/BinaryClient.java index b3131b6ea8..386cca44b4 100644 --- a/src/main/java/redis/clients/jedis/BinaryClient.java +++ b/src/main/java/redis/clients/jedis/BinaryClient.java @@ -1589,6 +1589,11 @@ public void xrevrange(final byte[] key, final byte[] end, final byte[] start, fi sendCommand(XREVRANGE, key, end, start, Keyword.COUNT.getRaw(), toByteArray(count)); } + /** + * @deprecated This method will be removed due to bug regarding {@code block} param. Use + * {@link #xread(redis.clients.jedis.params.XReadParams, java.util.Map.Entry...)}. + */ + @Deprecated public void xread(final int count, final long block, final Map streams) { final byte[][] params = new byte[3 + streams.size() * 2 + (block > 0 ? 2 : 0)][]; @@ -1611,6 +1616,24 @@ public void xread(final int count, final long block, final Map s sendCommand(XREAD, params); } + public void xread(final XReadParams params, final Entry... streams) { + final byte[][] bparams = params.getByteParams(); + final int paramLength = bparams.length; + + final byte[][] args = new byte[paramLength + 1 + streams.length * 2][]; + System.arraycopy(bparams, 0, args, 0, paramLength); + + args[paramLength] = Keyword.STREAMS.raw; + int keyIndex = paramLength + 1; + int idsIndex = keyIndex + streams.length; + for (final Entry entry : streams) { + args[keyIndex++] = entry.getKey(); + args[idsIndex++] = entry.getValue(); + } + + sendCommand(XREAD, args); + } + public void xack(final byte[] key, final byte[] group, final byte[]... ids) { final byte[][] params = new byte[2 + ids.length][]; int index = 0; @@ -1661,6 +1684,11 @@ public void xtrim(byte[] key, long maxLen, boolean approximateLength) { } } + /** + * @deprecated This method will be removed due to bug regarding {@code block} param. Use + * {@link #xreadGroup(byte..., byte..., redis.clients.jedis.params.XReadGroupParams, java.util.Map.Entry...)}. + */ + @Deprecated public void xreadGroup(byte[] groupname, byte[] consumer, int count, long block, boolean noAck, Map streams) { @@ -1703,6 +1731,30 @@ public void xreadGroup(byte[] groupname, byte[] consumer, int count, long block, sendCommand(XREADGROUP, params); } + public void xreadGroup(byte[] groupname, byte[] consumer, final XReadGroupParams params, + final Entry... streams) { + final byte[][] bparams = params.getByteParams(); + final int paramLength = bparams.length; + + final byte[][] args = new byte[3 + paramLength + 1 + streams.length * 2][]; + int index = 0; + args[index++] = Keyword.GROUP.raw; + args[index++] = groupname; + args[index++] = consumer; + System.arraycopy(bparams, 0, args, index, paramLength); + index += paramLength; + + args[index++] = Keyword.STREAMS.raw; + int keyIndex = index; + int idsIndex = keyIndex + streams.length; + for (final Entry entry : streams) { + args[keyIndex++] = entry.getKey(); + args[idsIndex++] = entry.getValue(); + } + + sendCommand(XREADGROUP, args); + } + public void xpending(byte[] key, byte[] groupname, byte[] start, byte[] end, int count, byte[] consumername) { if (consumername == null) { diff --git a/src/main/java/redis/clients/jedis/BinaryJedis.java b/src/main/java/redis/clients/jedis/BinaryJedis.java index a7398586fb..00e9edfaad 100644 --- a/src/main/java/redis/clients/jedis/BinaryJedis.java +++ b/src/main/java/redis/clients/jedis/BinaryJedis.java @@ -14,6 +14,7 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import javax.net.ssl.HostnameVerifier; @@ -30,17 +31,7 @@ import redis.clients.jedis.exceptions.InvalidURIException; import redis.clients.jedis.exceptions.JedisDataException; import redis.clients.jedis.exceptions.JedisException; -import redis.clients.jedis.params.ClientKillParams; -import redis.clients.jedis.params.GeoAddParams; -import redis.clients.jedis.params.GeoRadiusParam; -import redis.clients.jedis.params.GeoRadiusStoreParam; -import redis.clients.jedis.params.GetExParams; -import redis.clients.jedis.params.MigrateParams; -import redis.clients.jedis.params.SetParams; -import redis.clients.jedis.params.XClaimParams; -import redis.clients.jedis.params.ZAddParams; -import redis.clients.jedis.params.ZIncrByParams; -import redis.clients.jedis.params.LPosParams; +import redis.clients.jedis.params.*; import redis.clients.jedis.util.JedisByteHashMap; import redis.clients.jedis.util.JedisURIHelper; @@ -4500,6 +4491,23 @@ public List xread(int count, long block, Map streams) { } } + @Override + public List xread(XReadParams xReadParams, Entry... streams) { + checkIsInMultiOrPipeline(); + client.xread(xReadParams, streams); + + if (!xReadParams.hasBlock()) { + return client.getBinaryMultiBulkReply(); + } + + client.setTimeoutInfinite(); + try { + return client.getBinaryMultiBulkReply(); + } finally { + client.rollbackTimeout(); + } + } + @Override public List xreadGroup(byte[] groupname, byte[] consumer, int count, long block, boolean noAck, Map streams) { @@ -4513,6 +4521,24 @@ public List xreadGroup(byte[] groupname, byte[] consumer, int count, lon } } + @Override + public List xreadGroup(byte[] groupname, byte[] consumer, + XReadGroupParams xReadGroupParams, Entry... streams) { + checkIsInMultiOrPipeline(); + client.xreadGroup(groupname, consumer, xReadGroupParams, streams); + + if (!xReadGroupParams.hasBlock()) { + return client.getBinaryMultiBulkReply(); + } + + client.setTimeoutInfinite(); + try { + return client.getBinaryMultiBulkReply(); + } finally { + client.rollbackTimeout(); + } + } + @Override public byte[] xadd(byte[] key, byte[] id, Map hash, long maxLen, boolean approximateLength) { diff --git a/src/main/java/redis/clients/jedis/BinaryJedisCluster.java b/src/main/java/redis/clients/jedis/BinaryJedisCluster.java index 6c01d2ef7c..d4eb5e4de4 100644 --- a/src/main/java/redis/clients/jedis/BinaryJedisCluster.java +++ b/src/main/java/redis/clients/jedis/BinaryJedisCluster.java @@ -4,15 +4,7 @@ import redis.clients.jedis.commands.JedisClusterBinaryScriptingCommands; import redis.clients.jedis.commands.MultiKeyBinaryJedisClusterCommands; import redis.clients.jedis.commands.ProtocolCommand; -import redis.clients.jedis.params.GeoAddParams; -import redis.clients.jedis.params.GeoRadiusParam; -import redis.clients.jedis.params.GeoRadiusStoreParam; -import redis.clients.jedis.params.GetExParams; -import redis.clients.jedis.params.SetParams; -import redis.clients.jedis.params.XClaimParams; -import redis.clients.jedis.params.ZAddParams; -import redis.clients.jedis.params.ZIncrByParams; -import redis.clients.jedis.params.LPosParams; +import redis.clients.jedis.params.*; import redis.clients.jedis.util.JedisClusterHashTagUtil; import redis.clients.jedis.util.KeyMergeUtil; import redis.clients.jedis.util.SafeEncoder; @@ -20,6 +12,7 @@ import java.io.Closeable; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.SSLParameters; @@ -2427,6 +2420,16 @@ public List execute(Jedis connection) { }.runBinary(keys.length, keys); } + @Override + public List xread(final XReadParams xReadParams, final Entry... streams) { + return new JedisClusterCommand>(connectionHandler, maxAttempts) { + @Override + public List execute(Jedis connection) { + return connection.xread(xReadParams, streams); + } + }.runBinary(streams.length, getKeys(streams)); + } + @Override public Long xack(final byte[] key, final byte[] group, final byte[]... ids) { return new JedisClusterCommand(connectionHandler, maxAttempts) { @@ -2492,6 +2495,17 @@ public List execute(Jedis connection) { }.runBinary(keys.length, keys); } + @Override + public List xreadGroup(final byte[] groupname, final byte[] consumer, final XReadGroupParams xReadGroupParams, + final Entry... streams) { + return new JedisClusterCommand>(connectionHandler, maxAttempts) { + @Override + public List execute(Jedis connection) { + return connection.xreadGroup(groupname, consumer, xReadGroupParams, streams); + } + }.runBinary(streams.length, getKeys(streams)); + } + @Override public Long xdel(final byte[] key, final byte[]... ids) { return new JedisClusterCommand(connectionHandler, maxAttempts) { @@ -2596,4 +2610,12 @@ public Object execute(Jedis connection) { } }.runBinary(sampleKey); } + + private static byte[][] getKeys(final Entry... entries) { + byte[][] keys = new byte[entries.length][]; + for (int i = 0; i < entries.length; i++) { + keys[i] = entries[i].getKey(); + } + return keys; + } } diff --git a/src/main/java/redis/clients/jedis/BuilderFactory.java b/src/main/java/redis/clients/jedis/BuilderFactory.java index 8c6eb5ae95..5917d7ccf8 100644 --- a/src/main/java/redis/clients/jedis/BuilderFactory.java +++ b/src/main/java/redis/clients/jedis/BuilderFactory.java @@ -1,5 +1,6 @@ package redis.clients.jedis; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -703,10 +704,36 @@ public List build(Object data) { } return responses; } + }; + + public static final Builder STREAM_ENTRY = new Builder() { + @Override + @SuppressWarnings("unchecked") + public StreamEntry build(Object data) { + if (null == data) { + return null; + } + List objectList = (List) data; + + if (objectList.isEmpty()) { + return null; + } + + String entryIdString = SafeEncoder.encode((byte[]) objectList.get(0)); + StreamEntryID entryID = new StreamEntryID(entryIdString); + List hash = (List) objectList.get(1); + + Iterator hashIterator = hash.iterator(); + Map map = new HashMap<>(hash.size() / 2); + while (hashIterator.hasNext()) { + map.put(SafeEncoder.encode(hashIterator.next()), SafeEncoder.encode(hashIterator.next())); + } + return new StreamEntry(entryID, map); + } @Override public String toString() { - return "List"; + return "StreamEntry"; } }; @@ -750,34 +777,29 @@ public String toString() { } }; - public static final Builder STREAM_ENTRY = new Builder() { + public static final Builder>>> STREAM_READ_RESPONSE + = new Builder>>>() { @Override - @SuppressWarnings("unchecked") - public StreamEntry build(Object data) { - if (null == data) { + public List>> build(Object data) { + if (data == null) { return null; } - List objectList = (List) data; + List streams = (List) data; - if (objectList.isEmpty()) { - return null; + List>> result = new ArrayList<>(streams.size()); + for (Object streamObj : streams) { + List stream = (List) streamObj; + String streamId = SafeEncoder.encode((byte[]) stream.get(0)); + List streamEntries = BuilderFactory.STREAM_ENTRY_LIST.build(stream.get(1)); + result.add(new AbstractMap.SimpleEntry<>(streamId, streamEntries)); } - String entryIdString = SafeEncoder.encode((byte[]) objectList.get(0)); - StreamEntryID entryID = new StreamEntryID(entryIdString); - List hash = (List) objectList.get(1); - - Iterator hashIterator = hash.iterator(); - Map map = new HashMap<>(hash.size() / 2); - while (hashIterator.hasNext()) { - map.put(SafeEncoder.encode(hashIterator.next()), SafeEncoder.encode(hashIterator.next())); - } - return new StreamEntry(entryID, map); + return result; } @Override public String toString() { - return "StreamEntry"; + return "List>>"; } }; diff --git a/src/main/java/redis/clients/jedis/Client.java b/src/main/java/redis/clients/jedis/Client.java index 2409393561..2d6ef51474 100644 --- a/src/main/java/redis/clients/jedis/Client.java +++ b/src/main/java/redis/clients/jedis/Client.java @@ -14,16 +14,7 @@ import javax.net.ssl.SSLSocketFactory; import redis.clients.jedis.commands.Commands; -import redis.clients.jedis.params.GeoAddParams; -import redis.clients.jedis.params.GeoRadiusParam; -import redis.clients.jedis.params.GeoRadiusStoreParam; -import redis.clients.jedis.params.GetExParams; -import redis.clients.jedis.params.MigrateParams; -import redis.clients.jedis.params.SetParams; -import redis.clients.jedis.params.XClaimParams; -import redis.clients.jedis.params.ZAddParams; -import redis.clients.jedis.params.ZIncrByParams; -import redis.clients.jedis.params.LPosParams; +import redis.clients.jedis.params.*; import redis.clients.jedis.util.SafeEncoder; public class Client extends BinaryClient implements Commands { @@ -1384,6 +1375,25 @@ public void xread(final int count, final long block, xread(count, block, bhash); } + @Override + public void xread(final XReadParams params, final Map streams) { + final byte[][] bparams = params.getByteParams(); + final int paramLength = bparams.length; + + final byte[][] args = new byte[paramLength + 1 + streams.size() * 2][]; + System.arraycopy(bparams, 0, args, 0, paramLength); + + args[paramLength] = Protocol.Keyword.STREAMS.raw; + int keyIndex = paramLength + 1; + int idsIndex = keyIndex + streams.size(); + for (Entry entry : streams.entrySet()) { + args[keyIndex++] = SafeEncoder.encode(entry.getKey()); + args[idsIndex++] = SafeEncoder.encode(entry.getValue().toString()); + } + + sendCommand(Protocol.Command.XREAD, args); + } + @Override public void xack(final String key, final String group, final StreamEntryID... ids) { final byte[][] bids = new byte[ids.length][]; @@ -1442,6 +1452,30 @@ public void xreadGroup(String groupname, String consumer, int count, long block, xreadGroup(SafeEncoder.encode(groupname), SafeEncoder.encode(consumer), count, block, noAck, bhash); } + @Override + public void xreadGroup(String groupname, String consumer, XReadGroupParams params, Map streams) { + final byte[][] bparams = params.getByteParams(); + final int paramLength = bparams.length; + + final byte[][] args = new byte[3 + paramLength + 1 + streams.size() * 2][]; + int index = 0; + args[index++] = Protocol.Keyword.GROUP.raw; + args[index++] = SafeEncoder.encode(groupname); + args[index++] = SafeEncoder.encode(consumer); + System.arraycopy(bparams, 0, args, index, paramLength); + index += paramLength; + + args[index++] = Protocol.Keyword.STREAMS.raw; + int keyIndex = index; + int idsIndex = keyIndex + streams.size(); + for (Entry entry : streams.entrySet()) { + args[keyIndex++] = SafeEncoder.encode(entry.getKey()); + args[idsIndex++] = SafeEncoder.encode(entry.getValue().toString()); + } + + sendCommand(Protocol.Command.XREADGROUP, args); + } + @Override public void xpending(String key, String groupname, StreamEntryID start, StreamEntryID end, int count, String consumername) { diff --git a/src/main/java/redis/clients/jedis/Jedis.java b/src/main/java/redis/clients/jedis/Jedis.java index f78effee72..ba8134e2b3 100644 --- a/src/main/java/redis/clients/jedis/Jedis.java +++ b/src/main/java/redis/clients/jedis/Jedis.java @@ -13,26 +13,9 @@ import javax.net.ssl.SSLParameters; import javax.net.ssl.SSLSocketFactory; +import redis.clients.jedis.commands.*; +import redis.clients.jedis.params.*; import redis.clients.jedis.args.UnblockType; -import redis.clients.jedis.commands.AdvancedJedisCommands; -import redis.clients.jedis.commands.BasicCommands; -import redis.clients.jedis.commands.ClusterCommands; -import redis.clients.jedis.commands.JedisCommands; -import redis.clients.jedis.commands.ModuleCommands; -import redis.clients.jedis.commands.MultiKeyCommands; -import redis.clients.jedis.commands.ProtocolCommand; -import redis.clients.jedis.commands.ScriptingCommands; -import redis.clients.jedis.commands.SentinelCommands; -import redis.clients.jedis.params.GeoAddParams; -import redis.clients.jedis.params.GeoRadiusParam; -import redis.clients.jedis.params.GeoRadiusStoreParam; -import redis.clients.jedis.params.GetExParams; -import redis.clients.jedis.params.MigrateParams; -import redis.clients.jedis.params.SetParams; -import redis.clients.jedis.params.XClaimParams; -import redis.clients.jedis.params.ZAddParams; -import redis.clients.jedis.params.ZIncrByParams; -import redis.clients.jedis.params.LPosParams; import redis.clients.jedis.util.SafeEncoder; import redis.clients.jedis.util.Slowlog; @@ -4125,6 +4108,23 @@ public List>> xread(final int count, final long } } + @Override + public List>> xread(final XReadParams xReadParams, final Map streams) { + checkIsInMultiOrPipeline(); + client.xread(xReadParams, streams); + + if (!xReadParams.hasBlock()) { + return BuilderFactory.STREAM_READ_RESPONSE.build(client.getObjectMultiBulkReply()); + } + + client.setTimeoutInfinite(); + try { + return BuilderFactory.STREAM_READ_RESPONSE.build(client.getObjectMultiBulkReply()); + } finally { + client.rollbackTimeout(); + } + } + /** * {@inheritDoc} */ @@ -4208,6 +4208,25 @@ public List>> xreadGroup(final String groupname, } } + @Override + public List>> xreadGroup(final String groupname, + final String consumer, final XReadGroupParams xReadGroupParams, + final Map streams) { + checkIsInMultiOrPipeline(); + client.xreadGroup(groupname, consumer, xReadGroupParams, streams); + + if (!xReadGroupParams.hasBlock()) { + return BuilderFactory.STREAM_READ_RESPONSE.build(client.getObjectMultiBulkReply()); + } + + client.setTimeoutInfinite(); + try { + return BuilderFactory.STREAM_READ_RESPONSE.build(client.getObjectMultiBulkReply()); + } finally { + client.rollbackTimeout(); + } + } + @Override public List xpending(final String key, final String groupname, final StreamEntryID start, final StreamEntryID end, final int count, final String consumername) { diff --git a/src/main/java/redis/clients/jedis/JedisCluster.java b/src/main/java/redis/clients/jedis/JedisCluster.java index cae8e0e7a3..b280b9884d 100644 --- a/src/main/java/redis/clients/jedis/JedisCluster.java +++ b/src/main/java/redis/clients/jedis/JedisCluster.java @@ -1,18 +1,10 @@ package redis.clients.jedis; -import redis.clients.jedis.commands.ProtocolCommand; -import redis.clients.jedis.params.GeoAddParams; -import redis.clients.jedis.params.GeoRadiusParam; -import redis.clients.jedis.params.GeoRadiusStoreParam; -import redis.clients.jedis.params.GetExParams; -import redis.clients.jedis.params.SetParams; -import redis.clients.jedis.params.XClaimParams; -import redis.clients.jedis.params.ZAddParams; -import redis.clients.jedis.params.ZIncrByParams; -import redis.clients.jedis.params.LPosParams; import redis.clients.jedis.commands.JedisClusterCommands; import redis.clients.jedis.commands.JedisClusterScriptingCommands; import redis.clients.jedis.commands.MultiKeyJedisClusterCommands; +import redis.clients.jedis.commands.ProtocolCommand; +import redis.clients.jedis.params.*; import redis.clients.jedis.util.JedisClusterHashTagUtil; import redis.clients.jedis.util.KeyMergeUtil; @@ -2514,6 +2506,16 @@ public List>> execute(Jedis connection) { }.run(keys.length, keys); } + @Override + public List>> xread(final XReadParams xReadParams, final Map streams) { + return new JedisClusterCommand>>>(connectionHandler, maxAttempts) { + @Override + public List>> execute(Jedis connection) { + return connection.xread(xReadParams, streams); + } + }.run(streams.size(), getKeys(streams)); + } + @Override public Long xack(final String key, final String group, final StreamEntryID... ids) { return new JedisClusterCommand(connectionHandler, maxAttempts) { @@ -2584,6 +2586,18 @@ public List>> execute(Jedis connection) { }.run(keys.length, keys); } + @Override + public List>> xreadGroup(final String groupname, + final String consumer, final XReadGroupParams xReadGroupParams, + final Map streams) { + return new JedisClusterCommand>>>(connectionHandler, maxAttempts) { + @Override + public List>> execute(Jedis connection) { + return connection.xreadGroup(groupname, consumer, xReadGroupParams, streams); + } + }.run(streams.size(), getKeys(streams)); + } + @Override public List xpending(final String key, final String groupname, final StreamEntryID start, final StreamEntryID end, final int count, final String consumername) { @@ -2688,4 +2702,7 @@ public Object execute(Jedis connection) { }.run(sampleKey); } + private static String[] getKeys(final Map map) { + return map.keySet().toArray(new String[map.size()]); + } } diff --git a/src/main/java/redis/clients/jedis/commands/Commands.java b/src/main/java/redis/clients/jedis/commands/Commands.java index b50835da6e..a80beb1909 100644 --- a/src/main/java/redis/clients/jedis/commands/Commands.java +++ b/src/main/java/redis/clients/jedis/commands/Commands.java @@ -18,6 +18,8 @@ import redis.clients.jedis.params.ZAddParams; import redis.clients.jedis.params.ZIncrByParams; import redis.clients.jedis.params.LPosParams; +import redis.clients.jedis.params.XReadGroupParams; +import redis.clients.jedis.params.XReadParams; public interface Commands { @@ -443,8 +445,15 @@ default void restoreReplace(String key, int ttl, byte[] serializedValue) { void xrevrange(String key, StreamEntryID end, StreamEntryID start, int count); + /** + * @deprecated This method will be removed due to bug regarding {@code block} param. Use + * {@link #xread(redis.clients.jedis.params.XReadParams, java.util.Map)}. + */ + @Deprecated void xread(int count, long block, Entry... streams); + void xread(XReadParams params, Map streams); + void xack(String key, String group, StreamEntryID... ids); void xgroupCreate(String key, String consumer, StreamEntryID id, boolean makeStream); @@ -459,8 +468,15 @@ default void restoreReplace(String key, int ttl, byte[] serializedValue) { void xtrim(String key, long maxLen, boolean approximateLength); + /** + * @deprecated This method will be removed due to bug regarding {@code block} param. Use + * {@link #xreadGroup(java.lang.String, java.lang.String, redis.clients.jedis.params.XReadGroupParams, java.util.Map)}. + */ + @Deprecated void xreadGroup(String groupname, String consumer, int count, long block, boolean noAck, Entry... streams); + void xreadGroup(String groupname, String consumer, XReadGroupParams params, Map streams); + void xpending(String key, String groupname, StreamEntryID start, StreamEntryID end, int count, String consumername); void xpendingSummary(String key, String groupname); diff --git a/src/main/java/redis/clients/jedis/commands/JedisClusterCommands.java b/src/main/java/redis/clients/jedis/commands/JedisClusterCommands.java index 55d11516af..fbbca9e6c8 100644 --- a/src/main/java/redis/clients/jedis/commands/JedisClusterCommands.java +++ b/src/main/java/redis/clients/jedis/commands/JedisClusterCommands.java @@ -517,7 +517,6 @@ List georadiusByMemberReadonly(String key, String member, dou @Deprecated List>> xreadGroup(String groupname, String consumer, int count, long block, boolean noAck, Map.Entry... streams); - /** * XPENDING key group [start end count] [consumer] * diff --git a/src/main/java/redis/clients/jedis/commands/MultiKeyBinaryCommands.java b/src/main/java/redis/clients/jedis/commands/MultiKeyBinaryCommands.java index a041fe8116..e0bb0e18a0 100644 --- a/src/main/java/redis/clients/jedis/commands/MultiKeyBinaryCommands.java +++ b/src/main/java/redis/clients/jedis/commands/MultiKeyBinaryCommands.java @@ -8,9 +8,12 @@ import redis.clients.jedis.ZParams; import redis.clients.jedis.params.GeoRadiusParam; import redis.clients.jedis.params.GeoRadiusStoreParam; +import redis.clients.jedis.params.XReadGroupParams; +import redis.clients.jedis.params.XReadParams; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; public interface MultiKeyBinaryCommands { @@ -94,11 +97,26 @@ public interface MultiKeyBinaryCommands { Long touch(byte[]... keys); + /** + * @deprecated This method will be removed due to bug regarding {@code block} param. Use + * {@link #xread(redis.clients.jedis.params.XReadParams, java.util.Map.Entry...)}. + */ + @Deprecated List xread(int count, long block, Map streams); + List xread(XReadParams xReadParams, Entry... streams); + + /** + * @deprecated This method will be removed due to bug regarding {@code block} param. Use + * {@link #xreadGroup(byte..., byte..., redis.clients.jedis.params.XReadGroupParams, java.util.Map.Entry...)}. + */ + @Deprecated List xreadGroup(byte[] groupname, byte[] consumer, int count, long block, boolean noAck, Map streams); + List xreadGroup(byte[] groupname, byte[] consumer, XReadGroupParams xReadGroupParams, + Entry... streams); + Long georadiusStore(byte[] key, double longitude, double latitude, double radius, GeoUnit unit, GeoRadiusParam param, GeoRadiusStoreParam storeParam); diff --git a/src/main/java/redis/clients/jedis/commands/MultiKeyBinaryJedisClusterCommands.java b/src/main/java/redis/clients/jedis/commands/MultiKeyBinaryJedisClusterCommands.java index 6b8adf8b3b..a01ea228ca 100644 --- a/src/main/java/redis/clients/jedis/commands/MultiKeyBinaryJedisClusterCommands.java +++ b/src/main/java/redis/clients/jedis/commands/MultiKeyBinaryJedisClusterCommands.java @@ -10,9 +10,12 @@ import redis.clients.jedis.ZParams; import redis.clients.jedis.params.GeoRadiusParam; import redis.clients.jedis.params.GeoRadiusStoreParam; +import redis.clients.jedis.params.XReadGroupParams; +import redis.clients.jedis.params.XReadParams; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; public interface MultiKeyBinaryJedisClusterCommands { @@ -88,11 +91,26 @@ public interface MultiKeyBinaryJedisClusterCommands { Set keys(byte[] pattern); + /** + * @deprecated This method will be removed due to bug regarding {@code block} param. Use + * {@link #xread(redis.clients.jedis.params.XReadParams, java.util.Map.Entry...)}. + */ + @Deprecated List xread(int count, long block, Map streams); + List xread(XReadParams xReadParams, Entry... streams); + + /** + * @deprecated This method will be removed due to bug regarding {@code block} param. Use + * {@link #xreadGroup(byte..., byte..., redis.clients.jedis.params.XReadGroupParams, java.util.Map.Entry...)}. + */ + @Deprecated List xreadGroup(byte[] groupname, byte[] consumer, int count, long block, boolean noAck, Map streams); + List xreadGroup(byte[] groupname, byte[] consumer, XReadGroupParams xReadGroupParams, + Entry... streams); + Long georadiusStore(byte[] key, double longitude, double latitude, double radius, GeoUnit unit, GeoRadiusParam param, GeoRadiusStoreParam storeParam); diff --git a/src/main/java/redis/clients/jedis/commands/MultiKeyCommands.java b/src/main/java/redis/clients/jedis/commands/MultiKeyCommands.java index 1dc7a09c68..aabf2d3612 100644 --- a/src/main/java/redis/clients/jedis/commands/MultiKeyCommands.java +++ b/src/main/java/redis/clients/jedis/commands/MultiKeyCommands.java @@ -12,6 +12,8 @@ import redis.clients.jedis.ZParams; import redis.clients.jedis.params.GeoRadiusParam; import redis.clients.jedis.params.GeoRadiusStoreParam; +import redis.clients.jedis.params.XReadGroupParams; +import redis.clients.jedis.params.XReadParams; import java.util.List; import java.util.Map; @@ -188,10 +190,16 @@ public interface MultiKeyCommands { * @param block * @param streams * @return + * @deprecated This method will be removed due to bug regarding {@code block} param. Use + * {@link #xread(redis.clients.jedis.params.XReadParams, java.util.Map)}. */ + @Deprecated List>> xread(int count, long block, Map.Entry... streams); + List>> xread(XReadParams xReadParams, + Map streams); + /** * XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...] * @@ -202,10 +210,16 @@ List>> xread(int count, long block, * @param noAck * @param streams * @return + * @deprecated This method will be removed due to bug regarding {@code block} param. Use + * {@link #xreadGroup(java.lang.String, java.lang.String, redis.clients.jedis.params.XReadGroupParams, java.util.Map)}. */ + @Deprecated List>> xreadGroup(String groupname, String consumer, int count, long block, boolean noAck, Map.Entry... streams); + List>> xreadGroup(String groupname, String consumer, + XReadGroupParams xReadGroupParams, Map streams); + Long georadiusStore(String key, double longitude, double latitude, double radius, GeoUnit unit, GeoRadiusParam param, GeoRadiusStoreParam storeParam); diff --git a/src/main/java/redis/clients/jedis/commands/MultiKeyJedisClusterCommands.java b/src/main/java/redis/clients/jedis/commands/MultiKeyJedisClusterCommands.java index 0454ce26fa..7933a9ca50 100644 --- a/src/main/java/redis/clients/jedis/commands/MultiKeyJedisClusterCommands.java +++ b/src/main/java/redis/clients/jedis/commands/MultiKeyJedisClusterCommands.java @@ -12,6 +12,8 @@ import redis.clients.jedis.ZParams; import redis.clients.jedis.params.GeoRadiusParam; import redis.clients.jedis.params.GeoRadiusStoreParam; +import redis.clients.jedis.params.XReadGroupParams; +import redis.clients.jedis.params.XReadParams; import java.util.List; import java.util.Map; @@ -104,10 +106,16 @@ Long georadiusByMemberStore(String key, String member, double radius, GeoUnit un * @param block * @param streams * @return + * @deprecated This method will be removed due to bug regarding {@code block} param. Use + * {@link #xread(redis.clients.jedis.params.XReadParams, java.util.Map)}. */ + @Deprecated List>> xread(int count, long block, Map.Entry... streams); + List>> xread(XReadParams xReadParams, + Map streams); + /** * XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...] * @@ -118,8 +126,14 @@ List>> xread(int count, long block, * @param noAck * @param streams * @return + * @deprecated This method will be removed due to bug regarding {@code block} param. Use + * {@link #xreadGroup(java.lang.String, java.lang.String, redis.clients.jedis.params.XReadGroupParams, java.util.Map)}. */ + @Deprecated List>> xreadGroup(String groupname, String consumer, int count, long block, boolean noAck, Map.Entry... streams); + List>> xreadGroup(String groupname, String consumer, + XReadGroupParams xReadGroupParams, Map streams); + } diff --git a/src/main/java/redis/clients/jedis/params/XClaimParams.java b/src/main/java/redis/clients/jedis/params/XClaimParams.java index f7f58ecd96..69f973d0b4 100644 --- a/src/main/java/redis/clients/jedis/params/XClaimParams.java +++ b/src/main/java/redis/clients/jedis/params/XClaimParams.java @@ -10,7 +10,7 @@ public class XClaimParams extends Params { public XClaimParams() { } - public static XClaimParams xclaimParams() { + public static XClaimParams xClaimParams() { return new XClaimParams(); } diff --git a/src/main/java/redis/clients/jedis/params/XReadGroupParams.java b/src/main/java/redis/clients/jedis/params/XReadGroupParams.java new file mode 100644 index 0000000000..b7124b0276 --- /dev/null +++ b/src/main/java/redis/clients/jedis/params/XReadGroupParams.java @@ -0,0 +1,31 @@ +package redis.clients.jedis.params; + +public class XReadGroupParams extends Params { + + private static final String COUNT = "COUNT"; + private static final String BLOCK = "BLOCK"; + private static final String NOACK = "NOACK"; + + public static XReadGroupParams xReadGroupParams() { + return new XReadGroupParams(); + } + + public XReadGroupParams count(int count) { + addParam(COUNT, count); + return this; + } + + public XReadGroupParams block(int block) { + addParam(BLOCK, block); + return this; + } + + public XReadGroupParams noAck() { + addParam(NOACK); + return this; + } + + public boolean hasBlock() { + return super.contains(BLOCK); + } +} diff --git a/src/main/java/redis/clients/jedis/params/XReadParams.java b/src/main/java/redis/clients/jedis/params/XReadParams.java new file mode 100644 index 0000000000..4a55940107 --- /dev/null +++ b/src/main/java/redis/clients/jedis/params/XReadParams.java @@ -0,0 +1,25 @@ +package redis.clients.jedis.params; + +public class XReadParams extends Params { + + private static final String COUNT = "COUNT"; + private static final String BLOCK = "BLOCK"; + + public static XReadParams xReadParams() { + return new XReadParams(); + } + + public XReadParams count(int count) { + addParam(COUNT, count); + return this; + } + + public XReadParams block(int block) { + addParam(BLOCK, block); + return this; + } + + public boolean hasBlock() { + return super.contains(BLOCK); + } +} diff --git a/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java b/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java index 48a8169f1d..a0cc3e381b 100644 --- a/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java +++ b/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java @@ -5,24 +5,18 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static redis.clients.jedis.StreamGroupInfo.CONSUMERS; -import static redis.clients.jedis.StreamGroupInfo.LAST_DELIVERED; -import static redis.clients.jedis.StreamGroupInfo.NAME; -import static redis.clients.jedis.StreamGroupInfo.PENDING; -import static redis.clients.jedis.StreamInfo.FIRST_ENTRY; -import static redis.clients.jedis.StreamInfo.GROUPS; -import static redis.clients.jedis.StreamInfo.LAST_ENTRY; -import static redis.clients.jedis.StreamInfo.LAST_GENERATED_ID; -import static redis.clients.jedis.StreamInfo.LENGTH; -import static redis.clients.jedis.StreamInfo.RADIX_TREE_KEYS; -import static redis.clients.jedis.StreamInfo.RADIX_TREE_NODES; +import static redis.clients.jedis.StreamGroupInfo.*; +import static redis.clients.jedis.StreamInfo.*; import static redis.clients.jedis.StreamConsumersInfo.IDLE; import java.util.AbstractMap; +import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; import redis.clients.jedis.*; @@ -30,6 +24,8 @@ import redis.clients.jedis.exceptions.JedisDataException; import redis.clients.jedis.exceptions.JedisException; import redis.clients.jedis.params.XClaimParams; +import redis.clients.jedis.params.XReadGroupParams; +import redis.clients.jedis.params.XReadParams; import redis.clients.jedis.util.SafeEncoder; public class StreamsCommandsTest extends JedisCommandTestBase { @@ -178,6 +174,63 @@ public void xread() { } + @Test + public void xreadWithParams() { + + Map streamQeury1 = Collections.singletonMap("xread-stream1", new StreamEntryID()); + + // Before creating Stream + assertNull(jedis.xread(XReadParams.xReadParams().block(1), streamQeury1)); + assertNull(jedis.xread(XReadParams.xReadParams(), streamQeury1)); + + Map map = new HashMap<>(); + map.put("f1", "v1"); + StreamEntryID id1 = jedis.xadd("xread-stream1", null, map); + StreamEntryID id2 = jedis.xadd("xread-stream2", null, map); + + // Read only a single Stream + List>> streams1 = jedis.xread(XReadParams.xReadParams().count(1).block(1), streamQeury1); + assertEquals(1, streams1.size()); + assertEquals("xread-stream1", streams1.get(0).getKey()); + assertEquals(1, streams1.get(0).getValue().size()); + assertEquals(id1, streams1.get(0).getValue().get(0).getID()); + assertEquals(map, streams1.get(0).getValue().get(0).getFields()); + + assertNull(jedis.xread(XReadParams.xReadParams().block(1), Collections.singletonMap("xread-stream1", id1))); + assertNull(jedis.xread(XReadParams.xReadParams(), Collections.singletonMap("xread-stream1", id1))); + + // Read from two Streams + Map streamQuery23 = new LinkedHashMap<>(); + streamQuery23.put("xread-stream1", new StreamEntryID()); + streamQuery23.put("xread-stream2", new StreamEntryID()); + List>> streams2 = jedis.xread(XReadParams.xReadParams().count(2).block(1), streamQuery23); + assertEquals(2, streams2.size()); + } + + @Test + public void xreadBlockZero() throws InterruptedException { + final AtomicReference readId = new AtomicReference<>(); + Thread t = new Thread(new Runnable() { + @Override + public void run() { + try (Jedis blockJedis = createJedis()) { + long startTime = System.currentTimeMillis(); + List>> read = blockJedis.xread(XReadParams.xReadParams().block(0), + Collections.singletonMap("block0-stream", new StreamEntryID())); + long endTime = System.currentTimeMillis(); + assertTrue(endTime - startTime > 500); + assertNotNull(read); + readId.set(read.get(0).getValue().get(0).getID()); + } + } + }, "xread-block-0-thread"); + t.start(); + Thread.sleep(1000); + StreamEntryID addedId = jedis.xadd("block0-stream", null, Collections.singletonMap("foo", "bar")); + t.join(); + assertEquals(addedId, readId.get()); + } + @Test public void xtrim() { Map map1 = new HashMap(); @@ -296,6 +349,48 @@ public void xreadGroup() { assertEquals(id4, streams3.get(0).getValue().get(0).getID()); } + @Test + public void xreadGroupWithParams() { + + // Simple xreadGroup with NOACK + Map map = new HashMap<>(); + map.put("f1", "v1"); + StreamEntryID id1 = jedis.xadd("xreadGroup-stream1", null, map); + jedis.xgroupCreate("xreadGroup-stream1", "xreadGroup-group", null, false); + Map streamQeury1 = Collections.singletonMap("xreadGroup-stream1", StreamEntryID.UNRECEIVED_ENTRY); + List>> range = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer", + XReadGroupParams.xReadGroupParams().count(1).noAck(), streamQeury1); + assertEquals(1, range.size()); + assertEquals(1, range.get(0).getValue().size()); + + StreamEntryID id2 = jedis.xadd("xreadGroup-stream1", null, map); + StreamEntryID id3 = jedis.xadd("xreadGroup-stream2", null, map); + jedis.xgroupCreate("xreadGroup-stream2", "xreadGroup-group", null, false); + + // Read only a single Stream + Map streamQeury11 = Collections.singletonMap("xreadGroup-stream1", StreamEntryID.UNRECEIVED_ENTRY); + List>> streams1 = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer", + XReadGroupParams.xReadGroupParams().count(1).block(1).noAck(), streamQeury11); + assertEquals(1, streams1.size()); + assertEquals(1, streams1.get(0).getValue().size()); + + // Read from two Streams + Map streamQuery23 = new LinkedHashMap<>(); + streamQuery23.put("xreadGroup-stream1", new StreamEntryID()); + streamQuery23.put("xreadGroup-stream2", new StreamEntryID()); + List>> streams2 = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer", + XReadGroupParams.xReadGroupParams().count(1).block(1).noAck(), streamQuery23); + assertEquals(2, streams2.size()); + + // Read only fresh messages + StreamEntryID id4 = jedis.xadd("xreadGroup-stream1", null, map); + Map streamQeuryFresh = Collections.singletonMap("xreadGroup-stream1", StreamEntryID.UNRECEIVED_ENTRY); + List>> streams3 = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer", + XReadGroupParams.xReadGroupParams().count(4).block(100).noAck(), streamQeuryFresh); + assertEquals(1, streams3.size()); + assertEquals(id4, streams3.get(0).getValue().get(0).getID()); + } + @Test public void xack() { @@ -395,7 +490,7 @@ public void xclaimWithParams() { } List streamEntrys = jedis.xclaim("xpendeing-stream", "xpendeing-group", - "xpendeing-consumer2", 50, XClaimParams.xclaimParams().idle(0).retryCount(0), + "xpendeing-consumer2", 50, XClaimParams.xClaimParams().idle(0).retryCount(0), pendingRange.get(0).getID()); assertEquals(1, streamEntrys.size()); assertEquals(pendingRange.get(0).getID(), streamEntrys.get(0).getID()); @@ -425,7 +520,7 @@ public void xclaimJustId() { } List streamEntryIDS = jedis.xclaimJustId("xpendeing-stream", "xpendeing-group", - "xpendeing-consumer2", 50, XClaimParams.xclaimParams().idle(0).retryCount(0), + "xpendeing-consumer2", 50, XClaimParams.xClaimParams().idle(0).retryCount(0), pendingRange.get(0).getID()); assertEquals(1, streamEntryIDS.size()); assertEquals(pendingRange.get(0).getID(), streamEntryIDS.get(0));