Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/main/java/redis/clients/jedis/BinaryClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1848,6 +1848,10 @@ public void xpending(byte[] key, byte[] groupname, byte[] start, byte[] end, int
}
}

public void xpending(byte[] key, byte[] groupname, XPendingParams params) {
sendCommand(XPENDING, joinParameters(key, groupname, params.getByteParams()));
}

public void xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime,
long newIdleTime, int retries, boolean force, byte[][] ids) {

Expand Down
7 changes: 7 additions & 0 deletions src/main/java/redis/clients/jedis/BinaryJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -4758,6 +4758,13 @@ public Object xpending(final byte[] key, final byte[] groupname) {
return client.getOne();
}

@Override
public List<Object> xpending(final byte[] key, final byte[] groupname, final XPendingParams params) {
checkIsInMultiOrPipeline();
client.xpending(key, groupname, params);
return client.getObjectMultiBulkReply();
}

@Override
public List<byte[]> xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime,
long newIdleTime, int retries, boolean force, byte[]... ids) {
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/redis/clients/jedis/BinaryJedisCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -2679,6 +2679,16 @@ public Object execute(Jedis connection) {
}.runBinary(key);
}

@Override
public List<Object> xpending(final byte[] key, final byte[] groupname, final XPendingParams params) {
return new JedisClusterCommand<List<Object>>(connectionHandler, maxAttempts) {
@Override
public List<Object> execute(Jedis connection) {
return connection.xpending(key, groupname, params);
}
}.runBinary(key);
}

@Override
public List<byte[]> xclaim(final byte[] key, final byte[] groupname, final byte[] consumername,
final long minIdleTime, final long newIdleTime, final int retries, final boolean force,
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/redis/clients/jedis/BinaryShardedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.params.XAddParams;
import redis.clients.jedis.params.XClaimParams;
import redis.clients.jedis.params.XPendingParams;
import redis.clients.jedis.params.XTrimParams;
import redis.clients.jedis.params.ZAddParams;
import redis.clients.jedis.params.ZIncrByParams;
Expand Down Expand Up @@ -1232,6 +1233,12 @@ public Object xpending(final byte[] key, final byte[] groupname) {
return j.xpending(key, groupname);
}

@Override
public List<Object> xpending(final byte[] key, final byte[] groupname, final XPendingParams params) {
Jedis j = getShard(key);
return j.xpending(key, groupname, params);
}

@Override
public List<byte[]> xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime,
long newIdleTime, int retries, boolean force, byte[]... ids) {
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/redis/clients/jedis/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -1561,6 +1561,11 @@ public void xpending(String key, String groupname, StreamEntryID start, StreamEn
SafeEncoder.encode(end==null ? "+" : end.toString()), count, consumername == null? null : SafeEncoder.encode(consumername));
}

@Override
public void xpending(String key, String groupname, XPendingParams params) {
xpending(SafeEncoder.encode(key), SafeEncoder.encode(groupname), params);
}

