Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose lastConsumedTimestamp and lastAckedTimestamp to consumer stats #6051

Merged
merged 3 commits into from
Feb 4, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ public class Consumer {
private final Rate msgOut;
private final Rate msgRedeliver;

private long lastConsumedTimestamp;
private long lastAckedTimestamp;

// Represents how many messages we can safely send to the consumer without
// overflowing its receiving queue. The consumer will use Flow commands to
// increase its availability
Expand Down Expand Up @@ -188,6 +191,7 @@ public boolean readCompacted() {
*/
public ChannelPromise sendMessages(final List<Entry> entries, EntryBatchSizes batchSizes, int totalMessages,
long totalBytes, RedeliveryTracker redeliveryTracker) {
this.lastConsumedTimestamp = System.currentTimeMillis();
final ChannelHandlerContext ctx = cnx.ctx();
final ChannelPromise writePromise = ctx.newPromise();

Expand Down Expand Up @@ -335,6 +339,7 @@ void doUnsubscribe(final long requestId) {
}

void messageAcked(CommandAck ack) {
this.lastAckedTimestamp = System.currentTimeMillis();
Map<String,Long> properties = Collections.emptyMap();
if (ack.getPropertiesCount() > 0) {
properties = ack.getPropertiesList().stream()
Expand Down Expand Up @@ -450,6 +455,8 @@ public void updateRates() {
}

public ConsumerStats getStats() {
stats.lastAckedTimestamp = lastAckedTimestamp;
stats.lastConsumedTimestamp = lastConsumedTimestamp;
stats.availablePermits = getAvailablePermits();
stats.unackedMessages = unackedMessages;
stats.blockedConsumerOnUnackedMsgs = blockedConsumerOnUnackedMsgs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public class PersistentSubscription implements Subscription {
private PersistentMessageExpiryMonitor expiryMonitor;

private long lastExpireTimestamp = 0L;
private long lastConsumedFlowTimestamp = 0L;

// for connected subscriptions, message expiry will be checked if the backlog is greater than this threshold
private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000;
Expand Down Expand Up @@ -315,6 +316,7 @@ public void deactivateCursor() {

@Override
public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
this.lastConsumedFlowTimestamp = System.currentTimeMillis();
dispatcher.consumerFlow(consumer, additionalNumberOfMessages);
}

Expand Down Expand Up @@ -935,6 +937,7 @@ public long estimateBacklogSize() {
public SubscriptionStats getStats() {
SubscriptionStats subStats = new SubscriptionStats();
subStats.lastExpireTimestamp = lastExpireTimestamp;
subStats.lastConsumedFlowTimestamp = lastConsumedFlowTimestamp;
Dispatcher dispatcher = this.dispatcher;
if (dispatcher != null) {
dispatcher.getConsumers().forEach(consumer -> {
Expand All @@ -944,6 +947,8 @@ public SubscriptionStats getStats() {
subStats.msgThroughputOut += consumerStats.msgThroughputOut;
subStats.msgRateRedeliver += consumerStats.msgRateRedeliver;
subStats.unackedMessages += consumerStats.unackedMessages;
subStats.lastConsumedTimestamp = Math.max(subStats.lastConsumedTimestamp, consumerStats.lastConsumedTimestamp);
subStats.lastAckedTimestamp = Math.max(subStats.lastAckedTimestamp, consumerStats.lastAckedTimestamp);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
Expand All @@ -31,6 +32,7 @@
import com.google.common.collect.Sets;

import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -60,6 +62,8 @@
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.TopicDomain;
Expand Down Expand Up @@ -948,4 +952,98 @@ public void testCreateNamespaceWithNoClusters() throws PulsarAdminException {
assertEquals(admin.namespaces().getNamespaceReplicationClusters(namespace),
Collections.singletonList(localCluster));
}

@Test(timeOut = 30000)
public void testConsumerStatsLastTimestamp() throws PulsarClientException, PulsarAdminException, InterruptedException {
long timestamp = System.currentTimeMillis();
final String topicName = "consumer-stats-" + timestamp;
final String subscribeName = topicName + "-test-stats-sub";
final String topic = "persistent://prop-xyz/ns1/" + topicName;
final String producerName = "producer-" + topicName;

@Cleanup
PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
Producer<byte[]> producer = client.newProducer().topic(topic)
.enableBatching(false)
.producerName(producerName)
.create();

// a. Send a message to the topic.
producer.send("message-1".getBytes(StandardCharsets.UTF_8));

// b. Create a consumer, because there was a message in the topic, the consumer will receive the message pushed
// by the broker, the lastConsumedTimestamp will as the consume subscribe time.
Consumer<byte[]> consumer = client.newConsumer().topic(topic)
.subscriptionName(subscribeName)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();

// Get the consumer stats.
TopicStats topicStats = admin.topics().getStats(topic);
zymap marked this conversation as resolved.
Show resolved Hide resolved
SubscriptionStats subscriptionStats = topicStats.subscriptions.get(subscribeName);
long startConsumedFlowTimestamp = subscriptionStats.lastConsumedFlowTimestamp;
long startAckedTimestampInSubStats = subscriptionStats.lastAckedTimestamp;
ConsumerStats consumerStats = subscriptionStats.consumers.get(0);
long startConsumedTimestampInConsumerStats = consumerStats.lastConsumedTimestamp;
long startAckedTimestampInConsumerStats = consumerStats.lastAckedTimestamp;

// Because the message was pushed by the broker, the consumedTimestamp should not as 0.
assertNotEquals(0, startConsumedTimestampInConsumerStats);
// There is no consumer ack the message, so the lastAckedTimestamp still as 0.
assertEquals(0, startAckedTimestampInConsumerStats);
assertNotEquals(0, startConsumedFlowTimestamp);
assertEquals(0, startAckedTimestampInSubStats);

// c. The Consumer receives the message and acks the message.
Message<byte[]> message = consumer.receive();
zymap marked this conversation as resolved.
Show resolved Hide resolved
consumer.acknowledge(message);
// Waiting for the ack command send to the broker.
TimeUnit.SECONDS.sleep(5);
zymap marked this conversation as resolved.
Show resolved Hide resolved

// Get the consumer stats.
topicStats = admin.topics().getStats(topic);
subscriptionStats = topicStats.subscriptions.get(subscribeName);
long consumedFlowTimestamp = subscriptionStats.lastConsumedFlowTimestamp;
long ackedTimestampInSubStats = subscriptionStats.lastAckedTimestamp;
consumerStats = subscriptionStats.consumers.get(0);
long consumedTimestamp = consumerStats.lastConsumedTimestamp;
long ackedTimestamp = consumerStats.lastAckedTimestamp;

// The lastConsumedTimestamp should same as the last time because the broker does not push any messages and the
// consumer does not pull any messages.
assertEquals(startConsumedTimestampInConsumerStats, consumedTimestamp);
assertTrue(startAckedTimestampInConsumerStats < ackedTimestamp);
assertNotEquals(0, consumedFlowTimestamp);
assertTrue(startAckedTimestampInSubStats < ackedTimestampInSubStats);

// d. Send another messages. The lastConsumedTimestamp should be updated.
producer.send("message-2".getBytes(StandardCharsets.UTF_8));

// e. Receive the message and ack it.
message = consumer.receive();
zymap marked this conversation as resolved.
Show resolved Hide resolved
consumer.acknowledge(message);
// Waiting for the ack command send to the broker.
TimeUnit.SECONDS.sleep(5);

// Get the consumer stats again.
topicStats = admin.topics().getStats(topic);
subscriptionStats = topicStats.subscriptions.get(subscribeName);
long lastConsumedFlowTimestamp = subscriptionStats.lastConsumedFlowTimestamp;
long lastConsumedTimestampInSubStats = subscriptionStats.lastConsumedTimestamp;
long lastAckedTimestampInSubStats = subscriptionStats.lastAckedTimestamp;
consumerStats = subscriptionStats.consumers.get(0);
long lastConsumedTimestamp = consumerStats.lastConsumedTimestamp;
long lastAckedTimestamp = consumerStats.lastAckedTimestamp;

assertTrue(consumedTimestamp < lastConsumedTimestamp);
assertTrue(ackedTimestamp < lastAckedTimestamp);
assertTrue(startConsumedTimestampInConsumerStats < lastConsumedTimestamp);
assertTrue(startAckedTimestampInConsumerStats < lastAckedTimestamp);
assertTrue(consumedFlowTimestamp == lastConsumedFlowTimestamp);
assertTrue(ackedTimestampInSubStats < lastAckedTimestampInSubStats);
assertEquals(lastConsumedTimestamp, lastConsumedTimestampInSubStats);

consumer.close();
producer.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ public class ConsumerStats {
private int clientVersionOffset = -1;
private int clientVersionLength;

public long lastAckedTimestamp;
public long lastConsumedTimestamp;

/** Metadata (key/value strings) associated with this consumer. */
public Map<String, String> metadata;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@ public class SubscriptionStats {
/** Last message expire execution timestamp. */
public long lastExpireTimestamp;

/** Last received consume flow command timestamp. */
public long lastConsumedFlowTimestamp;

/** Last consume message timestamp. */
public long lastConsumedTimestamp;

/** Last acked message timestamp. */
public long lastAckedTimestamp;

/** List of connected consumers on this subscription w/ their stats. */
public List<ConsumerStats> consumers;

Expand Down