Skip to content

Commit

Permalink
Expose lastConsumedTimestamp and lastAckedTimestamp to consumer stats (
Browse files Browse the repository at this point in the history
…apache#6051)

---

Master Issue: apache#6046

*Motivation*

Make people can use the timestamp to tell if acknowledge and consumption
are happening.

*Modifications*

- Add lastConsumedTimestamp and lastAckedTimestamp to consume stats

*Verify this change*

- Pass the test `testConsumerStatsLastTimestamp`
  • Loading branch information
zymap authored and tuteng committed Feb 23, 2020
1 parent c6b7665 commit 1b151fe
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ public class Consumer {
private final Rate msgOut;
private final Rate msgRedeliver;

private long lastConsumedTimestamp;
private long lastAckedTimestamp;

// Represents how many messages we can safely send to the consumer without
// overflowing its receiving queue. The consumer will use Flow commands to
// increase its availability
Expand Down Expand Up @@ -188,6 +191,7 @@ public boolean readCompacted() {
*/
public ChannelPromise sendMessages(final List<Entry> entries, EntryBatchSizes batchSizes, int totalMessages,
long totalBytes, RedeliveryTracker redeliveryTracker) {
this.lastConsumedTimestamp = System.currentTimeMillis();
final ChannelHandlerContext ctx = cnx.ctx();
final ChannelPromise writePromise = ctx.newPromise();

Expand Down Expand Up @@ -335,6 +339,7 @@ void doUnsubscribe(final long requestId) {
}

void messageAcked(CommandAck ack) {
this.lastAckedTimestamp = System.currentTimeMillis();
Map<String,Long> properties = Collections.emptyMap();
if (ack.getPropertiesCount() > 0) {
properties = ack.getPropertiesList().stream()
Expand Down Expand Up @@ -450,6 +455,8 @@ public void updateRates() {
}

public ConsumerStats getStats() {
stats.lastAckedTimestamp = lastAckedTimestamp;
stats.lastConsumedTimestamp = lastConsumedTimestamp;
stats.availablePermits = getAvailablePermits();
stats.unackedMessages = unackedMessages;
stats.blockedConsumerOnUnackedMsgs = blockedConsumerOnUnackedMsgs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public class PersistentSubscription implements Subscription {
private PersistentMessageExpiryMonitor expiryMonitor;

private long lastExpireTimestamp = 0L;
private long lastConsumedFlowTimestamp = 0L;

// for connected subscriptions, message expiry will be checked if the backlog is greater than this threshold
private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000;
Expand Down Expand Up @@ -315,6 +316,7 @@ public void deactivateCursor() {

@Override
public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
this.lastConsumedFlowTimestamp = System.currentTimeMillis();
dispatcher.consumerFlow(consumer, additionalNumberOfMessages);
}

Expand Down Expand Up @@ -935,6 +937,7 @@ public long estimateBacklogSize() {
public SubscriptionStats getStats() {
SubscriptionStats subStats = new SubscriptionStats();
subStats.lastExpireTimestamp = lastExpireTimestamp;
subStats.lastConsumedFlowTimestamp = lastConsumedFlowTimestamp;
Dispatcher dispatcher = this.dispatcher;
if (dispatcher != null) {
dispatcher.getConsumers().forEach(consumer -> {
Expand All @@ -944,6 +947,8 @@ public SubscriptionStats getStats() {
subStats.msgThroughputOut += consumerStats.msgThroughputOut;
subStats.msgRateRedeliver += consumerStats.msgRateRedeliver;
subStats.unackedMessages += consumerStats.unackedMessages;
subStats.lastConsumedTimestamp = Math.max(subStats.lastConsumedTimestamp, consumerStats.lastConsumedTimestamp);
subStats.lastAckedTimestamp = Math.max(subStats.lastAckedTimestamp, consumerStats.lastAckedTimestamp);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
Expand All @@ -31,6 +32,7 @@
import com.google.common.collect.Sets;

import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -60,6 +62,8 @@
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.TopicDomain;
Expand Down Expand Up @@ -948,4 +952,110 @@ public void testCreateNamespaceWithNoClusters() throws PulsarAdminException {
assertEquals(admin.namespaces().getNamespaceReplicationClusters(namespace),
Collections.singletonList(localCluster));
}

@Test(timeOut = 30000)
public void testConsumerStatsLastTimestamp() throws PulsarClientException, PulsarAdminException, InterruptedException {
long timestamp = System.currentTimeMillis();
final String topicName = "consumer-stats-" + timestamp;
final String subscribeName = topicName + "-test-stats-sub";
final String topic = "persistent://prop-xyz/ns1/" + topicName;
final String producerName = "producer-" + topicName;

@Cleanup
PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
Producer<byte[]> producer = client.newProducer().topic(topic)
.enableBatching(false)
.producerName(producerName)
.create();

// a. Send a message to the topic.
producer.send("message-1".getBytes(StandardCharsets.UTF_8));

// b. Create a consumer, because there was a message in the topic, the consumer will receive the message pushed
// by the broker, the lastConsumedTimestamp will as the consume subscribe time.
Consumer<byte[]> consumer = client.newConsumer().topic(topic)
.subscriptionName(subscribeName)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
Message<byte[]> message = consumer.receive();

// Get the consumer stats.
TopicStats topicStats = admin.topics().getStats(topic);
SubscriptionStats subscriptionStats = topicStats.subscriptions.get(subscribeName);
long startConsumedFlowTimestamp = subscriptionStats.lastConsumedFlowTimestamp;
long startAckedTimestampInSubStats = subscriptionStats.lastAckedTimestamp;
ConsumerStats consumerStats = subscriptionStats.consumers.get(0);
long startConsumedTimestampInConsumerStats = consumerStats.lastConsumedTimestamp;
long startAckedTimestampInConsumerStats = consumerStats.lastAckedTimestamp;

// Because the message was pushed by the broker, the consumedTimestamp should not as 0.
assertNotEquals(0, startConsumedTimestampInConsumerStats);
// There is no consumer ack the message, so the lastAckedTimestamp still as 0.
assertEquals(0, startAckedTimestampInConsumerStats);
assertNotEquals(0, startConsumedFlowTimestamp);
assertEquals(0, startAckedTimestampInSubStats);

// c. The Consumer receives the message and acks the message.
consumer.acknowledge(message);
// Waiting for the ack command send to the broker.
while (true) {
topicStats = admin.topics().getStats(topic);
if (topicStats.subscriptions.get(subscribeName).lastAckedTimestamp != 0) {
break;
}
TimeUnit.MILLISECONDS.sleep(100);
}

// Get the consumer stats.
topicStats = admin.topics().getStats(topic);
subscriptionStats = topicStats.subscriptions.get(subscribeName);
long consumedFlowTimestamp = subscriptionStats.lastConsumedFlowTimestamp;
long ackedTimestampInSubStats = subscriptionStats.lastAckedTimestamp;
consumerStats = subscriptionStats.consumers.get(0);
long consumedTimestamp = consumerStats.lastConsumedTimestamp;
long ackedTimestamp = consumerStats.lastAckedTimestamp;

// The lastConsumedTimestamp should same as the last time because the broker does not push any messages and the
// consumer does not pull any messages.
assertEquals(startConsumedTimestampInConsumerStats, consumedTimestamp);
assertTrue(startAckedTimestampInConsumerStats < ackedTimestamp);
assertNotEquals(0, consumedFlowTimestamp);
assertTrue(startAckedTimestampInSubStats < ackedTimestampInSubStats);

// d. Send another messages. The lastConsumedTimestamp should be updated.
producer.send("message-2".getBytes(StandardCharsets.UTF_8));

// e. Receive the message and ack it.
message = consumer.receive();
consumer.acknowledge(message);
// Waiting for the ack command send to the broker.
while (true) {
topicStats = admin.topics().getStats(topic);
if (topicStats.subscriptions.get(subscribeName).lastAckedTimestamp != ackedTimestampInSubStats) {
break;
}
TimeUnit.MILLISECONDS.sleep(100);
}

// Get the consumer stats again.
topicStats = admin.topics().getStats(topic);
subscriptionStats = topicStats.subscriptions.get(subscribeName);
long lastConsumedFlowTimestamp = subscriptionStats.lastConsumedFlowTimestamp;
long lastConsumedTimestampInSubStats = subscriptionStats.lastConsumedTimestamp;
long lastAckedTimestampInSubStats = subscriptionStats.lastAckedTimestamp;
consumerStats = subscriptionStats.consumers.get(0);
long lastConsumedTimestamp = consumerStats.lastConsumedTimestamp;
long lastAckedTimestamp = consumerStats.lastAckedTimestamp;

assertTrue(consumedTimestamp < lastConsumedTimestamp);
assertTrue(ackedTimestamp < lastAckedTimestamp);
assertTrue(startConsumedTimestampInConsumerStats < lastConsumedTimestamp);
assertTrue(startAckedTimestampInConsumerStats < lastAckedTimestamp);
assertTrue(consumedFlowTimestamp == lastConsumedFlowTimestamp);
assertTrue(ackedTimestampInSubStats < lastAckedTimestampInSubStats);
assertEquals(lastConsumedTimestamp, lastConsumedTimestampInSubStats);

consumer.close();
producer.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ public class ConsumerStats {
private int clientVersionOffset = -1;
private int clientVersionLength;

public long lastAckedTimestamp;
public long lastConsumedTimestamp;

/** Metadata (key/value strings) associated with this consumer. */
public Map<String, String> metadata;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@ public class SubscriptionStats {
/** Last message expire execution timestamp. */
public long lastExpireTimestamp;

/** Last received consume flow command timestamp. */
public long lastConsumedFlowTimestamp;

/** Last consume message timestamp. */
public long lastConsumedTimestamp;

/** Last acked message timestamp. */
public long lastAckedTimestamp;

/** List of connected consumers on this subscription w/ their stats. */
public List<ConsumerStats> consumers;

Expand Down
12 changes: 12 additions & 0 deletions site2/docs/admin-api-persistent-topics.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,14 @@ It shows current statistics of a given non-partitioned topic.
- **type**: This subscription type

- **msgRateExpired**: The rate at which messages were discarded instead of dispatched from this subscription due to TTL

- **lastExpireTimestamp**: The last message expire execution timestamp

- **lastConsumedFlowTimestamp**: The last flow command received timestamp

- **lastConsumedTimestamp**: The latest timestamp of all the consumed timestamp of the consumers

- **lastAckedTimestamp**: The latest timestamp of all the acked timestamp of the consumers

- **consumers**: The list of connected consumers for this subscription

Expand All @@ -236,6 +244,10 @@ It shows current statistics of a given non-partitioned topic.
- **unackedMessages**: Number of unacknowledged messages for the consumer

- **blockedConsumerOnUnackedMsgs**: Flag to verify if the consumer is blocked due to reaching threshold of unacked messages
- **lastConsumedTimestamp**: The timestamp of the consumer last consume a message
- **lastAckedTimestamp**: The timestamp of the consumer last ack a message

- **replication**: This section gives the stats for cross-colo replication of this topic

Expand Down

0 comments on commit 1b151fe

Please sign in to comment.