Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions src/main/java/redis/clients/jedis/BuilderFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -1261,6 +1261,42 @@ public List<StreamEntryID> build(Object data) {
}
};

public static final Builder<StreamTrimResult> STREAM_ENTRY_DELETION_RESULT = new Builder<StreamTrimResult>() {
@Override
public StreamTrimResult build(Object data) {
if (data == null) {
return null;
}
return StreamTrimResult.fromLong((Long) data);
}

@Override
public String toString() {
return "StreamEntryDeletionResult";
}
};

public static final Builder<List<StreamTrimResult>> STREAM_ENTRY_DELETION_RESULT_LIST = new Builder<List<StreamTrimResult>>() {
@Override
@SuppressWarnings("unchecked")
public List<StreamTrimResult> build(Object data) {
if (data == null) {
return null;
}
List<Object> objectList = (List<Object>) data;
List<StreamTrimResult> responses = new ArrayList<>(objectList.size());
for (Object object : objectList) {
responses.add(STREAM_ENTRY_DELETION_RESULT.build(object));
}
return responses;
}

@Override
public String toString() {
return "List<StreamEntryDeletionResult>";
}
};

public static final Builder<StreamEntry> STREAM_ENTRY = new Builder<StreamEntry>() {
@Override
@SuppressWarnings("unchecked")
Expand Down
32 changes: 32 additions & 0 deletions src/main/java/redis/clients/jedis/CommandObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -2626,10 +2626,26 @@ public final CommandObject<Long> xack(String key, String group, StreamEntryID...
return new CommandObject<>(commandArguments(XACK).key(key).add(group).addObjects((Object[]) ids), BuilderFactory.LONG);
}

public final CommandObject<List<StreamTrimResult>> xackdel(String key, String group, StreamEntryID... ids) {
return new CommandObject<>(commandArguments(XACKDEL).key(key).add(group).add("IDS").add(ids.length).addObjects((Object[]) ids), BuilderFactory.STREAM_ENTRY_DELETION_RESULT_LIST);
}

public final CommandObject<List<StreamTrimResult>> xackdel(String key, String group, StreamTrimMode trimMode, StreamEntryID... ids) {
return new CommandObject<>(commandArguments(XACKDEL).key(key).add(group).add(trimMode).add("IDS").add(ids.length).addObjects((Object[]) ids), BuilderFactory.STREAM_ENTRY_DELETION_RESULT_LIST);
}

public final CommandObject<Long> xack(byte[] key, byte[] group, byte[]... ids) {
return new CommandObject<>(commandArguments(XACK).key(key).add(group).addObjects((Object[]) ids), BuilderFactory.LONG);
}

public final CommandObject<List<StreamTrimResult>> xackdel(byte[] key, byte[] group, byte[]... ids) {
return new CommandObject<>(commandArguments(XACKDEL).key(key).add(group).add("IDS").add(ids.length).addObjects((Object[]) ids), BuilderFactory.STREAM_ENTRY_DELETION_RESULT_LIST);
}

public final CommandObject<List<StreamTrimResult>> xackdel(byte[] key, byte[] group, StreamTrimMode trimMode, byte[]... ids) {
return new CommandObject<>(commandArguments(XACKDEL).key(key).add(group).add(trimMode).add("IDS").add(ids.length).addObjects((Object[]) ids), BuilderFactory.STREAM_ENTRY_DELETION_RESULT_LIST);
}

public final CommandObject<String> xgroupCreate(String key, String groupName, StreamEntryID id, boolean makeStream) {
CommandArguments args = commandArguments(XGROUP).add(CREATE).key(key)
.add(groupName).add(id == null ? "0-0" : id);
Expand Down Expand Up @@ -2687,6 +2703,14 @@ public final CommandObject<Long> xdel(String key, StreamEntryID... ids) {
return new CommandObject<>(commandArguments(XDEL).key(key).addObjects((Object[]) ids), BuilderFactory.LONG);
}

public final CommandObject<List<StreamTrimResult>> xdelex(String key, StreamEntryID... ids) {
return new CommandObject<>(commandArguments(XDELEX).key(key).add("IDS").add(ids.length).addObjects((Object[]) ids), BuilderFactory.STREAM_ENTRY_DELETION_RESULT_LIST);
}

public final CommandObject<List<StreamTrimResult>> xdelex(String key, StreamTrimMode trimMode, StreamEntryID... ids) {
return new CommandObject<>(commandArguments(XDELEX).key(key).add(trimMode).add("IDS").add(ids.length).addObjects((Object[]) ids), BuilderFactory.STREAM_ENTRY_DELETION_RESULT_LIST);
}

public final CommandObject<Long> xtrim(String key, long maxLen, boolean approximate) {
CommandArguments args = commandArguments(XTRIM).key(key).add(MAXLEN);
if (approximate) args.add(Protocol.BYTES_TILDE);
Expand All @@ -2702,6 +2726,14 @@ public final CommandObject<Long> xdel(byte[] key, byte[]... ids) {
return new CommandObject<>(commandArguments(XDEL).key(key).addObjects((Object[]) ids), BuilderFactory.LONG);
}

public final CommandObject<List<StreamTrimResult>> xdelex(byte[] key, byte[]... ids) {
return new CommandObject<>(commandArguments(XDELEX).key(key).add("IDS").add(ids.length).addObjects((Object[]) ids), BuilderFactory.STREAM_ENTRY_DELETION_RESULT_LIST);
}

public final CommandObject<List<StreamTrimResult>> xdelex(byte[] key, StreamTrimMode trimMode, byte[]... ids) {
return new CommandObject<>(commandArguments(XDELEX).key(key).add(trimMode).add("IDS").add(ids.length).addObjects((Object[]) ids), BuilderFactory.STREAM_ENTRY_DELETION_RESULT_LIST);
}

public final CommandObject<Long> xtrim(byte[] key, long maxLen, boolean approximateLength) {
CommandArguments args = commandArguments(XTRIM).key(key).add(MAXLEN);
if (approximateLength) args.add(Protocol.BYTES_TILDE);
Expand Down
48 changes: 48 additions & 0 deletions src/main/java/redis/clients/jedis/Jedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -4866,6 +4866,18 @@ public long xack(byte[] key, byte[] group, byte[]... ids) {
return connection.executeCommand(commandObjects.xack(key, group, ids));
}

@Override
public List<StreamTrimResult> xackdel(byte[] key, byte[] group, byte[]... ids) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xackdel(key, group, ids));
}

@Override
public List<StreamTrimResult> xackdel(byte[] key, byte[] group, StreamTrimMode trimMode, byte[]... ids) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xackdel(key, group, trimMode, ids));
}

