Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion src/main/java/redis/clients/jedis/BinaryClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import redis.clients.jedis.params.GetExParams;
import redis.clients.jedis.params.MigrateParams;
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.params.XClaimParams;
import redis.clients.jedis.params.ZAddParams;
import redis.clients.jedis.params.ZIncrByParams;
import redis.clients.jedis.params.LPosParams;
Expand Down Expand Up @@ -1690,7 +1691,7 @@ public void xpendingSummary(final byte[] key, final byte[] groupname) {
public void xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime,
long newIdleTime, int retries, boolean force, byte[][] ids) {

ArrayList<byte[]> arguments = new ArrayList<>(10 + ids.length);
List<byte[]> arguments = new ArrayList<>(10 + ids.length);

arguments.add(key);
arguments.add(groupname);
Expand All @@ -1713,6 +1714,37 @@ public void xclaim(byte[] key, byte[] groupname, byte[] consumername, long minId
sendCommand(XCLAIM, arguments.toArray(new byte[arguments.size()][]));
}

private void xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime,
XClaimParams params, byte[][] ids, boolean justId) {
final byte[][] bparams = params.getByteParams();
final int paramLength = bparams.length;
final int idsLength = ids.length;
final byte[][] args = new byte[4 + paramLength + idsLength + (justId ? 1 : 0)][];
int index = 0;
args[index++] = key;
args[index++] = groupname;
args[index++] = consumername;
args[index++] = toByteArray(minIdleTime);
System.arraycopy(ids, 0, args, index, idsLength);
index += idsLength;
System.arraycopy(bparams, 0, args, index, paramLength);
index += paramLength;
if (justId) {
args[index++] = Keyword.JUSTID.getRaw();
}
sendCommand(XCLAIM, args);
}

public void xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime,
XClaimParams params, byte[]... ids) {
xclaim(key, groupname, consumername, minIdleTime, params, ids, false);
}

public void xclaimJustId(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime,
XClaimParams params, byte[]... ids) {
xclaim(key, groupname, consumername, minIdleTime, params, ids, true);
}

public void xinfoStream(byte[] key) {
sendCommand(XINFO, Keyword.STREAM.getRaw(), key);
}
Expand Down
17 changes: 17 additions & 0 deletions src/main/java/redis/clients/jedis/BinaryJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import redis.clients.jedis.params.GetExParams;
import redis.clients.jedis.params.MigrateParams;
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.params.XClaimParams;
import redis.clients.jedis.params.ZAddParams;
import redis.clients.jedis.params.ZIncrByParams;
import redis.clients.jedis.params.LPosParams;
Expand Down Expand Up @@ -4542,6 +4543,22 @@ public List<byte[]> xclaim(byte[] key, byte[] groupname, byte[] consumername, lo
return client.getBinaryMultiBulkReply();
}

@Override
public List<byte[]> xclaim(byte[] key, byte[] group, byte[] consumername, long minIdleTime,
XClaimParams params, byte[]... ids) {
checkIsInMultiOrPipeline();
client.xclaim(key, group, consumername, minIdleTime, params, ids);
return client.getBinaryMultiBulkReply();
}

@Override
public List<byte[]> xclaimJustId(byte[] key, byte[] group, byte[] consumername, long minIdleTime,
XClaimParams params, byte[]... ids) {
checkIsInMultiOrPipeline();
client.xclaimJustId(key, group, consumername, minIdleTime, params, ids);
return client.getBinaryMultiBulkReply();
}

@Override
public StreamInfo xinfoStream(byte[] key) {
checkIsInMultiOrPipeline();
Expand Down
23 changes: 23 additions & 0 deletions src/main/java/redis/clients/jedis/BinaryJedisCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import redis.clients.jedis.params.GeoRadiusStoreParam;
import redis.clients.jedis.params.GetExParams;
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.params.XClaimParams;
import redis.clients.jedis.params.ZAddParams;
import redis.clients.jedis.params.ZIncrByParams;
import redis.clients.jedis.params.LPosParams;
Expand Down Expand Up @@ -2495,6 +2496,28 @@ public List<byte[]> execute(Jedis connection) {
}.runBinary(key);
}

