Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/main/java/redis/clients/jedis/BinaryClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1848,6 +1848,10 @@ public void xpending(byte[] key, byte[] groupname, byte[] start, byte[] end, int
}
}

public void xpending(byte[] key, byte[] groupname, XPendingParams params) {
sendCommand(XPENDING, joinParameters(key, groupname, params.getByteParams()));
}

public void xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime,
long newIdleTime, int retries, boolean force, byte[][] ids) {

Expand Down
7 changes: 7 additions & 0 deletions src/main/java/redis/clients/jedis/BinaryJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -4758,6 +4758,13 @@ public Object xpending(final byte[] key, final byte[] groupname) {
return client.getOne();
}

@Override
public List<Object> xpending(final byte[] key, final byte[] groupname, final XPendingParams params) {
checkIsInMultiOrPipeline();
client.xpending(key, groupname, params);
return client.getObjectMultiBulkReply();
}

@Override
public List<byte[]> xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime,
long newIdleTime, int retries, boolean force, byte[]... ids) {
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/redis/clients/jedis/BinaryJedisCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -2679,6 +2679,16 @@ public Object execute(Jedis connection) {
}.runBinary(key);
}

@Override
public List<Object> xpending(final byte[] key, final byte[] groupname, final XPendingParams params) {
return new JedisClusterCommand<List<Object>>(connectionHandler, maxAttempts) {
@Override
public List<Object> execute(Jedis connection) {
return connection.xpending(key, groupname, params);
}
}.runBinary(key);
}

@Override
public List<byte[]> xclaim(final byte[] key, final byte[] groupname, final byte[] consumername,
final long minIdleTime, final long newIdleTime, final int retries, final boolean force,
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/redis/clients/jedis/BinaryShardedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.params.XAddParams;
import redis.clients.jedis.params.XClaimParams;
import redis.clients.jedis.params.XPendingParams;
import redis.clients.jedis.params.XTrimParams;
import redis.clients.jedis.params.ZAddParams;
import redis.clients.jedis.params.ZIncrByParams;
Expand Down Expand Up @@ -1232,6 +1233,12 @@ public Object xpending(final byte[] key, final byte[] groupname) {
return j.xpending(key, groupname);
}

@Override
public List<Object> xpending(final byte[] key, final byte[] groupname, final XPendingParams params) {
Jedis j = getShard(key);
return j.xpending(key, groupname, params);
}

@Override
public List<byte[]> xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime,
long newIdleTime, int retries, boolean force, byte[]... ids) {
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/redis/clients/jedis/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -1561,6 +1561,11 @@ public void xpending(String key, String groupname, StreamEntryID start, StreamEn
SafeEncoder.encode(end==null ? "+" : end.toString()), count, consumername == null? null : SafeEncoder.encode(consumername));
}

@Override
public void xpending(String key, String groupname, XPendingParams params) {
xpending(SafeEncoder.encode(key), SafeEncoder.encode(groupname), params);
}

