Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RIP-46] Enhanced metrics for timing and transactional messages #7500

Merged
merged 20 commits into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import org.apache.rocketmq.broker.topic.TopicQueueMappingManager;
import org.apache.rocketmq.broker.topic.TopicRouteInfoManager;
import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
import org.apache.rocketmq.broker.transaction.TransactionMetricsFlushService;
import org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService;
import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
import org.apache.rocketmq.broker.transaction.queue.DefaultTransactionalMessageCheckListener;
Expand Down Expand Up @@ -276,6 +277,7 @@ public class BrokerController {
private BrokerMetricsManager brokerMetricsManager;
private ColdDataPullRequestHoldService coldDataPullRequestHoldService;
private ColdDataCgCtrService coldDataCgCtrService;
private TransactionMetricsFlushService transactionMetricsFlushService;

public BrokerController(
final BrokerConfig brokerConfig,
Expand Down Expand Up @@ -957,6 +959,9 @@ private void initialTransaction() {
}
this.transactionalMessageCheckListener.setBrokerController(this);
this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
this.transactionMetricsFlushService = new TransactionMetricsFlushService(this);
this.transactionMetricsFlushService.start();

}

private void initialAcl() {
Expand Down Expand Up @@ -1434,6 +1439,10 @@ protected void shutdownBasicService() {
this.endTransactionExecutor.shutdown();
}

if (this.transactionMetricsFlushService != null) {
this.transactionMetricsFlushService.shutdown();
}

if (this.escapeBridge != null) {
escapeBridge.shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ public static String getTimerCheckPath(final String rootDir) {
/**
 * Builds the location of the timer metrics file under the store root.
 *
 * @param rootDir store root directory
 * @return {@code rootDir}, {@code "config"} and {@code "timermetrics"} joined with
 *         {@link File#separator}
 */
public static String getTimerMetricsPath(final String rootDir) {
    // Joining with the platform separator yields the same string as chained concatenation.
    return String.join(File.separator, rootDir, "config", "timermetrics");
}
/**
 * Builds the location of the transaction metrics file under the store root.
 *
 * @param rootDir store root directory
 * @return {@code rootDir}, {@code "config"} and {@code "transactionMetrics"} joined with
 *         {@link File#separator}
 */
public static String getTransactionMetricsPath(final String rootDir) {
    // Joining with the platform separator yields the same string as chained concatenation.
    return String.join(File.separator, rootDir, "config", "transactionMetrics");
}

public static String getConsumerFilterPath(final String rootDir) {
return rootDir + File.separator + "config" + File.separator + "consumerFilter.json";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ public class BrokerMetricsConstant {
public static final String GAUGE_CONSUMER_READY_MESSAGES = "rocketmq_consumer_ready_messages";
public static final String COUNTER_CONSUMER_SEND_TO_DLQ_MESSAGES_TOTAL = "rocketmq_send_to_dlq_messages_total";

public static final String COUNTER_COMMIT_MESSAGES_TOTAL = "rocketmq_commit_messages_total";
public static final String COUNTER_ROLLBACK_MESSAGES_TOTAL = "rocketmq_rollback_messages_total";
public static final String HISTOGRAM_FINISH_MSG_LATENCY = "rocketmq_finish_message_latency";
public static final String GAUGE_HALF_MESSAGES = "rocketmq_half_messages";

public static final String LABEL_CLUSTER_NAME = "cluster";
public static final String LABEL_NODE_TYPE = "node_type";
public static final String NODE_TYPE_BROKER = "broker";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,6 @@
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil;
import io.opentelemetry.sdk.resources.Resource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ConsumerManager;
Expand All @@ -68,12 +61,23 @@
import org.apache.rocketmq.remoting.metrics.RemotingMetricsManager;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant;
import org.slf4j.bridge.SLF4JBridgeHandler;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.AGGREGATION_DELTA;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_COMMIT_MESSAGES_TOTAL;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_CONSUMER_SEND_TO_DLQ_MESSAGES_TOTAL;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_MESSAGES_IN_TOTAL;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_MESSAGES_OUT_TOTAL;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_ROLLBACK_MESSAGES_TOTAL;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_THROUGHPUT_IN_TOTAL;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_THROUGHPUT_OUT_TOTAL;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_BROKER_PERMISSION;
Expand All @@ -83,8 +87,10 @@
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_CONSUMER_LAG_MESSAGES;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_CONSUMER_QUEUEING_LATENCY;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_CONSUMER_READY_MESSAGES;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_HALF_MESSAGES;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_PROCESSOR_WATERMARK;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_PRODUCER_CONNECTIONS;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.HISTOGRAM_FINISH_MSG_LATENCY;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.HISTOGRAM_MESSAGE_SIZE;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_AGGREGATION;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CLUSTER_NAME;
Expand Down Expand Up @@ -141,6 +147,10 @@ public class BrokerMetricsManager {
public static ObservableLongGauge consumerQueueingLatency = new NopObservableLongGauge();
public static ObservableLongGauge consumerReadyMessages = new NopObservableLongGauge();
public static LongCounter sendToDlqMessages = new NopLongCounter();
public static ObservableLongGauge halfMessages = new NopObservableLongGauge();
public static LongCounter commitMessagesTotal = new NopLongCounter();
public static LongCounter rollBackMessagesTotal = new NopLongCounter();
public static LongHistogram transactionFinishLatency = new NopLongHistogram();

public static final List<String> SYSTEM_GROUP_PREFIX_LIST = new ArrayList<String>() {
{
Expand Down Expand Up @@ -347,6 +357,7 @@ private void init() {
initRequestMetrics();
initConnectionMetrics();
initLagAndDlqMetrics();
initTransactionMetrics();
initOtherMetrics();
}

Expand All @@ -360,6 +371,15 @@ private void registerMetricsView(SdkMeterProviderBuilder providerBuilder) {
2d * 1024 * 1024, //2MB
4d * 1024 * 1024 //4MB
);

List<Double> commitLatencyBuckets = Arrays.asList(
1d * 1 * 1 * 5, //5s
1d * 1 * 1 * 60, //1min
1d * 1 * 10 * 60, //10min
1d * 1 * 60 * 60, //1h
1d * 12 * 60 * 60, //12h
1d * 24 * 60 * 60 //24h
);
InstrumentSelector messageSizeSelector = InstrumentSelector.builder()
.setType(InstrumentType.HISTOGRAM)
.setName(HISTOGRAM_MESSAGE_SIZE)
Expand All @@ -370,6 +390,16 @@ private void registerMetricsView(SdkMeterProviderBuilder providerBuilder) {
SdkMeterProviderUtil.setCardinalityLimit(messageSizeViewBuilder, brokerConfig.getMetricsOtelCardinalityLimit());
providerBuilder.registerView(messageSizeSelector, messageSizeViewBuilder.build());

InstrumentSelector commitLatencySelector = InstrumentSelector.builder()
.setType(InstrumentType.HISTOGRAM)
.setName(HISTOGRAM_FINISH_MSG_LATENCY)
.build();
ViewBuilder commitLatencyViewBuilder = View.builder()
.setAggregation(Aggregation.explicitBucketHistogram(commitLatencyBuckets));
// Configure the cardinality limit used when exporting OpenTelemetry metrics.
SdkMeterProviderUtil.setCardinalityLimit(commitLatencyViewBuilder, brokerConfig.getMetricsOtelCardinalityLimit());
providerBuilder.registerView(commitLatencySelector, commitLatencyViewBuilder.build());

for (Pair<InstrumentSelector, ViewBuilder> selectorViewPair : RemotingMetricsManager.getMetricsView()) {
ViewBuilder viewBuilder = selectorViewPair.getObject2();
SdkMeterProviderUtil.setCardinalityLimit(viewBuilder, brokerConfig.getMetricsOtelCardinalityLimit());
Expand Down Expand Up @@ -559,6 +589,34 @@ private void initLagAndDlqMetrics() {
.build();
}

/**
 * Registers the transactional-message instruments on the broker meter:
 * the commit and rollback counters, the transaction finish latency histogram,
 * and an observable gauge that reports the half-message count of every topic
 * tracked by the transactional message service.
 */
private void initTransactionMetrics() {
    commitMessagesTotal = brokerMeter.counterBuilder(COUNTER_COMMIT_MESSAGES_TOTAL)
        .setDescription("Total number of commit messages")
        .build();

    rollBackMessagesTotal = brokerMeter.counterBuilder(COUNTER_ROLLBACK_MESSAGES_TOTAL)
        .setDescription("Total number of rollback messages")
        .build();

    // The unit is seconds, not milliseconds: the explicit buckets registered for this
    // histogram run from 5s to 24h, and the end-transaction processor divides the
    // millisecond delta by 1000 before recording it.
    transactionFinishLatency = brokerMeter.histogramBuilder(HISTOGRAM_FINISH_MSG_LATENCY)
        .setDescription("Transaction finish latency")
        .ofLongs()
        .setUnit("s")
        .build();

    // The callback runs on each metrics collection and emits one measurement per topic,
    // labelled with the topic name.
    halfMessages = brokerMeter.gaugeBuilder(GAUGE_HALF_MESSAGES)
        .setDescription("Half messages of all topics")
        .ofLongs()
        .buildWithCallback(measurement ->
            brokerController.getTransactionalMessageService().getTransactionMetrics().getTransactionCounts()
                .forEach((topic, metric) -> measurement.record(
                    metric.getCount().get(),
                    newAttributesBuilder().put(DefaultStoreMetricsConstant.LABEL_TOPIC, topic).build()
                )));
}
private void initOtherMetrics() {
RemotingMetricsManager.initMetrics(brokerMeter, BrokerMetricsManager::newAttributesBuilder);
messageStore.initMetrics(brokerMeter, BrokerMetricsManager::newAttributesBuilder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.netty.channel.ChannelHandlerContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
import org.apache.rocketmq.broker.transaction.OperationResult;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
import org.apache.rocketmq.common.TopicFilterType;
Expand All @@ -40,6 +41,8 @@
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.config.BrokerRole;

import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC;

/**
* EndTransaction processor: process commit and rollback message
*/
Expand Down Expand Up @@ -144,6 +147,16 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
RemotingCommand sendResult = sendFinalMessage(msgInner);
if (sendResult.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
// Successfully committed: decrement this topic's half-message count by 1.
this.brokerController.getTransactionalMessageService().getTransactionMetrics().addAndGet(msgInner.getTopic(), -1);
BrokerMetricsManager.commitMessagesTotal.add(1, BrokerMetricsManager.newAttributesBuilder()
.put(LABEL_TOPIC, msgInner.getTopic())
.build());
// record the commit latency.
Long commitLatency = (System.currentTimeMillis() - result.getPrepareMessage().getBornTimestamp()) / 1000;
BrokerMetricsManager.transactionFinishLatency.record(commitLatency, BrokerMetricsManager.newAttributesBuilder()
.put(LABEL_TOPIC, msgInner.getTopic())
.build());
}
return sendResult;
}
Expand All @@ -161,6 +174,11 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
// Rolled back: decrement the real topic's half-message count by 1.
this.brokerController.getTransactionalMessageService().getTransactionMetrics().addAndGet(result.getPrepareMessage().getProperty(MessageConst.PROPERTY_REAL_TOPIC), -1);
BrokerMetricsManager.rollBackMessagesTotal.add(1, BrokerMetricsManager.newAttributesBuilder()
.put(LABEL_TOPIC, result.getPrepareMessage().getProperty(MessageConst.PROPERTY_REAL_TOPIC))
.build());
}
return res;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,6 @@

import io.netty.channel.ChannelHandlerContext;
import io.opentelemetry.api.common.Attributes;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
Expand Down Expand Up @@ -63,10 +58,17 @@
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.stats.BrokerStatsManager;

import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_MESSAGE_TYPE;
Expand Down Expand Up @@ -290,7 +292,7 @@ public RemotingCommand sendMessage(final ChannelHandlerContext ctx,

// Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
boolean sendTransactionPrepareMessage = false;
boolean sendTransactionPrepareMessage;
if (Boolean.parseBoolean(traFlag)
&& !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { //For client under version 4.6.1
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
Expand All @@ -301,6 +303,8 @@ public RemotingCommand sendMessage(final ChannelHandlerContext ctx,
return response;
}
sendTransactionPrepareMessage = true;
} else {
sendTransactionPrepareMessage = false;
}

long beginTimeMillis = this.brokerController.getMessageStore().now();
Expand All @@ -322,6 +326,12 @@ public RemotingCommand sendMessage(final ChannelHandlerContext ctx,
if (responseFuture != null) {
doResponse(ctx, request, responseFuture);
}

// Record the transaction metrics; a null responseFuture means the message was put successfully.
if (sendTransactionPrepareMessage && (responseFuture == null || responseFuture.getCode() == ResponseCode.SUCCESS)) {
this.brokerController.getTransactionalMessageService().getTransactionMetrics().addAndGet(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC), 1);
}

sendMessageCallback.onComplete(sendMessageContext, response);
}, this.brokerController.getPutMessageFutureExecutor());
// Returns null to release the send message thread
Expand All @@ -334,6 +344,10 @@ public RemotingCommand sendMessage(final ChannelHandlerContext ctx,
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}
handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt, beginTimeMillis, mappingContext, BrokerMetricsManager.getMessageType(requestHeader));
// record the transaction metrics
if (putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK && putMessageResult.getAppendMessageResult().isOk()) {
this.brokerController.getTransactionalMessageService().getTransactionMetrics().addAndGet(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC), 1);
}
sendMessageCallback.onComplete(sendMessageContext, response);
return response;
}
Expand Down
Loading
Loading