@Override
public String xgroupCreate(byte[] key, byte[] consumer, byte[] id, boolean makeStream) {
checkIsInMultiOrPipeline();
Expand Down Expand Up @@ -4902,6 +4914,18 @@ public long xdel(byte[] key, byte[]... ids) {
return connection.executeCommand(commandObjects.xdel(key, ids));
}

@Override
public List<StreamTrimResult> xdelex(byte[] key, byte[]... ids) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xdelex(key, ids));
}

@Override
public List<StreamTrimResult> xdelex(byte[] key, StreamTrimMode trimMode, byte[]... ids) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xdelex(key, trimMode, ids));
}

@Override
public long xtrim(byte[] key, long maxLen, boolean approximateLength) {
checkIsInMultiOrPipeline();
Expand Down Expand Up @@ -9677,6 +9701,18 @@ public long xack(final String key, final String group, final StreamEntryID... id
return connection.executeCommand(commandObjects.xack(key, group, ids));
}

@Override
public List<StreamTrimResult> xackdel(final String key, final String group, final StreamEntryID... ids) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xackdel(key, group, ids));
}

@Override
public List<StreamTrimResult> xackdel(final String key, final String group, final StreamTrimMode trimMode, final StreamEntryID... ids) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xackdel(key, group, trimMode, ids));
}

@Override
public String xgroupCreate(final String key, final String groupName, final StreamEntryID id,
final boolean makeStream) {
Expand Down Expand Up @@ -9714,6 +9750,18 @@ public long xdel(final String key, final StreamEntryID... ids) {
return connection.executeCommand(commandObjects.xdel(key, ids));
}

@Override
public List<StreamTrimResult> xdelex(final String key, final StreamEntryID... ids) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xdelex(key, ids));
}

@Override
public List<StreamTrimResult> xdelex(final String key, final StreamTrimMode trimMode, final StreamEntryID... ids) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xdelex(key, trimMode, ids));
}

