diff --git a/src/main/java/com/rabbitmq/stream/StreamStats.java b/src/main/java/com/rabbitmq/stream/StreamStats.java index bbcdc2d39b..c9e5579826 100644 --- a/src/main/java/com/rabbitmq/stream/StreamStats.java +++ b/src/main/java/com/rabbitmq/stream/StreamStats.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2025 Broadcom. All Rights Reserved. +// Copyright (c) 2020-2026 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the @@ -41,9 +41,25 @@ public interface StreamStats { * given time. The value can be stale as soon as the application reads it though, as the committed * chunk ID for a stream that is published to changes all the time. * + *
Use {@link #committedChunkId()} instead if the last offset in the stream is needed. + * * @return committed offset in this stream * @see Context#committedChunkId() + * @see #committedOffset() * @throws NoOffsetException if there is no committed chunk yet */ long committedChunkId(); + + /** + * The offset of the last committed message in the stream. + * + *
The value can be stale as soon as the application reads it though, as the last committed + * offset for a stream that is published to changes all the time. + * + *
Requires RabbitMQ 4.3 or more. + * + * @return last committed offset + * @throws NoOffsetException if there is no committed message yet or if the value is not available + */ + long committedOffset(); } diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java index 6b2143aa75..f56cee8d6e 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2025 Broadcom. All Rights Reserved. +// Copyright (c) 2020-2026 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the @@ -557,10 +557,14 @@ public StreamStats queryStreamStats(String stream) { }; LongSupplier firstOffsetSupplier = offsetSupplierLogic.apply("first_chunk_id", "No first offset for stream " + stream); - LongSupplier committedOffsetSupplier = + LongSupplier committedChunkIdSupplier = offsetSupplierLogic.apply( "committed_chunk_id", "No committed chunk ID for stream " + stream); - return new DefaultStreamStats(firstOffsetSupplier, committedOffsetSupplier); + LongSupplier committedOffsetSupplier = + offsetSupplierLogic.apply("committed_offset", "No committed offset for stream " + stream); + + return new DefaultStreamStats( + firstOffsetSupplier, committedChunkIdSupplier, committedOffsetSupplier); } else { throw convertCodeToException( response.getResponseCode(), @@ -629,22 +633,32 @@ public boolean streamExists(String stream) { private static class DefaultStreamStats implements StreamStats { - private final LongSupplier firstOffsetSupplier, committedOffsetSupplier; + private final LongSupplier firstOffsetSupplier, + committedChunkIdSupplier, + committedOffsetSupplier; private DefaultStreamStats( - LongSupplier firstOffsetSupplier, LongSupplier committedOffsetSupplier) { + LongSupplier firstOffsetSupplier, + LongSupplier committedChunkIdSupplier, + LongSupplier committedOffsetSupplier) { this.firstOffsetSupplier = firstOffsetSupplier; + this.committedChunkIdSupplier = committedChunkIdSupplier; this.committedOffsetSupplier = committedOffsetSupplier; } @Override public long firstOffset() { - return firstOffsetSupplier.getAsLong(); + return this.firstOffsetSupplier.getAsLong(); } @Override public long committedChunkId() { - return committedOffsetSupplier.getAsLong(); + return this.committedChunkIdSupplier.getAsLong(); + } + + @Override + public long committedOffset() { + return this.committedOffsetSupplier.getAsLong(); } @Override @@ -652,8 +666,10 @@ public String toString() { return "StreamStats{" + "firstOffset=" + firstOffset() - + ", committedOffset=" + + ", committedChunkId=" + committedChunkId() + + ", committedOffset=" + + committedOffset() + '}'; } } diff --git a/src/test/java/com/rabbitmq/stream/impl/ClientTest.java b/src/test/java/com/rabbitmq/stream/impl/ClientTest.java index 4d78b30b4d..9351bf5b09 100644 --- a/src/test/java/com/rabbitmq/stream/impl/ClientTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/ClientTest.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2025 Broadcom. All Rights Reserved. +// Copyright (c) 2020-2026 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the @@ -14,6 +14,7 @@ // info@rabbitmq.com. package com.rabbitmq.stream.impl; +import static com.rabbitmq.stream.impl.TestUtils.BrokerVersion.RABBITMQ_4_3_0; import static com.rabbitmq.stream.impl.TestUtils.ResponseConditions.ko; import static com.rabbitmq.stream.impl.TestUtils.ResponseConditions.ok; import static com.rabbitmq.stream.impl.TestUtils.ResponseConditions.responseCode; @@ -94,6 +95,7 @@ public class ClientTest { String stream; TestUtils.ClientFactory cf; + String brokerVersion; static boolean await(CountDownLatch latch, Duration timeout) { try { @@ -907,10 +909,10 @@ void streamStatsShouldReturnFirstOffsetAndCommittedOffset() throws Exception { int publishCount = 20_000; CountDownLatch latch = new CountDownLatch(publishCount); - AtomicLong committedOffset = new AtomicLong(); + AtomicLong committedChunkId = new AtomicLong(); Client.MessageListener messageListener = - (corr, offset, chkTimestamp, committedOfft, chunkContext, message) -> { - committedOffset.set(committedOfft); + (corr, offset, chkTimestamp, committedChkId, chunkContext, message) -> { + committedChunkId.set(committedChkId); latch.countDown(); }; Client client = @@ -920,11 +922,22 @@ void streamStatsShouldReturnFirstOffsetAndCommittedOffset() throws Exception { .messageListener(messageListener)); StreamStatsResponse response = client.streamStats(stream); assertThat(response.getInfo()).containsEntry("first_chunk_id", -1L); + assertThat(response.getInfo()).containsEntry("last_chunk_id", -1L); assertThat(response.getInfo()).containsEntry("committed_chunk_id", -1L); + + if (brokerVersion43Ormore()) { + assertThat(response.getInfo()).containsEntry("committed_offset", -1L); + } + TestUtils.publishAndWaitForConfirms(cf, publishCount, stream); response = client.streamStats(stream); assertThat(response.getInfo()).containsEntry("first_chunk_id", 0L); - assertThat(response.getInfo().get("committed_chunk_id")).isNotEqualTo(-1L); + assertThat(response.getInfo().get("last_chunk_id")).isPositive(); + assertThat(response.getInfo().get("committed_chunk_id")).isPositive(); + + if (brokerVersion43Ormore()) { + assertThat(response.getInfo()).containsEntry("committed_offset", (long) (publishCount - 1)); + } client.exchangeCommandVersions(); @@ -933,8 +946,8 @@ void streamStatsShouldReturnFirstOffsetAndCommittedOffset() throws Exception { assertThat(subscribeResponse.isOk()).isTrue(); assertThat(latch.await(10, SECONDS)).isTrue(); - assertThat(committedOffset.get()).isPositive(); - assertThat(committedOffset).hasValue(response.getInfo().get("committed_chunk_id")); + assertThat(committedChunkId.get()).isPositive(); + assertThat(committedChunkId).hasValue(response.getInfo().get("committed_chunk_id")); } @Test @@ -1117,4 +1130,8 @@ public int fragmentLength(Object obj) { // we should get messages only from the "second" part of the stream assertThat(consumedMessageCount).hasValueLessThan(messageCount * 2); } + + private boolean brokerVersion43Ormore() { + return TestUtils.atLeastVersion(RABBITMQ_4_3_0.version(), brokerVersion); + } } diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java index 6388fc890c..0ab5b9c2c9 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2025 Broadcom. All Rights Reserved. +// Copyright (c) 2020-2026 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the @@ -16,6 +16,7 @@ import static com.rabbitmq.stream.Cli.listLocatorConnections; import static com.rabbitmq.stream.impl.Assertions.assertThat; +import static com.rabbitmq.stream.impl.TestUtils.BrokerVersion.RABBITMQ_4_3_0; import static com.rabbitmq.stream.impl.TestUtils.CountDownLatchConditions.completed; import static com.rabbitmq.stream.impl.TestUtils.ExceptionConditions.responseCode; import static com.rabbitmq.stream.impl.TestUtils.latchAssert; @@ -111,6 +112,7 @@ public class StreamEnvironmentTest { String stream; TestUtils.ClientFactory cf; EventLoopGroup eventLoopGroup; + String brokerVersion; @BeforeEach void init() { @@ -636,6 +638,7 @@ void queryStreamStatsShouldReturnFirstOffsetAndCommittedOffset(boolean lazyInit) StreamStats stats = env.queryStreamStats(stream); assertThatThrownBy(stats::firstOffset).isInstanceOf(NoOffsetException.class); assertThatThrownBy(stats::committedChunkId).isInstanceOf(NoOffsetException.class); + assertThatThrownBy(stats::committedOffset).isInstanceOf(NoOffsetException.class); int publishCount = 20_000; TestUtils.publishAndWaitForConfirms(cf, publishCount, stream); @@ -643,6 +646,11 @@ void queryStreamStatsShouldReturnFirstOffsetAndCommittedOffset(boolean lazyInit) StreamStats stats2 = env.queryStreamStats(stream); assertThat(stats2.firstOffset()).isZero(); assertThat(stats2.committedChunkId()).isPositive(); + if (brokerVersion43Ormore()) { + assertThat(stats2.committedOffset()).isEqualTo(publishCount - 1); + } else { + assertThatThrownBy(stats::committedOffset).isInstanceOf(NoOffsetException.class); + } CountDownLatch latch = new CountDownLatch(publishCount); AtomicLong committedChunkId = new AtomicLong(); @@ -880,4 +888,8 @@ private void nativeIo(IoHandlerFactory ioHandlerFactory, Class extends Channel epollEventLoopGroup.shutdownGracefully(0, 0, SECONDS); } } + + private boolean brokerVersion43Ormore() { + return TestUtils.atLeastVersion(RABBITMQ_4_3_0.version(), brokerVersion); + } } diff --git a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java index 36a584b170..13062f30f5 100644 --- a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java +++ b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2025 Broadcom. All Rights Reserved. +// Copyright (c) 2020-2026 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the @@ -1115,7 +1115,8 @@ public enum BrokerVersion { RABBITMQ_4_0_0("4.0.0"), RABBITMQ_4_1_2("4.1.2"), RABBITMQ_4_1_4("4.1.4"), - RABBITMQ_4_2_0("4.2.0"); + RABBITMQ_4_2_0("4.2.0"), + RABBITMQ_4_3_0("4.3.0"); final String value;