Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions src/main/java/redis/clients/jedis/BinaryClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1589,6 +1589,11 @@ public void xrevrange(final byte[] key, final byte[] end, final byte[] start, fi
sendCommand(XREVRANGE, key, end, start, Keyword.COUNT.getRaw(), toByteArray(count));
}

/**
* @deprecated This method will be removed due to bug regarding {@code block} param. Use
* {@link #xread(redis.clients.jedis.params.XReadParams, java.util.Map.Entry...)}.
*/
@Deprecated
public void xread(final int count, final long block, final Map<byte[], byte[]> streams) {
final byte[][] params = new byte[3 + streams.size() * 2 + (block > 0 ? 2 : 0)][];

Expand All @@ -1611,6 +1616,24 @@ public void xread(final int count, final long block, final Map<byte[], byte[]> s
sendCommand(XREAD, params);
}

public void xread(final XReadParams params, final Entry<byte[], byte[]>... streams) {
final byte[][] bparams = params.getByteParams();
final int paramLength = bparams.length;

final byte[][] args = new byte[paramLength + 1 + streams.length * 2][];
System.arraycopy(bparams, 0, args, 0, paramLength);

args[paramLength] = Keyword.STREAMS.raw;
int keyIndex = paramLength + 1;
int idsIndex = keyIndex + streams.length;
for (final Entry<byte[], byte[]> entry : streams) {
args[keyIndex++] = entry.getKey();
args[idsIndex++] = entry.getValue();
}

sendCommand(XREAD, args);
}

public void xack(final byte[] key, final byte[] group, final byte[]... ids) {
final byte[][] params = new byte[2 + ids.length][];
int index = 0;
Expand Down Expand Up @@ -1661,6 +1684,11 @@ public void xtrim(byte[] key, long maxLen, boolean approximateLength) {
}
}

/**
* @deprecated This method will be removed due to bug regarding {@code block} param. Use
* {@link #xreadGroup(byte..., byte..., redis.clients.jedis.params.XReadGroupParams, java.util.Map.Entry...)}.
*/
@Deprecated
public void xreadGroup(byte[] groupname, byte[] consumer, int count, long block, boolean noAck,
Map<byte[], byte[]> streams) {

Expand Down Expand Up @@ -1703,6 +1731,30 @@ public void xreadGroup(byte[] groupname, byte[] consumer, int count, long block,
sendCommand(XREADGROUP, params);
}

public void xreadGroup(byte[] groupname, byte[] consumer, final XReadGroupParams params,
final Entry<byte[], byte[]>... streams) {
final byte[][] bparams = params.getByteParams();
final int paramLength = bparams.length;

final byte[][] args = new byte[3 + paramLength + 1 + streams.length * 2][];
int index = 0;
args[index++] = Keyword.GROUP.raw;
args[index++] = groupname;
args[index++] = consumer;
System.arraycopy(bparams, 0, args, index, paramLength);
index += paramLength;

args[index++] = Keyword.STREAMS.raw;
int keyIndex = index;
int idsIndex = keyIndex + streams.length;
for (final Entry<byte[], byte[]> entry : streams) {
args[keyIndex++] = entry.getKey();
args[idsIndex++] = entry.getValue();
}

sendCommand(XREADGROUP, args);
}

public void xpending(byte[] key, byte[] groupname, byte[] start, byte[] end, int count,
byte[] consumername) {
if (consumername == null) {
Expand Down
48 changes: 37 additions & 11 deletions src/main/java/redis/clients/jedis/BinaryJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import javax.net.ssl.HostnameVerifier;
Expand All @@ -30,17 +31,7 @@
import redis.clients.jedis.exceptions.InvalidURIException;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.params.ClientKillParams;
import redis.clients.jedis.params.GeoAddParams;
import redis.clients.jedis.params.GeoRadiusParam;
import redis.clients.jedis.params.GeoRadiusStoreParam;
import redis.clients.jedis.params.GetExParams;
import redis.clients.jedis.params.MigrateParams;
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.params.XClaimParams;
import redis.clients.jedis.params.ZAddParams;
import redis.clients.jedis.params.ZIncrByParams;
import redis.clients.jedis.params.LPosParams;
import redis.clients.jedis.params.*;
import redis.clients.jedis.util.JedisByteHashMap;
import redis.clients.jedis.util.JedisURIHelper;

Expand Down Expand Up @@ -4500,6 +4491,23 @@ public List<byte[]> xread(int count, long block, Map<byte[], byte[]> streams) {
}
}

@Override
public List<byte[]> xread(XReadParams xReadParams, Entry<byte[], byte[]>... streams) {
checkIsInMultiOrPipeline();
client.xread(xReadParams, streams);

if (!xReadParams.hasBlock()) {
return client.getBinaryMultiBulkReply();
}

client.setTimeoutInfinite();
try {
return client.getBinaryMultiBulkReply();
} finally {
client.rollbackTimeout();
}
}

@Override
public List<byte[]> xreadGroup(byte[] groupname, byte[] consumer, int count, long block,
boolean noAck, Map<byte[], byte[]> streams) {
Expand All @@ -4513,6 +4521,24 @@ public List<byte[]> xreadGroup(byte[] groupname, byte[] consumer, int count, lon
}
}

@Override
public List<byte[]> xreadGroup(byte[] groupname, byte[] consumer,
XReadGroupParams xReadGroupParams, Entry<byte[], byte[]>... streams) {
checkIsInMultiOrPipeline();
client.xreadGroup(groupname, consumer, xReadGroupParams, streams);

if (!xReadGroupParams.hasBlock()) {
return client.getBinaryMultiBulkReply();
}

client.setTimeoutInfinite();
try {
return client.getBinaryMultiBulkReply();
} finally {
client.rollbackTimeout();
}
}

@Override
public byte[] xadd(byte[] key, byte[] id, Map<byte[], byte[]> hash, long maxLen,
boolean approximateLength) {
Expand Down
40 changes: 31 additions & 9 deletions src/main/java/redis/clients/jedis/BinaryJedisCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,15 @@
import redis.clients.jedis.commands.JedisClusterBinaryScriptingCommands;
import redis.clients.jedis.commands.MultiKeyBinaryJedisClusterCommands;
import redis.clients.jedis.commands.ProtocolCommand;
import redis.clients.jedis.params.GeoAddParams;
import redis.clients.jedis.params.GeoRadiusParam;
import redis.clients.jedis.params.GeoRadiusStoreParam;
import redis.clients.jedis.params.GetExParams;
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.params.XClaimParams;
import redis.clients.jedis.params.ZAddParams;
import redis.clients.jedis.params.ZIncrByParams;
import redis.clients.jedis.params.LPosParams;
import redis.clients.jedis.params.*;
import redis.clients.jedis.util.JedisClusterHashTagUtil;
import redis.clients.jedis.util.KeyMergeUtil;
import redis.clients.jedis.util.SafeEncoder;

import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLParameters;
Expand Down Expand Up @@ -2427,6 +2420,16 @@ public List<byte[]> execute(Jedis connection) {
}.runBinary(keys.length, keys);
}

@Override
public List<byte[]> xread(final XReadParams xReadParams, final Entry<byte[], byte[]>... streams) {
return new JedisClusterCommand<List<byte[]>>(connectionHandler, maxAttempts) {
@Override
public List<byte[]> execute(Jedis connection) {
return connection.xread(xReadParams, streams);
}
}.runBinary(streams.length, getKeys(streams));
}

@Override
public Long xack(final byte[] key, final byte[] group, final byte[]... ids) {
return new JedisClusterCommand<Long>(connectionHandler, maxAttempts) {
Expand Down Expand Up @@ -2492,6 +2495,17 @@ public List<byte[]> execute(Jedis connection) {
}.runBinary(keys.length, keys);
}

@Override
public List<byte[]> xreadGroup(final byte[] groupname, final byte[] consumer, final XReadGroupParams xReadGroupParams,
final Entry<byte[], byte[]>... streams) {
return new JedisClusterCommand<List<byte[]>>(connectionHandler, maxAttempts) {
@Override
public List<byte[]> execute(Jedis connection) {
return connection.xreadGroup(groupname, consumer, xReadGroupParams, streams);
}
}.runBinary(streams.length, getKeys(streams));
}

@Override
public Long xdel(final byte[] key, final byte[]... ids) {
return new JedisClusterCommand<Long>(connectionHandler, maxAttempts) {
Expand Down Expand Up @@ -2596,4 +2610,12 @@ public Object execute(Jedis connection) {
}
}.runBinary(sampleKey);
}

private static byte[][] getKeys(final Entry<byte[], ?>... entries) {
byte[][] keys = new byte[entries.length][];
for (int i = 0; i < entries.length; i++) {
keys[i] = entries[i].getKey();
}
return keys;
}
}
60 changes: 41 additions & 19 deletions src/main/java/redis/clients/jedis/BuilderFactory.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package redis.clients.jedis;

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -703,10 +704,36 @@ public List<StreamEntryID> build(Object data) {
}
return responses;
}
};

public static final Builder<StreamEntry> STREAM_ENTRY = new Builder<StreamEntry>() {
@Override
@SuppressWarnings("unchecked")
public StreamEntry build(Object data) {
if (null == data) {
return null;
}
List<Object> objectList = (List<Object>) data;

if (objectList.isEmpty()) {
return null;
}

String entryIdString = SafeEncoder.encode((byte[]) objectList.get(0));
StreamEntryID entryID = new StreamEntryID(entryIdString);
List<byte[]> hash = (List<byte[]>) objectList.get(1);

Iterator<byte[]> hashIterator = hash.iterator();
Map<String, String> map = new HashMap<>(hash.size() / 2);
while (hashIterator.hasNext()) {
map.put(SafeEncoder.encode(hashIterator.next()), SafeEncoder.encode(hashIterator.next()));
}
return new StreamEntry(entryID, map);
}

@Override
public String toString() {
return "List<StreamEntryID>";
return "StreamEntry";
}
};

Expand Down Expand Up @@ -750,34 +777,29 @@ public String toString() {
}
};