@Override
public void xclaim(String key, String group, String consumername, long minIdleTime,
long newIdleTime, int retries, boolean force, StreamEntryID... ids) {
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/redis/clients/jedis/Jedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -4323,6 +4323,13 @@ public List<StreamPendingEntry> xpending(final String key, final String groupnam
return BuilderFactory.STREAM_PENDING_ENTRY_LIST.build(client.getObjectMultiBulkReply());
}

@Override
public List<StreamPendingEntry> xpending(final String key, final String groupname, final XPendingParams params) {
checkIsInMultiOrPipeline();
client.xpending(key, groupname, params);
return BuilderFactory.STREAM_PENDING_ENTRY_LIST.build(client.getObjectMultiBulkReply());
}

@Override
public List<StreamEntry> xclaim(String key, String group, String consumername, long minIdleTime,
long newIdleTime, int retries, boolean force, StreamEntryID... ids) {
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/redis/clients/jedis/JedisCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -2732,6 +2732,16 @@ public List<StreamPendingEntry> execute(Jedis connection) {
}.run(key);
}

@Override
public List<StreamPendingEntry> xpending(final String key, final String groupname, final XPendingParams params) {
return new JedisClusterCommand<List<StreamPendingEntry>>(connectionHandler, maxAttempts) {
@Override
public List<StreamPendingEntry> execute(Jedis connection) {
return connection.xpending(key, groupname, params);
}
}.run(key);
}

@Override
public Long xdel(final String key, final StreamEntryID... ids) {
return new JedisClusterCommand<Long>(connectionHandler, maxAttempts) {
Expand Down
13 changes: 13 additions & 0 deletions src/main/java/redis/clients/jedis/PipelineBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.params.XAddParams;
import redis.clients.jedis.params.XClaimParams;
import redis.clients.jedis.params.XPendingParams;
import redis.clients.jedis.params.XTrimParams;
import redis.clients.jedis.params.ZAddParams;
import redis.clients.jedis.params.ZIncrByParams;
Expand Down Expand Up @@ -2294,6 +2295,18 @@ public Response<List<Object>> xpendingBinary(byte[] key, byte[] groupname, byte[
return getResponse(BuilderFactory.RAW_OBJECT_LIST);
}

@Override
public Response<List<Object>> xpending(byte[] key, byte[] groupname, XPendingParams params) {
getClient(key).xpending(key, groupname, params);
return getResponse(BuilderFactory.RAW_OBJECT_LIST);
}

@Override
public Response<List<StreamPendingEntry>> xpending(String key, String groupname, XPendingParams params) {
getClient(key).xpending(key, groupname, params);
return getResponse(BuilderFactory.STREAM_PENDING_ENTRY_LIST);
}

@Override
public Response<Long> xdel(String key, StreamEntryID... ids) {
getClient(key).xdel(key, ids);
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/redis/clients/jedis/ShardedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.params.XAddParams;
import redis.clients.jedis.params.XClaimParams;
import redis.clients.jedis.params.XPendingParams;
import redis.clients.jedis.params.XTrimParams;
import redis.clients.jedis.params.ZAddParams;
import redis.clients.jedis.params.ZIncrByParams;
Expand Down Expand Up @@ -1228,6 +1229,12 @@ public List<StreamPendingEntry> xpending(String key, String groupname, StreamEnt
return j.xpending(key, groupname, start, end, count, consumername);
}

@Override
public List<StreamPendingEntry> xpending(String key, String groupname, XPendingParams params) {
Jedis j = getShard(key);
return j.xpending(key, groupname, params);
}

@Override
public List<StreamEntry> xclaim(String key, String group, String consumername, long minIdleTime,
long newIdleTime, int retries, boolean force, StreamEntryID... ids) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.params.XAddParams;
import redis.clients.jedis.params.XClaimParams;
import redis.clients.jedis.params.XPendingParams;
import redis.clients.jedis.params.XTrimParams;
import redis.clients.jedis.params.ZAddParams;
import redis.clients.jedis.params.ZIncrByParams;
Expand Down Expand Up @@ -425,6 +426,8 @@ List<GeoRadiusResponse> georadiusByMemberReadonly(byte[] key, byte[] member, dou

List<Object> xpending(byte[] key, byte[] groupname, byte[] start, byte[] end, int count, byte[] consumername);

List<Object> xpending(byte[] key, byte[] groupname, XPendingParams params);

List<byte[]> xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime, long newIdleTime, int retries, boolean force, byte[][] ids);

List<byte[]> xclaim(byte[] key, byte[] group, byte[] consumername, long minIdleTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.params.XAddParams;
import redis.clients.jedis.params.XClaimParams;
import redis.clients.jedis.params.XPendingParams;
import redis.clients.jedis.params.XTrimParams;
import redis.clients.jedis.params.ZAddParams;
import redis.clients.jedis.params.ZIncrByParams;
Expand Down Expand Up @@ -456,6 +457,8 @@ default List<byte[]> xrange(byte[] key, byte[] start, byte[] end, long count) {

List<Object> xpending(byte[] key, byte[] groupname, byte[] start, byte[] end, int count, byte[] consumername);

List<Object> xpending(byte[] key, byte[] groupname, XPendingParams params);

List<byte[]> xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime, long newIdleTime, int retries, boolean force, byte[]... ids);

List<byte[]> xclaim(byte[] key, byte[] group, byte[] consumername, long minIdleTime, XClaimParams params, byte[]... ids);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import redis.clients.jedis.GeoRadiusResponse;
import redis.clients.jedis.GeoUnit;
import redis.clients.jedis.ListPosition;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.StreamPendingEntry;
import redis.clients.jedis.Response;
import redis.clients.jedis.SortingParams;
Expand All @@ -16,6 +15,7 @@
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.params.XAddParams;
import redis.clients.jedis.params.XClaimParams;
import redis.clients.jedis.params.XPendingParams;
import redis.clients.jedis.params.XTrimParams;
import redis.clients.jedis.params.ZAddParams;
import redis.clients.jedis.params.ZIncrByParams;
Expand Down Expand Up @@ -418,6 +418,8 @@ Response<List<GeoRadiusResponse>> georadiusByMemberReadonly(byte[] key, byte[] m

Response<List<Object>> xpendingBinary(byte[] key, byte[] groupname, byte[] start, byte[] end, int count, byte[] consumername);

Response<List<Object>> xpending(byte[] key, byte[] groupname, XPendingParams params);

Response<Long> xdel(byte[] key, byte[]... ids);

Response<Long> xtrim(byte[] key, long maxLen, boolean approximateLength);
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/redis/clients/jedis/commands/Commands.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.params.XAddParams;
import redis.clients.jedis.params.XClaimParams;
import redis.clients.jedis.params.XPendingParams;
import redis.clients.jedis.params.XTrimParams;
import redis.clients.jedis.params.ZAddParams;
import redis.clients.jedis.params.ZIncrByParams;
Expand Down Expand Up @@ -513,6 +514,8 @@ default void restoreReplace(String key, int ttl, byte[] serializedValue) {

void xpending(String key, String groupname, StreamEntryID start, StreamEntryID end, int count, String consumername);

void xpending(String key, String groupname, XPendingParams params);

void xclaim(String key, String group, String consumername, long minIdleTime, long newIdleTime,
int retries, boolean force, StreamEntryID... ids);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.params.XAddParams;
import redis.clients.jedis.params.XClaimParams;
import redis.clients.jedis.params.XPendingParams;
import redis.clients.jedis.params.XTrimParams;
import redis.clients.jedis.params.ZAddParams;
import redis.clients.jedis.params.ZIncrByParams;
Expand Down Expand Up @@ -576,6 +577,16 @@ List<GeoRadiusResponse> georadiusByMemberReadonly(String key, String member, dou
*/
List<StreamPendingEntry> xpending(String key, String groupname, StreamEntryID start, StreamEntryID end, int count, String consumername);

/**
* XPENDING key group [[IDLE min-idle-time] start end count [consumer]]
*
* @param key
* @param groupname
* @param params
* @return
*/
List<StreamPendingEntry> xpending(String key, String groupname, XPendingParams params);

/**
* XDEL key ID [ID ...]
* @param key
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/redis/clients/jedis/commands/JedisCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.params.XAddParams;
import redis.clients.jedis.params.XClaimParams;
import redis.clients.jedis.params.XPendingParams;
import redis.clients.jedis.params.XTrimParams;
import redis.clients.jedis.params.ZAddParams;
import redis.clients.jedis.params.ZIncrByParams;
Expand Down Expand Up @@ -580,6 +581,15 @@ List<GeoRadiusResponse> georadiusByMemberReadonly(String key, String member, dou
List<StreamPendingEntry> xpending(String key, String groupname, StreamEntryID start,
StreamEntryID end, int count, String consumername);

/**
* XPENDING key group [[IDLE min-idle-time] start end count [consumer]]
*
* @param key
* @param groupname
* @param params
*/
List<StreamPendingEntry> xpending(String key, String groupname, XPendingParams params);

/**
* XDEL key ID [ID ...]
* @param key
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/redis/clients/jedis/commands/RedisPipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.params.XAddParams;
import redis.clients.jedis.params.XClaimParams;
import redis.clients.jedis.params.XPendingParams;
import redis.clients.jedis.params.XTrimParams;
import redis.clients.jedis.params.ZAddParams;
import redis.clients.jedis.params.ZIncrByParams;
Expand Down Expand Up @@ -409,6 +410,8 @@ Response<List<GeoRadiusResponse>> georadiusByMemberReadonly(String key, String m
Response<List<StreamPendingEntry>> xpending(String key, String groupname,
StreamEntryID start, StreamEntryID end, int count, String consumername);

Response<List<StreamPendingEntry>> xpending(String key, String groupname, XPendingParams params);

Response<Long> xdel( String key, StreamEntryID... ids);

Response<Long> xtrim( String key, long maxLen, boolean approximateLength);
Expand Down
83 changes: 83 additions & 0 deletions src/main/java/redis/clients/jedis/params/XPendingParams.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package redis.clients.jedis.params;

import static redis.clients.jedis.Protocol.Keyword.IDLE;

import java.util.ArrayList;
import java.util.List;

import redis.clients.jedis.Protocol;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.util.SafeEncoder;

public class XPendingParams extends Params {

private Long idle;

private String consumer;

private StreamEntryID start;

private StreamEntryID end;

private Integer count;

public static XPendingParams xPendingParams() {
return new XPendingParams();
}

public XPendingParams idle(long idle) {
this.idle = idle;
return this;
}

public XPendingParams start(StreamEntryID start) {
this.start = start;
return this;
}

public XPendingParams end(StreamEntryID end) {
this.end = end;
return this;
}

public XPendingParams count(int count) {
this.count = count;
return this;
}

public XPendingParams consumer(String consumer) {
this.consumer = consumer;
return this;
}

@Override
public byte[][] getByteParams() {
List<byte[]> byteParams = new ArrayList<>();

if (idle != null) {
byteParams.add(IDLE.getRaw());
byteParams.add(Protocol.toByteArray(idle));
}

if (start == null) {
byteParams.add(SafeEncoder.encode("-"));
} else {
byteParams.add(SafeEncoder.encode(start.toString()));
}

if (end == null) {
byteParams.add(SafeEncoder.encode("+"));
} else {
byteParams.add(SafeEncoder.encode(end.toString()));
}

if (count != null) {
byteParams.add(Protocol.toByteArray(count));
}

if (consumer != null) {
byteParams.add(SafeEncoder.encode(consumer));
}
return byteParams.toArray(new byte[byteParams.size()][]);
}
}
Loading