Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] [Not Ready For Review] Adding poll rate to check how slowly partitions are being consumed. #1506

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,18 @@
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.utils.ExceptionUtils;
import com.linkedin.venice.utils.LatencyUtils;
import com.linkedin.venice.utils.RedundantExceptionFilter;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import com.linkedin.venice.utils.lazy.Lazy;
import io.tehuti.metrics.MetricConfig;
import io.tehuti.metrics.stats.Rate;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.function.IntConsumer;
import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -49,6 +52,8 @@ class ConsumptionTask implements Runnable {
new VeniceConcurrentHashMap<>();
private final long readCycleDelayMs;
private final Supplier<Map<PubSubTopicPartition, List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>>> pollFunction;

private final Function<PubSubTopicPartition, Long> offsetLagGetter;
private final IntConsumer bandwidthThrottler;
private final IntConsumer recordsThrottler;
private final AggKafkaConsumerServiceStats aggStats;
Expand All @@ -61,6 +66,10 @@ class ConsumptionTask implements Runnable {
*/
private final Map<PubSubTopicPartition, Rate> messageRatePerTopicPartition = new VeniceConcurrentHashMap<>();
private final Map<PubSubTopicPartition, Rate> bytesRatePerTopicPartition = new VeniceConcurrentHashMap<>();
private final Map<PubSubTopicPartition, Rate> pollRatePerTopicPartition = new VeniceConcurrentHashMap<>();

private final Lazy<Rate> overallConsumerPollRate;
private final RedundantExceptionFilter redundantExceptionFilter;
private final Map<PubSubTopicPartition, Long> lastSuccessfulPollTimestampPerTopicPartition =
new VeniceConcurrentHashMap<>();

Expand All @@ -87,14 +96,19 @@ public ConsumptionTask(
final IntConsumer bandwidthThrottler,
final IntConsumer recordsThrottler,
final AggKafkaConsumerServiceStats aggStats,
final ConsumerSubscriptionCleaner cleaner) {
final ConsumerSubscriptionCleaner cleaner,
Function<PubSubTopicPartition, Long> offsetLagGetter,
RedundantExceptionFilter redundantExceptionFilter) {
this.readCycleDelayMs = readCycleDelayMs;
this.pollFunction = pollFunction;
this.bandwidthThrottler = bandwidthThrottler;
this.recordsThrottler = recordsThrottler;
this.aggStats = aggStats;
this.cleaner = cleaner;
this.taskId = taskId;
this.offsetLagGetter = offsetLagGetter;
this.redundantExceptionFilter = redundantExceptionFilter;
this.overallConsumerPollRate = Lazy.of(() -> createRate(System.currentTimeMillis()));
this.consumptionTaskIdStr = Utils.getSanitizedStringForLogger(consumerNamePrefix) + " - " + taskId;
this.LOGGER = LogManager.getLogger(getClass().getSimpleName() + "[ " + consumptionTaskIdStr + " ]");
}
Expand Down Expand Up @@ -180,9 +194,13 @@ public void run() {
bytesRatePerTopicPartition
.computeIfAbsent(pubSubTopicPartition, tp -> createRate(lastSuccessfulPollTimestamp))
.record(payloadSizePerTopicPartition, lastSuccessfulPollTimestamp);

pollRatePerTopicPartition
.computeIfAbsent(pubSubTopicPartition, tp -> createRate(lastSuccessfulPollTimestamp))
.record(1, lastSuccessfulPollTimestamp);
consumedDataReceiver.write(topicPartitionMessages);
checkSlowPartitionWithHighLag(pubSubTopicPartition);
}
overallConsumerPollRate.get().record(1, lastSuccessfulPollTimestamp);
aggStats.recordTotalConsumerRecordsProducingToWriterBufferLatency(
LatencyUtils.getElapsedTimeFromMsToMs(beforeProducingToWriteBufferTimestamp));
aggStats.recordTotalNonZeroPollResultNum(polledPubSubMessagesCount);
Expand Down Expand Up @@ -278,6 +296,31 @@ Double getByteRate(PubSubTopicPartition topicPartition) {
return 0.0D;
}

/**
 * Returns the most recent measured poll rate (successful polls per second) for the given
 * topic-partition, or {@code 0.0} if this task has never recorded a poll for it.
 */
Double getPollRate(PubSubTopicPartition topicPartition) {
  // Single lookup instead of containsKey() + get(): avoids a redundant read on the
  // concurrent map and is safe even if the entry were ever removed between the two calls.
  Rate pollRate = pollRatePerTopicPartition.get(topicPartition);
  if (pollRate != null) {
    return pollRate.measure(metricConfig, System.currentTimeMillis());
  }
  return 0.0D;
}

/**
 * Emits a warning when the given partition shows a high offset lag combined with a low
 * message consumption rate, which suggests the partition is being consumed too slowly.
 * The warning is de-duplicated per task/partition via {@code redundantExceptionFilter},
 * so repeated detections within the filter's window are logged only once.
 */
private void checkSlowPartitionWithHighLag(PubSubTopicPartition pubSubTopicPartition) {
  // Heuristic thresholds for "high lag" and "slow consumption".
  // TODO(review): consider making these configurable rather than hard-coded.
  final long highLagThreshold = 200000L;
  final double slowMessageRateThreshold = 200D;
  Long offsetLag = offsetLagGetter.apply(pubSubTopicPartition);
  // Guard against an unknown (null) lag before unboxing to avoid a NullPointerException.
  if (offsetLag == null || offsetLag <= highLagThreshold) {
    return;
  }
  Double messageRate = getMessageRate(pubSubTopicPartition);
  if (messageRate >= slowMessageRateThreshold) {
    return;
  }
  String slowTaskWithPartitionStr = consumptionTaskIdStr + " - " + pubSubTopicPartition;
  if (redundantExceptionFilter.isRedundantException(slowTaskWithPartitionStr)) {
    return;
  }
  // Only measure the (comparatively expensive) rates when we are actually going to log.
  Double pollRate = getPollRate(pubSubTopicPartition);
  Double consumerPollRate = overallConsumerPollRate.get().measure(metricConfig, System.currentTimeMillis());
  LOGGER.warn(
      "Slow partition with high lag detected: {}. Lag: {}, Message Rate: {}, Poll Rate: {}, Consumer Poll Rate: {}",
      pubSubTopicPartition,
      offsetLag,
      messageRate,
      pollRate,
      consumerPollRate);
}

PubSubTopic getDestinationIdentifier(PubSubTopicPartition topicPartition) {
ConsumedDataReceiver<List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> dataReceiver =
dataReceiverMap.get(topicPartition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.IntConsumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
Expand Down Expand Up @@ -90,7 +91,7 @@ public abstract class KafkaConsumerService extends AbstractKafkaConsumerService
private static final int SHUTDOWN_TIMEOUT_IN_SECOND = 1;
// 4MB bitset size, 2 bitmaps for active and old bitset
private static final RedundantExceptionFilter REDUNDANT_LOGGING_FILTER =
new RedundantExceptionFilter(8 * 1024 * 1024 * 4, TimeUnit.MINUTES.toMillis(10));
new RedundantExceptionFilter(8 * 1024 * 1024 * 4, TimeUnit.MINUTES.toMillis(1));

/**
* @param statsOverride injection of stats, for test purposes
Expand Down Expand Up @@ -167,6 +168,7 @@ protected KafkaConsumerService(
pubSubConsumer::batchUnsubscribe,
time);

Function<PubSubTopicPartition, Long> offsetLagGetter = pubSubConsumer::getOffsetLag;
ConsumptionTask consumptionTask = new ConsumptionTask(
consumerNamePrefix,
i,
Expand All @@ -175,7 +177,9 @@ protected KafkaConsumerService(
bandwidthThrottlerFunction,
recordsThrottlerFunction,
this.aggStats,
cleaner);
cleaner,
offsetLagGetter,
REDUNDANT_LOGGING_FILTER);
consumerToConsumptionTask.putByIndex(pubSubConsumer, consumptionTask, i);
consumerToLocks.put(pubSubConsumer, new ReentrantLock());
}
Expand Down Expand Up @@ -560,6 +564,7 @@ private Map<PubSubTopicPartition, TopicPartitionIngestionInfo> getIngestionInfoF
long latestOffset = consumer.getLatestOffset(topicPartition);
double msgRate = consumptionTask.getMessageRate(topicPartition);
double byteRate = consumptionTask.getByteRate(topicPartition);
double pollRate = consumptionTask.getPollRate(topicPartition);
long lastSuccessfulPollTimestamp = consumptionTask.getLastSuccessfulPollTimestamp(topicPartition);
long elapsedTimeSinceLastPollInMs = ConsumptionTask.DEFAULT_TOPIC_PARTITION_NO_POLL_TIMESTAMP;
if (lastSuccessfulPollTimestamp != ConsumptionTask.DEFAULT_TOPIC_PARTITION_NO_POLL_TIMESTAMP) {
Expand All @@ -573,6 +578,7 @@ private Map<PubSubTopicPartition, TopicPartitionIngestionInfo> getIngestionInfoF
offsetLag,
msgRate,
byteRate,
pollRate,
consumerIdStr,
elapsedTimeSinceLastPollInMs,
destinationVersionTopicName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ public class TopicPartitionIngestionInfo {
private long offsetLag;
private double msgRate;
private double byteRate;
private double pollRate;
private String consumerIdStr;
private long elapsedTimeSinceLastPollInMs;

Expand All @@ -20,13 +21,15 @@ public TopicPartitionIngestionInfo(
@JsonProperty("offsetLag") long offsetLag,
@JsonProperty("msgRate") double msgRate,
@JsonProperty("byteRate") double byteRate,
@JsonProperty("pollRate") double pollRate,
@JsonProperty("consumerIdStr") String consumerIdStr,
@JsonProperty("elapsedTimeSinceLastPollInMs") long elapsedTimeSinceLastPollInMs,
@JsonProperty("versionTopicName") String versionTopicName) {
this.latestOffset = latestOffset;
this.offsetLag = offsetLag;
this.msgRate = msgRate;
this.byteRate = byteRate;
this.pollRate = pollRate;
this.consumerIdStr = consumerIdStr;
this.elapsedTimeSinceLastPollInMs = elapsedTimeSinceLastPollInMs;
this.versionTopicName = versionTopicName;
Expand Down Expand Up @@ -60,6 +63,14 @@ public void setByteRate(double byteRate) {
this.byteRate = byteRate;
}

/** @return the rate of successful polls per second recorded for this topic-partition. */
public double getPollRate() {
  return pollRate;
}

/** Sets the poll rate. NOTE(review): presumably needed for JSON deserialization — confirm. */
public void setPollRate(double pollRate) {
  this.pollRate = pollRate;
}

public String getConsumerIdStr() {
return consumerIdStr;
}
Expand Down Expand Up @@ -97,6 +108,7 @@ public boolean equals(Object o) {
&& this.offsetLag == topicPartitionIngestionInfo.getOffsetLag()
&& Double.doubleToLongBits(this.msgRate) == Double.doubleToLongBits(topicPartitionIngestionInfo.getMsgRate())
&& Double.doubleToLongBits(this.byteRate) == Double.doubleToLongBits(topicPartitionIngestionInfo.getByteRate())
&& Double.doubleToLongBits(this.pollRate) == Double.doubleToLongBits(topicPartitionIngestionInfo.getPollRate())
&& this.consumerIdStr.equals(topicPartitionIngestionInfo.getConsumerIdStr())
&& this.elapsedTimeSinceLastPollInMs == topicPartitionIngestionInfo.getElapsedTimeSinceLastPollInMs()
&& this.versionTopicName.equals(topicPartitionIngestionInfo.getVersionTopicName());
Expand All @@ -108,6 +120,7 @@ public int hashCode() {
result = 31 * result + Long.hashCode(offsetLag);
result = 31 * result + Double.hashCode(msgRate);
result = 31 * result + Double.hashCode(byteRate);
result = 31 * result + Double.hashCode(pollRate);
result = 31 * result + consumerIdStr.hashCode();
result = 31 * result + Long.hashCode(elapsedTimeSinceLastPollInMs);
result = 31 * result + versionTopicName.hashCode();
Expand All @@ -117,7 +130,7 @@ public int hashCode() {
@Override
public String toString() {
  // Render every ingestion-info field as a brace-delimited, comma-separated list.
  // StringBuilder.append(double/long) produces the same text as string concatenation.
  StringBuilder sb = new StringBuilder("{");
  sb.append("latestOffset:").append(latestOffset);
  sb.append(", offsetLag:").append(offsetLag);
  sb.append(", msgRate:").append(msgRate);
  sb.append(", byteRate:").append(byteRate);
  sb.append(", pollRate:").append(pollRate);
  sb.append(", consumerIdStr:").append(consumerIdStr);
  sb.append(", elapsedTimeSinceLastPollInMs:").append(elapsedTimeSinceLastPollInMs);
  sb.append(", versionTopicName:").append(versionTopicName);
  sb.append('}');
  return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class TopicPartitionIngestionInfoTest {
public void testJsonParse() throws Exception {
PubSubTopic versionTopic = pubSubTopicRepository.getTopic("test_store_v1");
TopicPartitionIngestionInfo topicPartitionIngestionInfo =
new TopicPartitionIngestionInfo(0, 1, 2.0, 4.0, "consumerIdStr", 7, versionTopic.getName());
new TopicPartitionIngestionInfo(0, 1, 2.0, 4.0, 1.0, "consumerIdStr", 7, versionTopic.getName());
String kafkaUrl = "localhost:1234";
PubSubTopicPartition pubSubTopicPartition = new PubSubTopicPartitionImpl(versionTopic, 0);
Map<String, Map<String, TopicPartitionIngestionInfo>> topicPartitionIngestionContext = new HashMap<>();
Expand Down
Loading