@Override
public long xtrim(final String key, final long maxLen, final boolean approximateLength) {
checkIsInMultiOrPipeline();
Expand Down
40 changes: 40 additions & 0 deletions src/main/java/redis/clients/jedis/PipeliningBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -1552,6 +1552,16 @@ public Response<Long> xack(String key, String group, StreamEntryID... ids) {
return appendCommand(commandObjects.xack(key, group, ids));
}

@Override
public Response<List<StreamTrimResult>> xackdel(String key, String group, StreamEntryID... ids) {
return appendCommand(commandObjects.xackdel(key, group, ids));
}

@Override
public Response<List<StreamTrimResult>> xackdel(String key, String group, StreamTrimMode trimMode, StreamEntryID... ids) {
return appendCommand(commandObjects.xackdel(key, group, trimMode, ids));
}

@Override
public Response<String> xgroupCreate(String key, String groupName, StreamEntryID id, boolean makeStream) {
return appendCommand(commandObjects.xgroupCreate(key, groupName, id, makeStream));
Expand Down Expand Up @@ -1592,6 +1602,16 @@ public Response<Long> xdel(String key, StreamEntryID... ids) {
return appendCommand(commandObjects.xdel(key, ids));
}

@Override
public Response<List<StreamTrimResult>> xdelex(String key, StreamEntryID... ids) {
return appendCommand(commandObjects.xdelex(key, ids));
}

@Override
public Response<List<StreamTrimResult>> xdelex(String key, StreamTrimMode trimMode, StreamEntryID... ids) {
return appendCommand(commandObjects.xdelex(key, trimMode, ids));
}

@Override
public Response<Long> xtrim(String key, long maxLen, boolean approximate) {
return appendCommand(commandObjects.xtrim(key, maxLen, approximate));
Expand Down Expand Up @@ -3264,6 +3284,16 @@ public Response<Long> xack(byte[] key, byte[] group, byte[]... ids) {
return appendCommand(commandObjects.xack(key, group, ids));
}

@Override
public Response<List<StreamTrimResult>> xackdel(byte[] key, byte[] group, byte[]... ids) {
return appendCommand(commandObjects.xackdel(key, group, ids));
}

@Override
public Response<List<StreamTrimResult>> xackdel(byte[] key, byte[] group, StreamTrimMode trimMode, byte[]... ids) {
return appendCommand(commandObjects.xackdel(key, group, trimMode, ids));
}

@Override
public Response<String> xgroupCreate(byte[] key, byte[] groupName, byte[] id, boolean makeStream) {
return appendCommand(commandObjects.xgroupCreate(key, groupName, id, makeStream));
Expand Down Expand Up @@ -3294,6 +3324,16 @@ public Response<Long> xdel(byte[] key, byte[]... ids) {
return appendCommand(commandObjects.xdel(key, ids));
}

@Override
public Response<List<StreamTrimResult>> xdelex(byte[] key, byte[]... ids) {
return appendCommand(commandObjects.xdelex(key, ids));
}

@Override
public Response<List<StreamTrimResult>> xdelex(byte[] key, StreamTrimMode trimMode, byte[]... ids) {
return appendCommand(commandObjects.xdelex(key, trimMode, ids));
}

@Override
public Response<Long> xtrim(byte[] key, long maxLen, boolean approximateLength) {
return appendCommand(commandObjects.xtrim(key, maxLen, approximateLength));
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/redis/clients/jedis/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ public static enum Command implements ProtocolCommand {
GEORADIUSBYMEMBER, GEORADIUSBYMEMBER_RO, // <-- geo
PFADD, PFCOUNT, PFMERGE, // <-- hyper log log
XADD, XLEN, XDEL, XTRIM, XRANGE, XREVRANGE, XREAD, XACK, XGROUP, XREADGROUP, XPENDING, XCLAIM,
XAUTOCLAIM, XINFO, // <-- stream
XAUTOCLAIM, XINFO, XDELEX, XACKDEL, // <-- stream
EVAL, EVALSHA, SCRIPT, EVAL_RO, EVALSHA_RO, FUNCTION, FCALL, FCALL_RO, // <-- program
SUBSCRIBE, UNSUBSCRIBE, PSUBSCRIBE, PUNSUBSCRIBE, PUBLISH, PUBSUB,
SSUBSCRIBE, SUNSUBSCRIBE, SPUBLISH, // <-- pub sub
Expand Down
40 changes: 40 additions & 0 deletions src/main/java/redis/clients/jedis/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -3201,6 +3201,16 @@ public long xack(String key, String group, StreamEntryID... ids) {
return executeCommand(commandObjects.xack(key, group, ids));
}

@Override
public List<StreamTrimResult> xackdel(String key, String group, StreamEntryID... ids) {
return executeCommand(commandObjects.xackdel(key, group, ids));
}

@Override
public List<StreamTrimResult> xackdel(String key, String group, StreamTrimMode trimMode, StreamEntryID... ids) {
return executeCommand(commandObjects.xackdel(key, group, trimMode, ids));
}

@Override
public String xgroupCreate(String key, String groupName, StreamEntryID id, boolean makeStream) {
return executeCommand(commandObjects.xgroupCreate(key, groupName, id, makeStream));
Expand Down Expand Up @@ -3241,6 +3251,16 @@ public long xdel(String key, StreamEntryID... ids) {
return executeCommand(commandObjects.xdel(key, ids));
}

@Override
public List<StreamTrimResult> xdelex(String key, StreamEntryID... ids) {
return executeCommand(commandObjects.xdelex(key, ids));
}

@Override
public List<StreamTrimResult> xdelex(String key, StreamTrimMode trimMode, StreamEntryID... ids) {
return executeCommand(commandObjects.xdelex(key, trimMode, ids));
}

@Override
public long xtrim(String key, long maxLen, boolean approximate) {
return executeCommand(commandObjects.xtrim(key, maxLen, approximate));
Expand Down Expand Up @@ -3356,6 +3376,16 @@ public long xack(byte[] key, byte[] group, byte[]... ids) {
return executeCommand(commandObjects.xack(key, group, ids));
}

@Override
public List<StreamTrimResult> xackdel(byte[] key, byte[] group, byte[]... ids) {
return executeCommand(commandObjects.xackdel(key, group, ids));
}

@Override
public List<StreamTrimResult> xackdel(byte[] key, byte[] group, StreamTrimMode trimMode, byte[]... ids) {
return executeCommand(commandObjects.xackdel(key, group, trimMode, ids));
}

@Override
public String xgroupCreate(byte[] key, byte[] groupName, byte[] id, boolean makeStream) {
return executeCommand(commandObjects.xgroupCreate(key, groupName, id, makeStream));
Expand Down Expand Up @@ -3386,6 +3416,16 @@ public long xdel(byte[] key, byte[]... ids) {
return executeCommand(commandObjects.xdel(key, ids));
}

@Override
public List<StreamTrimResult> xdelex(byte[] key, byte[]... ids) {
return executeCommand(commandObjects.xdelex(key, ids));
}

@Override
public List<StreamTrimResult> xdelex(byte[] key, StreamTrimMode trimMode, byte[]... ids) {
return executeCommand(commandObjects.xdelex(key, trimMode, ids));
}

@Override
public long xtrim(byte[] key, long maxLen, boolean approximateLength) {
return executeCommand(commandObjects.xtrim(key, maxLen, approximateLength));
Expand Down
38 changes: 38 additions & 0 deletions src/main/java/redis/clients/jedis/args/StreamTrimMode.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package redis.clients.jedis.args;

import redis.clients.jedis.util.SafeEncoder;

/**
* Trim strategy for stream commands that handle consumer group references.
* Used with XDELEX, XACKDEL, and enhanced XADD/XTRIM commands.
*/
public enum StreamTrimMode implements Rawable {

/**
* Preserves existing references to entries in all consumer groups' PEL.
* This is the default behavior similar to XDEL.
*/
KEEP_REFERENCES("KEEPREF"),

/**
* Removes all references to entries from all consumer groups' pending entry lists,
* effectively cleaning up all traces of the messages.
*/
DELETE_REFERENCES("DELREF"),

/**
* Only operates on entries that were read and acknowledged by all consumer groups.
*/
ACKNOWLEDGED("ACKED");

private final byte[] raw;

StreamTrimMode(String redisParamName) {
raw = SafeEncoder.encode(redisParamName);
}

@Override
public byte[] getRaw() {
return raw;
}
}
Loading
Loading