@Override
public void xclaim(String key, String group, String consumername, long minIdleTime,
long newIdleTime, int retries, boolean force, StreamEntryID... ids) {
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/redis/clients/jedis/Jedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -4323,6 +4323,13 @@ public List<StreamPendingEntry> xpending(final String key, final String groupnam
return BuilderFactory.STREAM_PENDING_ENTRY_LIST.build(client.getObjectMultiBulkReply());
}

@Override
public List<StreamPendingEntry> xpending(final String key, final String groupname, final XPendingParams params) {
checkIsInMultiOrPipeline();
client.xpending(key, groupname, params);
return BuilderFactory.STREAM_PENDING_ENTRY_LIST.build(client.getObjectMultiBulkReply());
}

@Override
public List<StreamEntry> xclaim(String key, String group, String consumername, long minIdleTime,
long newIdleTime, int retries, boolean force, StreamEntryID... ids) {
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/redis/clients/jedis/JedisCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -2732,6 +2732,16 @@ public List<StreamPendingEntry> execute(Jedis connection) {
}.run(key);
}

@Override
public List<StreamPendingEntry> xpending(final String key, final String groupname, final XPendingParams params) {
return new JedisClusterCommand<List<StreamPendingEntry>>(connectionHandler, maxAttempts) {
@Override
public List<StreamPendingEntry> execute(Jedis connection) {
return connection.xpending(key, groupname, params);
}
}.run(key);
}

@Override
public Long xdel(final String key, final StreamEntryID... ids) {
return new JedisClusterCommand<Long>(connectionHandler, maxAttempts) {
Expand Down
13 changes: 13 additions & 0 deletions src/main/java/redis/clients/jedis/PipelineBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.params.XAddParams;
import redis.clients.jedis.params.XClaimParams;
import redis.clients.jedis.params.XPendingParams;
import redis.clients.jedis.params.XTrimParams;
import redis.clients.jedis.params.ZAddParams;
import redis.clients.jedis.params.ZIncrByParams;
Expand Down Expand Up @@ -2294,6 +2295,18 @@ public Response<List<Object>> xpendingBinary(byte[] key, byte[] groupname, byte[
return getResponse(BuilderFactory.RAW_OBJECT_LIST);
}

@Override
public Response<List<Object>> xpending(byte[] key, byte[] groupname, XPendingParams params) {
getClient(key).xpending(key, groupname, params);
return getResponse(BuilderFactory.RAW_OBJECT_LIST);
}

@Override
public Response<List<StreamPendingEntry>> xpending(String key, String groupname, XPendingParams params) {
getClient(key).xpending(key, groupname, params);
return getResponse(BuilderFactory.STREAM_PENDING_ENTRY_LIST);
}

@Override
public Response<Long> xdel(String key, StreamEntryID... ids) {
getClient(key).xdel(key, ids);
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/redis/clients/jedis/ShardedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.params.XAddParams;
import redis.clients.jedis.params.XClaimParams;
import redis.clients.jedis.params.XPendingParams;
import redis.clients.jedis.params.XTrimParams;
import redis.clients.jedis.params.ZAddParams;
import redis.clients.jedis.params.ZIncrByParams;
Expand Down Expand Up @@ -1228,6 +1229,12 @@ public List<StreamPendingEntry> xpending(String key, String groupname, StreamEnt
return j.xpending(key, groupname, start, end, count, consumername);
}

@Override
public List<StreamPendingEntry> xpending(String key, String groupname, XPendingParams params) {
Jedis j = getShard(key);
return j.xpending(key, groupname, params);
}

@Override
public List<StreamEntry> xclaim(String key, String group, String consumername, long minIdleTime,
long newIdleTime, int retries, boolean force, StreamEntryID... ids) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.params.XAddParams;
import redis.clients.jedis.params.XClaimParams;
import redis.clients.jedis.params.XPendingParams;
import redis.clients.jedis.params.XTrimParams;
import redis.clients.jedis.params.ZAddParams;
import redis.clients.jedis.params.ZIncrByParams;
Expand Down Expand Up @@ -425,6 +426,8 @@ List<GeoRadiusResponse> georadiusByMemberReadonly(byte[] key, byte[] member, dou

List<Object> xpending(byte[] key, byte[] groupname, byte[] start, byte[] end, int count, byte[] consumername);

List<Object> xpending(byte[] key, byte[] groupname, XPendingParams params);

List<byte[]> xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime, long newIdleTime, int retries, boolean force, byte[][] ids);

List<byte[]> xclaim(byte[] key, byte[] group, byte[] consumername, long minIdleTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.params.XAddParams;
import redis.clients.jedis.params.XClaimParams;
import redis.clients.jedis.params.XPendingParams;
import redis.clients.jedis.params.XTrimParams;
import redis.clients.jedis.params.ZAddParams;
import redis.clients.jedis.params.ZIncrByParams;
Expand Down Expand Up @@ -456,6 +457,8 @@ default List<byte[]> xrange(byte[] key, byte[] start, byte[] end, long count) {

List<Object> xpending(byte[] key, byte[] groupname, byte[] start, byte[] end, int count, byte[] consumername);

List<Object> xpending(byte[] key, byte[] groupname, XPendingParams params);

List<byte[]> xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime, long newIdleTime, int retries, boolean force, byte[]... ids);

List<byte[]> xclaim(byte[] key, byte[] group, byte[] consumername, long minIdleTime, XClaimParams params, byte[]... ids);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import redis.clients.jedis.GeoRadiusResponse;
import redis.clients.jedis.GeoUnit;
import redis.clients.jedis.ListPosition;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.StreamPendingEntry;
import redis.clients.jedis.Response;
import redis.clients.jedis.SortingParams;
Expand All @@ -16,6 +15,7 @@
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.params.XAddParams;
import redis.clients.jedis.params.XClaimParams;
import redis.clients.jedis.params.XPendingParams;
import redis.clients.jedis.params.XTrimParams;
import redis.clients.jedis.params.ZAddParams;
import redis.clients.jedis.params.ZIncrByParams;
Expand Down Expand Up @@ -418,6 +418,8 @@ Response<List<GeoRadiusResponse>> georadiusByMemberReadonly(byte[] key, byte[] m

Response<List<Object>> xpendingBinary(byte[] key, byte[] groupname, byte[] start, byte[] end, int count, byte[] consumername);

Response<List<Object>> xpending(byte[] key, byte[] groupname, XPendingParams params);

Response<Long> xdel(byte[] key, byte[]... ids);

Response<Long> xtrim(byte[] key, long maxLen, boolean approximateLength);
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/redis/clients/jedis/commands/Commands.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.params.XAddParams;
import redis.clients.jedis.params.XClaimParams;
import redis.clients.jedis.params.XPendingParams;
import redis.clients.jedis.params.XTrimParams;
import redis.clients.jedis.params.ZAddParams;
import redis.clients.jedis.params.ZIncrByParams;
Expand Down Expand Up @@ -513,6 +514,8 @@ default void restoreReplace(String key, int ttl, byte[] serializedValue) {

void xpending(String key, String groupname, StreamEntryID start, StreamEntryID end, int count, String consumername);

void xpending(String key, String groupname, XPendingParams params);

void xclaim(String key, String group, String consumername, long minIdleTime, long newIdleTime,
int retries, boolean force, StreamEntryID... ids);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.params.XAddParams;
import redis.clients.jedis.params.XClaimParams;
import redis.clients.jedis.params.XPendingParams;
import redis.clients.jedis.params.XTrimParams;
import redis.clients.jedis.params.ZAddParams;
import redis.clients.jedis.params.ZIncrByParams;
Expand Down Expand Up @@ -576,6 +577,16 @@ List<GeoRadiusResponse> georadiusByMemberReadonly(String key, String member, dou
*/
List<StreamPendingEntry> xpending(String key, String groupname, StreamEntryID start, StreamEntryID end, int count, String consumername);

/**
* XPENDING key group [[IDLE min-idle-time] start end count [consumer]]
*
* @param key
* @param groupname
* @param params
* @return
*/
List<StreamPendingEntry> xpending(String key, String groupname, XPendingParams params);

/**
* XDEL key ID [ID ...]
* @param key
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/redis/clients/jedis/commands/JedisCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.params.XAddParams;
import redis.clients.jedis.params.XClaimParams;
import redis.clients.jedis.params.XPendingParams;
import redis.clients.jedis.params.XTrimParams;
import redis.clients.jedis.params.ZAddParams;
import redis.clients.jedis.params.ZIncrByParams;
Expand Down Expand Up @@ -580,6 +581,15 @@ List<GeoRadiusResponse> georadiusByMemberReadonly(String key, String member, dou
List<StreamPendingEntry> xpending(String key, String groupname, StreamEntryID start,
StreamEntryID end, int count, String consumername);

/**
* XPENDING key group [[IDLE min-idle-time] start end count [consumer]]
*
* @param key
* @param groupname
* @param params
*/
List<StreamPendingEntry> xpending(String key, String groupname, XPendingParams params);

/**
* XDEL key ID [ID ...]
* @param key
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/redis/clients/jedis/commands/RedisPipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.params.XAddParams;
import redis.clients.jedis.params.XClaimParams;
import redis.clients.jedis.params.XPendingParams;
import redis.clients.jedis.params.XTrimParams;
import redis.clients.jedis.params.ZAddParams;
import redis.clients.jedis.params.ZIncrByParams;
Expand Down Expand Up @@ -409,6 +410,8 @@ Response<List<GeoRadiusResponse>> georadiusByMemberReadonly(String key, String m
Response<List<StreamPendingEntry>> xpending(String key, String groupname,
StreamEntryID start, StreamEntryID end, int count, String consumername);

Response<List<StreamPendingEntry>> xpending(String key, String groupname, XPendingParams params);

Response<Long> xdel( String key, StreamEntryID... ids);

Response<Long> xtrim( String key, long maxLen, boolean approximateLength);
Expand Down
83 changes: 83 additions & 0 deletions src/main/java/redis/clients/jedis/params/XPendingParams.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package redis.clients.jedis.params;

import static redis.clients.jedis.Protocol.Keyword.IDLE;

import java.util.ArrayList;
import java.util.List;

import redis.clients.jedis.Protocol;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.util.SafeEncoder;

public class XPendingParams extends Params {

private Long idle;

private String consumer;

private StreamEntryID start;

private StreamEntryID end;

private Long count;

public static XPendingParams xPendingParams() {
return new XPendingParams();
}

public XPendingParams idle(long idle) {
this.idle = idle;
return this;
}

public XPendingParams start(StreamEntryID start) {
this.start = start;
return this;
}

public XPendingParams end(StreamEntryID end) {
this.end = end;
return this;
}

public XPendingParams count(long count) {
this.count = count;
return this;
}

public XPendingParams consumer(String consumer) {
this.consumer = consumer;
return this;
}

@Override
public byte[][] getByteParams() {
List<byte[]> byteParams = new ArrayList<>();

if (idle != null) {
byteParams.add(IDLE.getRaw());
byteParams.add(Protocol.toByteArray(idle));
}

if (start == null) {
byteParams.add(SafeEncoder.encode("-"));
} else {
byteParams.add(SafeEncoder.encode(start.toString()));
}

if (end == null) {
byteParams.add(SafeEncoder.encode("+"));
} else {
byteParams.add(SafeEncoder.encode(end.toString()));
}

if (count != null) {
byteParams.add(Protocol.toByteArray(count));
}

if (consumer != null) {
byteParams.add(SafeEncoder.encode(consumer));
}
return byteParams.toArray(new byte[byteParams.size()][]);
}
}
Loading