@Override
public List<byte[]> xclaim(byte[] key, byte[] group, byte[] consumername, long minIdleTime,
XClaimParams params, byte[]... ids) {
return new JedisClusterCommand<List<byte[]>>(connectionHandler, maxAttempts) {
@Override
public List<byte[]> execute(Jedis connection) {
return connection.xclaim(key, group, consumername, minIdleTime, params, ids);
}
}.runBinary(key);
}

@Override
public List<byte[]> xclaimJustId(byte[] key, byte[] group, byte[] consumername, long minIdleTime,
XClaimParams params, byte[]... ids) {
return new JedisClusterCommand<List<byte[]>>(connectionHandler, maxAttempts) {
@Override
public List<byte[]> execute(Jedis connection) {
return connection.xclaimJustId(key, group, consumername, minIdleTime, params, ids);
}
}.runBinary(key);
}

@Override
public Long waitReplicas(final byte[] key, final int replicas, final long timeout) {
return new JedisClusterCommand<Long>(connectionHandler, maxAttempts) {
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/redis/clients/jedis/BinaryShardedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import redis.clients.jedis.params.GeoRadiusParam;
import redis.clients.jedis.params.GetExParams;
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.params.XClaimParams;
import redis.clients.jedis.params.ZAddParams;
import redis.clients.jedis.params.ZIncrByParams;
import redis.clients.jedis.params.LPosParams;
Expand Down Expand Up @@ -1176,6 +1177,20 @@ public List<byte[]> xclaim(byte[] key, byte[] groupname, byte[] consumername, lo
return j.xclaim(key, groupname, consumername, minIdleTime, newIdleTime, retries, force, ids);
}

@Override
public List<byte[]> xclaim(byte[] key, byte[] group, byte[] consumername, long minIdleTime,
XClaimParams params, byte[]... ids) {
Jedis j = getShard(key);
return j.xclaim(key, group, consumername, minIdleTime, params, ids);
}

@Override
public List<byte[]> xclaimJustId(byte[] key, byte[] group, byte[] consumername, long minIdleTime,
XClaimParams params, byte[]... ids) {
Jedis j = getShard(key);
return j.xclaimJustId(key, group, consumername, minIdleTime, params, ids);
}

@Override
public StreamInfo xinfoStream(byte[] key) {
Jedis j = getShard(key);
Expand Down
23 changes: 23 additions & 0 deletions src/main/java/redis/clients/jedis/BuilderFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,29 @@ public String toString() {
}
};

public static final Builder<List<StreamEntryID>> STREAM_ENTRY_ID_LIST = new Builder<List<StreamEntryID>>() {
@Override
@SuppressWarnings("unchecked")
public List<StreamEntryID> build(Object data) {
if (null == data) {
return null;
}
List<Object> objectList = (List<Object>) data;
List<StreamEntryID> responses = new ArrayList<>(objectList.size());
if (!objectList.isEmpty()) {
for(Object object : objectList) {
responses.add(STREAM_ENTRY_ID.build(object));
}
}
return responses;
}

@Override
public String toString() {
return "List<StreamEntryID>";
}
};

public static final Builder<List<StreamEntry>> STREAM_ENTRY_LIST = new Builder<List<StreamEntry>>() {
@Override
@SuppressWarnings("unchecked")
Expand Down
30 changes: 25 additions & 5 deletions src/main/java/redis/clients/jedis/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import redis.clients.jedis.params.GetExParams;
import redis.clients.jedis.params.MigrateParams;
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.params.XClaimParams;
import redis.clients.jedis.params.ZAddParams;
import redis.clients.jedis.params.ZIncrByParams;
import redis.clients.jedis.params.LPosParams;
Expand Down Expand Up @@ -1433,14 +1434,26 @@ public void xpendingSummary(String key, String groupname) {
@Override
public void xclaim(String key, String group, String consumername, long minIdleTime,
long newIdleTime, int retries, boolean force, StreamEntryID... ids) {

final byte[][] bids = new byte[ids.length][];
for (int i = 0; i < ids.length; i++) {
bids[i] = SafeEncoder.encode(ids[i].toString());
}
final byte[][] bids = convertStreamEntryIDsToBinary(ids);
xclaim(SafeEncoder.encode(key), SafeEncoder.encode(group), SafeEncoder.encode(consumername), minIdleTime, newIdleTime, retries, force, bids);
}

@Override
public void xclaim(String key, String group, String consumername, long minIdleTime,
XClaimParams params, StreamEntryID... ids) {
final byte[][] bids = convertStreamEntryIDsToBinary(ids);
xclaim(SafeEncoder.encode(key), SafeEncoder.encode(group), SafeEncoder.encode(consumername),
minIdleTime, params, bids);
}

@Override
public void xclaimJustId(String key, String group, String consumername, long minIdleTime,
XClaimParams params, StreamEntryID... ids) {
final byte[][] bids = convertStreamEntryIDsToBinary(ids);
xclaimJustId(SafeEncoder.encode(key), SafeEncoder.encode(group), SafeEncoder.encode(consumername),
minIdleTime, params, bids);
}

@Override
public void xinfoStream(String key) {
xinfoStream(SafeEncoder.encode(key));
Expand All @@ -1456,4 +1469,11 @@ public void xinfoConsumers(String key, String group) {
xinfoConsumers(SafeEncoder.encode(key), SafeEncoder.encode(group));
}

private byte[][] convertStreamEntryIDsToBinary(StreamEntryID... ids) {
final byte[][] bids = new byte[ids.length][];
for (int i = 0; i < ids.length; i++) {
bids[i] = SafeEncoder.encode(ids[i].toString());
}
return bids;
}
}
19 changes: 19 additions & 0 deletions src/main/java/redis/clients/jedis/Jedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import redis.clients.jedis.params.GetExParams;
import redis.clients.jedis.params.MigrateParams;
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.params.XClaimParams;
import redis.clients.jedis.params.ZAddParams;
import redis.clients.jedis.params.ZIncrByParams;
import redis.clients.jedis.params.LPosParams;
Expand Down Expand Up @@ -4163,6 +4164,24 @@ public List<StreamEntry> xclaim(String key, String group, String consumername, l
return BuilderFactory.STREAM_ENTRY_LIST.build(client.getObjectMultiBulkReply());
}

@Override
public List<StreamEntry> xclaim(String key, String group, String consumername, long minIdleTime,
XClaimParams params, StreamEntryID... ids) {
checkIsInMultiOrPipeline();
client.xclaim(key, group, consumername, minIdleTime, params, ids);

return BuilderFactory.STREAM_ENTRY_LIST.build(client.getObjectMultiBulkReply());
}

@Override
public List<StreamEntryID> xclaimJustId(String key, String group, String consumername,
long minIdleTime, XClaimParams params, StreamEntryID... ids) {
checkIsInMultiOrPipeline();
client.xclaimJustId(key, group, consumername, minIdleTime, params, ids);

return BuilderFactory.STREAM_ENTRY_ID_LIST.build(client.getObjectMultiBulkReply());
}

@Override
public StreamInfo xinfoStream(String key) {
client.xinfoStream(key);
Expand Down
23 changes: 23 additions & 0 deletions src/main/java/redis/clients/jedis/JedisCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import redis.clients.jedis.params.GeoRadiusStoreParam;
import redis.clients.jedis.params.GetExParams;
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.params.XClaimParams;
import redis.clients.jedis.params.ZAddParams;
import redis.clients.jedis.params.ZIncrByParams;
import redis.clients.jedis.params.LPosParams;
Expand Down Expand Up @@ -2587,6 +2588,28 @@ public List<StreamEntry> execute(Jedis connection) {
}.run(key);
}

@Override
public List<StreamEntry> xclaim(String key, String group, String consumername, long minIdleTime,
XClaimParams params, StreamEntryID... ids) {
return new JedisClusterCommand<List<StreamEntry>>(connectionHandler, maxAttempts) {
@Override
public List<StreamEntry> execute(Jedis connection) {
return connection.xclaim(key, group, consumername, minIdleTime, params, ids);
}
}.run(key);
}

@Override
public List<StreamEntryID> xclaimJustId(String key, String group, String consumername,
long minIdleTime, XClaimParams params, StreamEntryID... ids) {
return new JedisClusterCommand<List<StreamEntryID>>(connectionHandler, maxAttempts) {
@Override
public List<StreamEntryID> execute(Jedis connection) {
return connection.xclaimJustId(key, group, consumername, minIdleTime, params, ids);
}
}.run(key);
}

public Long waitReplicas(final String key, final int replicas, final long timeout) {
return new JedisClusterCommand<Long>(connectionHandler, maxAttempts) {
@Override
Expand Down
29 changes: 29 additions & 0 deletions src/main/java/redis/clients/jedis/PipelineBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import redis.clients.jedis.params.GeoRadiusParam;
import redis.clients.jedis.params.GetExParams;
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.params.XClaimParams;
import redis.clients.jedis.params.ZAddParams;
import redis.clients.jedis.params.ZIncrByParams;
import redis.clients.jedis.params.LPosParams;
Expand Down Expand Up @@ -2221,6 +2222,34 @@ public Response<List<byte[]>> xclaim(byte[] key, byte[] group, byte[] consumerna
return getResponse(BuilderFactory.BYTE_ARRAY_LIST);
}

@Override
public Response<List<StreamEntry>> xclaim(String key, String group, String consumername,
long minIdleTime, XClaimParams params, StreamEntryID... ids) {
getClient(key).xclaim(key, group, consumername, minIdleTime, params, ids);
return getResponse(BuilderFactory.STREAM_ENTRY_LIST);
}

@Override
public Response<List<byte[]>> xclaim(byte[] key, byte[] group, byte[] consumername,
long minIdleTime, XClaimParams params, byte[]... ids) {
getClient(key).xclaim(key, group, consumername, minIdleTime, params, ids);
return getResponse(BuilderFactory.BYTE_ARRAY_LIST);
}

@Override
public Response<List<StreamEntryID>> xclaimJustId(String key, String group, String consumername,
long minIdleTime, XClaimParams params, StreamEntryID... ids) {
getClient(key).xclaimJustId(key, group, consumername, minIdleTime, params, ids);
return getResponse(BuilderFactory.STREAM_ENTRY_ID_LIST);
}

@Override
public Response<List<byte[]>> xclaimJustId(byte[] key, byte[] group, byte[] consumername,
long minIdleTime, XClaimParams params, byte[]... ids) {
getClient(key).xclaimJustId(key, group, consumername, minIdleTime, params, ids);
return getResponse(BuilderFactory.BYTE_ARRAY_LIST);
}

public Response<Object> sendCommand(final String sampleKey, final ProtocolCommand cmd,
final String... args) {
getClient(sampleKey).sendCommand(cmd, args);
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/redis/clients/jedis/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ public static enum Keyword {
GETNAME, SETNAME, LIST, MATCH, COUNT, PING, PONG, UNLOAD, REPLACE, KEYS, PAUSE, DOCTOR, BLOCK,
NOACK, STREAMS, KEY, CREATE, MKSTREAM, SETID, DESTROY, DELCONSUMER, MAXLEN, GROUP, ID, IDLE,
TIME, RETRYCOUNT, FORCE, USAGE, SAMPLES, STREAM, GROUPS, CONSUMERS, HELP, FREQ, SETUSER,
GETUSER, DELUSER, WHOAMI, CAT, GENPASS, USERS, LOG, INCR, SAVE;
GETUSER, DELUSER, WHOAMI, CAT, GENPASS, USERS, LOG, INCR, SAVE, JUSTID;

/**
* @deprecated This will be private in future. Use {@link #getRaw()}.
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/redis/clients/jedis/ShardedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import redis.clients.jedis.params.GeoRadiusParam;
import redis.clients.jedis.params.GetExParams;
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.params.XClaimParams;
import redis.clients.jedis.params.ZAddParams;
import redis.clients.jedis.params.ZIncrByParams;
import redis.clients.jedis.params.LPosParams;
Expand Down Expand Up @@ -1172,6 +1173,20 @@ public List<StreamEntry> xclaim(String key, String group, String consumername, l
return j.xclaim(key, group, consumername, minIdleTime, newIdleTime, retries, force, ids);
}

@Override
public List<StreamEntry> xclaim(String key, String group, String consumername, long minIdleTime,
XClaimParams params, StreamEntryID... ids) {
Jedis j = getShard(key);
return j.xclaim(key, group, consumername, minIdleTime, params, ids);
}

@Override
public List<StreamEntryID> xclaimJustId(String key, String group, String consumername,
long minIdleTime, XClaimParams params, StreamEntryID... ids) {
Jedis j = getShard(key);
return j.xclaimJustId(key, group, consumername, minIdleTime, params, ids);
}

@Override
public StreamInfo xinfoStream(String key) {

Expand Down
Loading