Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #7988] Refector client trace #7989

Merged
merged 3 commits into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,16 @@ public class ClientConfig {

private boolean enableHeartbeatChannelEventListener = true;

/**
* The switch for message trace
*/
protected boolean enableTrace = true;

/**
* The name value of message trace topic. If not set, the default trace topic name will be used.
*/
protected String traceTopic;

public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
Expand Down Expand Up @@ -215,6 +225,8 @@ public void resetClientConfig(final ClientConfig cc) {
this.detectInterval = cc.detectInterval;
this.detectTimeout = cc.detectTimeout;
this.namespaceV2 = cc.namespaceV2;
this.enableTrace = cc.enableTrace;
this.traceTopic = cc.traceTopic;
}

public ClientConfig cloneClientConfig() {
Expand Down Expand Up @@ -245,6 +257,8 @@ public ClientConfig cloneClientConfig() {
cc.detectInterval = detectInterval;
cc.detectTimeout = detectTimeout;
cc.namespaceV2 = namespaceV2;
cc.enableTrace = enableTrace;
cc.traceTopic = traceTopic;
return cc;
}

Expand Down Expand Up @@ -474,6 +488,22 @@ public void setUseHeartbeatV2(boolean useHeartbeatV2) {
this.useHeartbeatV2 = useHeartbeatV2;
}

public boolean isEnableTrace() {
return enableTrace;
}

public void setEnableTrace(boolean enableTrace) {
this.enableTrace = enableTrace;
}

public String getTraceTopic() {
return traceTopic;
}

public void setTraceTopic(String traceTopic) {
this.traceTopic = traceTopic;
}

@Override
public String toString() {
return "ClientConfig{" +
Expand Down Expand Up @@ -505,6 +535,8 @@ public String toString() {
", sendLatencyEnable=" + sendLatencyEnable +
", startDetectorEnable=" + startDetectorEnable +
", enableHeartbeatChannelEventListener=" + enableHeartbeatChannelEventListener +
", enableTrace=" + enableTrace +
", traceTopic='" + traceTopic + '\'' +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,15 +169,7 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
*/
private TraceDispatcher traceDispatcher = null;

/**
* The flag for message trace
*/
private boolean enableMsgTrace = false;

/**
* The name value of message trace topic.If you don't config,you can use the default trace topic name.
*/
private String customizedTraceTopic;
private RPCHook rpcHook;

/**
* Default constructor.
Expand Down Expand Up @@ -212,6 +204,7 @@ public DefaultLitePullConsumer(RPCHook rpcHook) {
*/
public DefaultLitePullConsumer(final String consumerGroup, RPCHook rpcHook) {
this.consumerGroup = consumerGroup;
this.rpcHook = rpcHook;
this.enableStreamRequestType = true;
defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook);
}
Expand All @@ -226,6 +219,7 @@ public DefaultLitePullConsumer(final String consumerGroup, RPCHook rpcHook) {
public DefaultLitePullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) {
this.namespace = namespace;
this.consumerGroup = consumerGroup;
this.rpcHook = rpcHook;
this.enableStreamRequestType = true;
defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook);
}
Expand Down Expand Up @@ -592,15 +586,12 @@ public TraceDispatcher getTraceDispatcher() {
return traceDispatcher;
}

public void setCustomizedTraceTopic(String customizedTraceTopic) {
this.customizedTraceTopic = customizedTraceTopic;
}

private void setTraceDispatcher() {
if (isEnableMsgTrace()) {
if (enableTrace) {
try {
AsyncTraceDispatcher traceDispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, customizedTraceTopic, null);
AsyncTraceDispatcher traceDispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, traceTopic, rpcHook);
traceDispatcher.getTraceProducer().setUseTLS(this.isUseTLS());
traceDispatcher.setNamespaceV2(namespaceV2);
this.traceDispatcher = traceDispatcher;
this.defaultLitePullConsumerImpl.registerConsumeMessageHook(
new ConsumeMessageTraceHookImpl(traceDispatcher));
Expand All @@ -611,14 +602,18 @@ private void setTraceDispatcher() {
}

public String getCustomizedTraceTopic() {
return customizedTraceTopic;
return traceTopic;
}

public void setCustomizedTraceTopic(String customizedTraceTopic) {
this.traceTopic = customizedTraceTopic;
}

public boolean isEnableMsgTrace() {
return enableMsgTrace;
return enableTrace;
}

public void setEnableMsgTrace(boolean enableMsgTrace) {
this.enableMsgTrace = enableMsgTrace;
this.enableTrace = enableMsgTrace;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
// force to use client rebalance
private boolean clientRebalance = true;

private RPCHook rpcHook = null;

/**
* Default constructor.
*/
Expand Down Expand Up @@ -327,6 +329,7 @@ public DefaultMQPushConsumer(RPCHook rpcHook) {
*/
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook) {
this.consumerGroup = consumerGroup;
this.rpcHook = rpcHook;
this.allocateMessageQueueStrategy = new AllocateMessageQueueAveragely();
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}
Expand All @@ -353,6 +356,7 @@ public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace,
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.consumerGroup = consumerGroup;
this.rpcHook = rpcHook;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}
Expand All @@ -369,18 +373,11 @@ public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) {
this.consumerGroup = consumerGroup;
this.rpcHook = rpcHook;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
if (enableMsgTrace) {
try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, customizedTraceTopic, rpcHook);
dispatcher.setHostConsumer(this.defaultMQPushConsumerImpl);
traceDispatcher = dispatcher;
this.defaultMQPushConsumerImpl.registerConsumeMessageHook(new ConsumeMessageTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
}
this.enableTrace = enableMsgTrace;
this.traceTopic = customizedTraceTopic;
}

/**
Expand Down Expand Up @@ -419,6 +416,7 @@ public DefaultMQPushConsumer(final String namespace, final String consumerGroup,
AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.consumerGroup = consumerGroup;
this.namespace = namespace;
this.rpcHook = rpcHook;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}
Expand All @@ -438,18 +436,11 @@ public DefaultMQPushConsumer(final String namespace, final String consumerGroup,
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) {
this.consumerGroup = consumerGroup;
this.namespace = namespace;
this.rpcHook = rpcHook;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
if (enableMsgTrace) {
try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, customizedTraceTopic, rpcHook);
dispatcher.setHostConsumer(this.defaultMQPushConsumerImpl);
traceDispatcher = dispatcher;
this.defaultMQPushConsumerImpl.registerConsumeMessageHook(new ConsumeMessageTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
}
this.enableTrace = enableMsgTrace;
this.traceTopic = customizedTraceTopic;
}

/**
Expand All @@ -464,9 +455,6 @@ public void createTopic(String key, String newTopic, int queueNum, Map<String, S
@Override
public void setUseTLS(boolean useTLS) {
super.setUseTLS(useTLS);
if (traceDispatcher instanceof AsyncTraceDispatcher) {
((AsyncTraceDispatcher) traceDispatcher).getTraceProducer().setUseTLS(useTLS);
}
}

/**
Expand Down Expand Up @@ -750,7 +738,21 @@ public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClie
public void start() throws MQClientException {
setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
this.defaultMQPushConsumerImpl.start();
if (enableTrace) {
try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, traceTopic, rpcHook);
dispatcher.setHostConsumer(this.defaultMQPushConsumerImpl);
dispatcher.setNamespaceV2(namespaceV2);
traceDispatcher = dispatcher;
this.defaultMQPushConsumerImpl.registerConsumeMessageHook(new ConsumeMessageTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
}
if (null != traceDispatcher) {
if (traceDispatcher instanceof AsyncTraceDispatcher) {
((AsyncTraceDispatcher) traceDispatcher).getTraceProducer().setUseTLS(isUseTLS());
}
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
*/
private int backPressureForAsyncSendSize = 100 * 1024 * 1024;

private RPCHook rpcHook = null;

/**
* Default constructor.
*/
Expand Down Expand Up @@ -202,6 +204,7 @@ public DefaultMQProducer(final String producerGroup) {
*/
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
this.producerGroup = producerGroup;
this.rpcHook = rpcHook;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
}
Expand Down Expand Up @@ -243,20 +246,8 @@ public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, fin
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,
final String customizedTraceTopic) {
this(producerGroup, rpcHook);
//if client open the message trace feature
if (enableMsgTrace) {
try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook);
dispatcher.setHostProducer(this.defaultMQProducerImpl);
traceDispatcher = dispatcher;
this.defaultMQProducerImpl.registerSendMessageHook(
new SendMessageTraceHookImpl(traceDispatcher));
this.defaultMQProducerImpl.registerEndTransactionHook(
new EndTransactionTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
logger.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
}
this.enableTrace = enableMsgTrace;
this.traceTopic = customizedTraceTopic;
}

/**
Expand Down Expand Up @@ -298,6 +289,7 @@ public DefaultMQProducer(final String namespace, final String producerGroup) {
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
this.namespace = namespace;
this.producerGroup = producerGroup;
this.rpcHook = rpcHook;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
}
Expand All @@ -318,27 +310,8 @@ public DefaultMQProducer(final String namespace, final String producerGroup, RPC
boolean enableMsgTrace, final String customizedTraceTopic) {
this(namespace, producerGroup, rpcHook);
//if client open the message trace feature
if (enableMsgTrace) {
try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook);
dispatcher.setHostProducer(this.defaultMQProducerImpl);
traceDispatcher = dispatcher;
this.defaultMQProducerImpl.registerSendMessageHook(
new SendMessageTraceHookImpl(traceDispatcher));
this.defaultMQProducerImpl.registerEndTransactionHook(
new EndTransactionTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
logger.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
}
}

@Override
public void setUseTLS(boolean useTLS) {
super.setUseTLS(useTLS);
if (traceDispatcher instanceof AsyncTraceDispatcher) {
((AsyncTraceDispatcher) traceDispatcher).getTraceProducer().setUseTLS(useTLS);
}
this.enableTrace = enableMsgTrace;
this.traceTopic = customizedTraceTopic;
}

/**
Expand All @@ -356,7 +329,24 @@ public void start() throws MQClientException {
if (this.produceAccumulator != null) {
this.produceAccumulator.start();
}
if (enableTrace) {
try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, traceTopic, rpcHook);
dispatcher.setHostProducer(this.defaultMQProducerImpl);
dispatcher.setNamespaceV2(this.namespaceV2);
traceDispatcher = dispatcher;
this.defaultMQProducerImpl.registerSendMessageHook(
new SendMessageTraceHookImpl(traceDispatcher));
this.defaultMQProducerImpl.registerEndTransactionHook(
new EndTransactionTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
logger.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
}
if (null != traceDispatcher) {
if (traceDispatcher instanceof AsyncTraceDispatcher) {
((AsyncTraceDispatcher) traceDispatcher).getTraceProducer().setUseTLS(isUseTLS());
}
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
private volatile AccessChannel accessChannel = AccessChannel.LOCAL;
private String group;
private Type type;
private String namespaceV2;

public AsyncTraceDispatcher(String group, Type type, String traceTopicName, RPCHook rpcHook) {
// queueSize is greater than or equal to the n power of 2 of value
Expand Down Expand Up @@ -144,10 +145,20 @@ public void setHostConsumer(DefaultMQPushConsumerImpl hostConsumer) {
this.hostConsumer = hostConsumer;
}

public String getNamespaceV2() {
return namespaceV2;
}

public void setNamespaceV2(String namespaceV2) {
this.namespaceV2 = namespaceV2;
}

public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
if (isStarted.compareAndSet(false, true)) {
traceProducer.setNamesrvAddr(nameSrvAddr);
traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
traceProducer.setNamespaceV2(namespaceV2);
traceProducer.setEnableTrace(false);
traceProducer.start();
}
this.accessChannel = accessChannel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ public PullResult answer(InvocationOnMock mock) throws Throwable {
new ConsumeMessageOpenTracingHookImpl(tracer));
pushConsumer.setNamesrvAddr("127.0.0.1:9876");
pushConsumer.setPullInterval(60 * 1000);
// disable trace to let mock trace work
pushConsumer.setEnableTrace(false);

OffsetStore offsetStore = Mockito.mock(OffsetStore.class);
Mockito.when(offsetStore.readOffset(any(MessageQueue.class), any(ReadOffsetType.class))).thenReturn(0L);
Expand Down
Loading
Loading