public static final Builder<StreamEntry> STREAM_ENTRY = new Builder<StreamEntry>() {
public static final Builder<List<Map.Entry<String, List<StreamEntry>>>> STREAM_READ_RESPONSE
= new Builder<List<Map.Entry<String, List<StreamEntry>>>>() {
@Override
@SuppressWarnings("unchecked")
public StreamEntry build(Object data) {
if (null == data) {
public List<Map.Entry<String, List<StreamEntry>>> build(Object data) {
if (data == null) {
return null;
}
List<Object> objectList = (List<Object>) data;
List<Object> streams = (List<Object>) data;

if (objectList.isEmpty()) {
return null;
List<Map.Entry<String, List<StreamEntry>>> result = new ArrayList<>(streams.size());
for (Object streamObj : streams) {
List<Object> stream = (List<Object>) streamObj;
String streamId = SafeEncoder.encode((byte[]) stream.get(0));
List<StreamEntry> streamEntries = BuilderFactory.STREAM_ENTRY_LIST.build(stream.get(1));
result.add(new AbstractMap.SimpleEntry<>(streamId, streamEntries));
}

String entryIdString = SafeEncoder.encode((byte[]) objectList.get(0));
StreamEntryID entryID = new StreamEntryID(entryIdString);
List<byte[]> hash = (List<byte[]>) objectList.get(1);

Iterator<byte[]> hashIterator = hash.iterator();
Map<String, String> map = new HashMap<>(hash.size() / 2);
while (hashIterator.hasNext()) {
map.put(SafeEncoder.encode(hashIterator.next()), SafeEncoder.encode(hashIterator.next()));
}
return new StreamEntry(entryID, map);
return result;
}

@Override
public String toString() {
return "StreamEntry";
return "List<Entry<String, List<StreamEntry>>>";
}
};

Expand Down
54 changes: 44 additions & 10 deletions src/main/java/redis/clients/jedis/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,7 @@
import javax.net.ssl.SSLSocketFactory;

import redis.clients.jedis.commands.Commands;
import redis.clients.jedis.params.GeoAddParams;
import redis.clients.jedis.params.GeoRadiusParam;
import redis.clients.jedis.params.GeoRadiusStoreParam;
import redis.clients.jedis.params.GetExParams;
import redis.clients.jedis.params.MigrateParams;
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.params.XClaimParams;
import redis.clients.jedis.params.ZAddParams;
import redis.clients.jedis.params.ZIncrByParams;
import redis.clients.jedis.params.LPosParams;
import redis.clients.jedis.params.*;
import redis.clients.jedis.util.SafeEncoder;

public class Client extends BinaryClient implements Commands {
Expand Down Expand Up @@ -1384,6 +1375,25 @@ public void xread(final int count, final long block,
xread(count, block, bhash);
}

@Override
public void xread(final XReadParams params, final Map<String, StreamEntryID> streams) {
final byte[][] bparams = params.getByteParams();
final int paramLength = bparams.length;

final byte[][] args = new byte[paramLength + 1 + streams.size() * 2][];
System.arraycopy(bparams, 0, args, 0, paramLength);

args[paramLength] = Protocol.Keyword.STREAMS.raw;
int keyIndex = paramLength + 1;
int idsIndex = keyIndex + streams.size();
for (Entry<String, StreamEntryID> entry : streams.entrySet()) {
args[keyIndex++] = SafeEncoder.encode(entry.getKey());
args[idsIndex++] = SafeEncoder.encode(entry.getValue().toString());
}

sendCommand(Protocol.Command.XREAD, args);
}

@Override
public void xack(final String key, final String group, final StreamEntryID... ids) {
final byte[][] bids = new byte[ids.length][];
Expand Down Expand Up @@ -1442,6 +1452,30 @@ public void xreadGroup(String groupname, String consumer, int count, long block,
xreadGroup(SafeEncoder.encode(groupname), SafeEncoder.encode(consumer), count, block, noAck, bhash);
}

@Override
public void xreadGroup(String groupname, String consumer, XReadGroupParams params, Map<String, StreamEntryID> streams) {
final byte[][] bparams = params.getByteParams();
final int paramLength = bparams.length;

final byte[][] args = new byte[3 + paramLength + 1 + streams.size() * 2][];
int index = 0;
args[index++] = Protocol.Keyword.GROUP.raw;
args[index++] = SafeEncoder.encode(groupname);
args[index++] = SafeEncoder.encode(consumer);
System.arraycopy(bparams, 0, args, index, paramLength);
index += paramLength;

args[index++] = Protocol.Keyword.STREAMS.raw;
int keyIndex = index;
int idsIndex = keyIndex + streams.size();
for (Entry<String, StreamEntryID> entry : streams.entrySet()) {
args[keyIndex++] = SafeEncoder.encode(entry.getKey());
args[idsIndex++] = SafeEncoder.encode(entry.getValue().toString());
}

sendCommand(Protocol.Command.XREADGROUP, args);
}

@Override
public void xpending(String key, String groupname, StreamEntryID start, StreamEntryID end,
int count, String consumername) {
Expand Down
Loading