From 4a22c0c0c1cef3b6370205651cb098a3d36927f7 Mon Sep 17 00:00:00 2001 From: jungle <353187194@qq.com> Date: Fri, 19 Oct 2018 14:18:56 +0800 Subject: [PATCH] [ISSUE #411] Fixed ClassCastException when get the instance of the store (#423) * Fixed issue #411 * fix cast in getAllDelayOffset * Update AdminBrokerProcessor.java --- .../broker/plugin/AbstractPluginMessageStore.java | 6 ++++++ .../broker/processor/AdminBrokerProcessor.java | 12 ++++++++++-- .../apache/rocketmq/store/DefaultMessageStore.java | 1 + .../java/org/apache/rocketmq/store/MessageStore.java | 8 ++++++++ 4 files changed, 25 insertions(+), 2 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java index f6f8a80afb..e66cead412 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java @@ -30,6 +30,7 @@ import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.QueryMessageResult; import org.apache.rocketmq.store.SelectMappedBufferResult; +import org.apache.rocketmq.store.stats.BrokerStatsManager; public abstract class AbstractPluginMessageStore implements MessageStore { protected MessageStore next = null; @@ -246,4 +247,9 @@ public LinkedList getDispatcherList() { public ConsumeQueue getConsumeQueue(String topic, int queueId) { return next.getConsumeQueue(topic, queueId); } + + @Override + public BrokerStatsManager getBrokerStatsManager() { + return next.getBrokerStatsManager(); + }; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 356aafc46f..73fe439427 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -114,6 +114,7 @@ import org.apache.rocketmq.store.ConsumeQueueExt; import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.MessageFilter; +import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.SelectMappedBufferResult; public class AdminBrokerProcessor implements NettyRequestProcessor { @@ -760,12 +761,19 @@ private RemotingCommand getAllConsumerOffset(ChannelHandlerContext ctx, Remoting private RemotingCommand getAllDelayOffset(ChannelHandlerContext ctx, RemotingCommand request) { final RemotingCommand response = RemotingCommand.createResponseCommand(null); + if (!(this.brokerController.getMessageStore() instanceof DefaultMessageStore)) { + log.error("Delay offset not supported in this messagetore, client: {} ", ctx.channel().remoteAddress()); + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("Delay offset not supported in this messagetore"); + return response; + } + String content = ((DefaultMessageStore) this.brokerController.getMessageStore()).getScheduleMessageService().encode(); if (content != null && content.length() > 0) { try { response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET)); } catch (UnsupportedEncodingException e) { - log.error("get all delay offset from master error.", e); + log.error("Get all delay offset from master error.", e); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("UnsupportedEncodingException " + e); @@ -1051,7 +1059,7 @@ private RemotingCommand ViewBrokerStatsData(ChannelHandlerContext ctx, final ViewBrokerStatsDataRequestHeader requestHeader = (ViewBrokerStatsDataRequestHeader) request.decodeCommandCustomHeader(ViewBrokerStatsDataRequestHeader.class); final RemotingCommand response = RemotingCommand.createResponseCommand(null); - DefaultMessageStore messageStore = (DefaultMessageStore) this.brokerController.getMessageStore(); + MessageStore messageStore = this.brokerController.getMessageStore(); StatsItem statsItem = messageStore.getBrokerStatsManager().getStatsItem(requestHeader.getStatsName(), requestHeader.getStatsKey()); if (null == statsItem) { diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 1ade7c2838..ff431ed889 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -1371,6 +1371,7 @@ public void putMessagePositionInfo(DispatchRequest dispatchRequest) { cq.putMessagePositionInfoWrapper(dispatchRequest); } + @Override public BrokerStatsManager getBrokerStatsManager() { return brokerStatsManager; } diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java index 907dfe2093..0f9b4f0ae6 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java @@ -21,6 +21,7 @@ import java.util.Set; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExtBatch; +import org.apache.rocketmq.store.stats.BrokerStatsManager; /** * This class defines contracting interfaces to implement, allowing third-party vendor to use customized message store. @@ -358,4 +359,11 @@ QueryMessageResult queryMessage(final String topic, final String key, final int * @return Consume queue. */ ConsumeQueue getConsumeQueue(String topic, int queueId); + + /** + * Get BrokerStatsManager of the messageStore. + * + * @return BrokerStatsManager. + */ + BrokerStatsManager getBrokerStatsManager(); }