Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion src/main/java/com/rabbitmq/stream/StreamStats.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020-2025 Broadcom. All Rights Reserved.
// Copyright (c) 2020-2026 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
Expand Down Expand Up @@ -41,9 +41,25 @@ public interface StreamStats {
* given time. The value can be stale as soon as the application reads it though, as the committed
* chunk ID for a stream that is published to changes all the time.
*
* <p>Use {@link #committedChunkId()} instead if the last offset in the stream is needed.
*
* @return committed offset in this stream
* @see Context#committedChunkId()
* @see #committedOffset()
* @throws NoOffsetException if there is no committed chunk yet
*/
long committedChunkId();

/**
* The offset of the last committed message in the stream.
*
* <p>The value can be stale as soon as the application reads it though, as the last committed
* offset for a stream that is published to changes all the time.
*
* <p>Requires RabbitMQ 4.3 or more.
*
* @return last committed offset
* @throws NoOffsetException if there is no committed message yet or if the value is not available
*/
long committedOffset();
}
32 changes: 24 additions & 8 deletions src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020-2025 Broadcom. All Rights Reserved.
// Copyright (c) 2020-2026 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
Expand Down Expand Up @@ -557,10 +557,14 @@ public StreamStats queryStreamStats(String stream) {
};
LongSupplier firstOffsetSupplier =
offsetSupplierLogic.apply("first_chunk_id", "No first offset for stream " + stream);
LongSupplier committedOffsetSupplier =
LongSupplier committedChunkIdSupplier =
offsetSupplierLogic.apply(
"committed_chunk_id", "No committed chunk ID for stream " + stream);
return new DefaultStreamStats(firstOffsetSupplier, committedOffsetSupplier);
LongSupplier committedOffsetSupplier =
offsetSupplierLogic.apply("committed_offset", "No committed offset for stream " + stream);

return new DefaultStreamStats(
firstOffsetSupplier, committedChunkIdSupplier, committedOffsetSupplier);
} else {
throw convertCodeToException(
response.getResponseCode(),
Expand Down Expand Up @@ -629,31 +633,43 @@ public boolean streamExists(String stream) {

private static class DefaultStreamStats implements StreamStats {

private final LongSupplier firstOffsetSupplier, committedOffsetSupplier;
private final LongSupplier firstOffsetSupplier,
committedChunkIdSupplier,
committedOffsetSupplier;

private DefaultStreamStats(
LongSupplier firstOffsetSupplier, LongSupplier committedOffsetSupplier) {
LongSupplier firstOffsetSupplier,
LongSupplier committedChunkIdSupplier,
LongSupplier committedOffsetSupplier) {
this.firstOffsetSupplier = firstOffsetSupplier;
this.committedChunkIdSupplier = committedChunkIdSupplier;
this.committedOffsetSupplier = committedOffsetSupplier;
}

@Override
public long firstOffset() {
return firstOffsetSupplier.getAsLong();
return this.firstOffsetSupplier.getAsLong();
}

@Override
public long committedChunkId() {
return committedOffsetSupplier.getAsLong();
return this.committedChunkIdSupplier.getAsLong();
}

@Override
public long committedOffset() {
return this.committedOffsetSupplier.getAsLong();
}

@Override
public String toString() {
return "StreamStats{"
+ "firstOffset="
+ firstOffset()
+ ", committedOffset="
+ ", committedChunkId="
+ committedChunkId()
+ ", committedOffset="
+ committedOffset()
+ '}';
}
}
Expand Down
31 changes: 24 additions & 7 deletions src/test/java/com/rabbitmq/stream/impl/ClientTest.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020-2025 Broadcom. All Rights Reserved.
// Copyright (c) 2020-2026 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
Expand All @@ -14,6 +14,7 @@
// info@rabbitmq.com.
package com.rabbitmq.stream.impl;

import static com.rabbitmq.stream.impl.TestUtils.BrokerVersion.RABBITMQ_4_3_0;
import static com.rabbitmq.stream.impl.TestUtils.ResponseConditions.ko;
import static com.rabbitmq.stream.impl.TestUtils.ResponseConditions.ok;
import static com.rabbitmq.stream.impl.TestUtils.ResponseConditions.responseCode;
Expand Down Expand Up @@ -94,6 +95,7 @@ public class ClientTest {

String stream;
TestUtils.ClientFactory cf;
String brokerVersion;

static boolean await(CountDownLatch latch, Duration timeout) {
try {
Expand Down Expand Up @@ -907,10 +909,10 @@ void streamStatsShouldReturnFirstOffsetAndCommittedOffset() throws Exception {
int publishCount = 20_000;
CountDownLatch latch = new CountDownLatch(publishCount);

AtomicLong committedOffset = new AtomicLong();
AtomicLong committedChunkId = new AtomicLong();
Client.MessageListener messageListener =
(corr, offset, chkTimestamp, committedOfft, chunkContext, message) -> {
committedOffset.set(committedOfft);
(corr, offset, chkTimestamp, committedChkId, chunkContext, message) -> {
committedChunkId.set(committedChkId);
latch.countDown();
};
Client client =
Expand All @@ -920,11 +922,22 @@ void streamStatsShouldReturnFirstOffsetAndCommittedOffset() throws Exception {
.messageListener(messageListener));
StreamStatsResponse response = client.streamStats(stream);
assertThat(response.getInfo()).containsEntry("first_chunk_id", -1L);
assertThat(response.getInfo()).containsEntry("last_chunk_id", -1L);
assertThat(response.getInfo()).containsEntry("committed_chunk_id", -1L);

if (brokerVersion43Ormore()) {
assertThat(response.getInfo()).containsEntry("committed_offset", -1L);
}

TestUtils.publishAndWaitForConfirms(cf, publishCount, stream);
response = client.streamStats(stream);
assertThat(response.getInfo()).containsEntry("first_chunk_id", 0L);
assertThat(response.getInfo().get("committed_chunk_id")).isNotEqualTo(-1L);
assertThat(response.getInfo().get("last_chunk_id")).isPositive();
assertThat(response.getInfo().get("committed_chunk_id")).isPositive();

if (brokerVersion43Ormore()) {
assertThat(response.getInfo()).containsEntry("committed_offset", (long) (publishCount - 1));
}

client.exchangeCommandVersions();

Expand All @@ -933,8 +946,8 @@ void streamStatsShouldReturnFirstOffsetAndCommittedOffset() throws Exception {
assertThat(subscribeResponse.isOk()).isTrue();

assertThat(latch.await(10, SECONDS)).isTrue();
assertThat(committedOffset.get()).isPositive();
assertThat(committedOffset).hasValue(response.getInfo().get("committed_chunk_id"));
assertThat(committedChunkId.get()).isPositive();
assertThat(committedChunkId).hasValue(response.getInfo().get("committed_chunk_id"));
}

@Test
Expand Down Expand Up @@ -1117,4 +1130,8 @@ public int fragmentLength(Object obj) {
// we should get messages only from the "second" part of the stream
assertThat(consumedMessageCount).hasValueLessThan(messageCount * 2);
}

private boolean brokerVersion43Ormore() {
return TestUtils.atLeastVersion(RABBITMQ_4_3_0.version(), brokerVersion);
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020-2025 Broadcom. All Rights Reserved.
// Copyright (c) 2020-2026 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
Expand All @@ -16,6 +16,7 @@

import static com.rabbitmq.stream.Cli.listLocatorConnections;
import static com.rabbitmq.stream.impl.Assertions.assertThat;
import static com.rabbitmq.stream.impl.TestUtils.BrokerVersion.RABBITMQ_4_3_0;
import static com.rabbitmq.stream.impl.TestUtils.CountDownLatchConditions.completed;
import static com.rabbitmq.stream.impl.TestUtils.ExceptionConditions.responseCode;
import static com.rabbitmq.stream.impl.TestUtils.latchAssert;
Expand Down Expand Up @@ -111,6 +112,7 @@ public class StreamEnvironmentTest {
String stream;
TestUtils.ClientFactory cf;
EventLoopGroup eventLoopGroup;
String brokerVersion;

@BeforeEach
void init() {
Expand Down Expand Up @@ -636,13 +638,19 @@ void queryStreamStatsShouldReturnFirstOffsetAndCommittedOffset(boolean lazyInit)
StreamStats stats = env.queryStreamStats(stream);
assertThatThrownBy(stats::firstOffset).isInstanceOf(NoOffsetException.class);
assertThatThrownBy(stats::committedChunkId).isInstanceOf(NoOffsetException.class);
assertThatThrownBy(stats::committedOffset).isInstanceOf(NoOffsetException.class);

int publishCount = 20_000;
TestUtils.publishAndWaitForConfirms(cf, publishCount, stream);

StreamStats stats2 = env.queryStreamStats(stream);
assertThat(stats2.firstOffset()).isZero();
assertThat(stats2.committedChunkId()).isPositive();
if (brokerVersion43Ormore()) {
assertThat(stats2.committedOffset()).isEqualTo(publishCount - 1);
} else {
assertThatThrownBy(stats::committedOffset).isInstanceOf(NoOffsetException.class);
}

CountDownLatch latch = new CountDownLatch(publishCount);
AtomicLong committedChunkId = new AtomicLong();
Expand Down Expand Up @@ -880,4 +888,8 @@ private void nativeIo(IoHandlerFactory ioHandlerFactory, Class<? extends Channel
epollEventLoopGroup.shutdownGracefully(0, 0, SECONDS);
}
}

private boolean brokerVersion43Ormore() {
return TestUtils.atLeastVersion(RABBITMQ_4_3_0.version(), brokerVersion);
}
}
5 changes: 3 additions & 2 deletions src/test/java/com/rabbitmq/stream/impl/TestUtils.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020-2025 Broadcom. All Rights Reserved.
// Copyright (c) 2020-2026 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
Expand Down Expand Up @@ -1115,7 +1115,8 @@ public enum BrokerVersion {
RABBITMQ_4_0_0("4.0.0"),
RABBITMQ_4_1_2("4.1.2"),
RABBITMQ_4_1_4("4.1.4"),
RABBITMQ_4_2_0("4.2.0");
RABBITMQ_4_2_0("4.2.0"),
RABBITMQ_4_3_0("4.3.0");

final String value;

Expand Down
Loading