Skip to content

Commit

Permalink
[ISSUE alibaba#411] Fixed ClassCastException when get the instance of…
Browse files Browse the repository at this point in the history
… the store (alibaba#423)

* Fixed issue alibaba#411

* fix cast in getAllDelayOffset

* Update AdminBrokerProcessor.java
  • Loading branch information
Hellojungle authored and vongosling committed Oct 19, 2018
1 parent 945ceda commit 4a22c0c
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.QueryMessageResult;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.stats.BrokerStatsManager;

public abstract class AbstractPluginMessageStore implements MessageStore {
protected MessageStore next = null;
Expand Down Expand Up @@ -246,4 +247,9 @@ public LinkedList<CommitLogDispatcher> getDispatcherList() {
public ConsumeQueue getConsumeQueue(String topic, int queueId) {
return next.getConsumeQueue(topic, queueId);
}

@Override
public BrokerStatsManager getBrokerStatsManager() {
return next.getBrokerStatsManager();
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
import org.apache.rocketmq.store.ConsumeQueueExt;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.SelectMappedBufferResult;

public class AdminBrokerProcessor implements NettyRequestProcessor {
Expand Down Expand Up @@ -760,12 +761,19 @@ private RemotingCommand getAllConsumerOffset(ChannelHandlerContext ctx, Remoting
private RemotingCommand getAllDelayOffset(ChannelHandlerContext ctx, RemotingCommand request) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);

if (!(this.brokerController.getMessageStore() instanceof DefaultMessageStore)) {
log.error("Delay offset not supported in this messagetore, client: {} ", ctx.channel().remoteAddress());
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("Delay offset not supported in this messagetore");
return response;
}

String content = ((DefaultMessageStore) this.brokerController.getMessageStore()).getScheduleMessageService().encode();
if (content != null && content.length() > 0) {
try {
response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET));
} catch (UnsupportedEncodingException e) {
log.error("get all delay offset from master error.", e);
log.error("Get all delay offset from master error.", e);

response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("UnsupportedEncodingException " + e);
Expand Down Expand Up @@ -1051,7 +1059,7 @@ private RemotingCommand ViewBrokerStatsData(ChannelHandlerContext ctx,
final ViewBrokerStatsDataRequestHeader requestHeader =
(ViewBrokerStatsDataRequestHeader) request.decodeCommandCustomHeader(ViewBrokerStatsDataRequestHeader.class);
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
DefaultMessageStore messageStore = (DefaultMessageStore) this.brokerController.getMessageStore();
MessageStore messageStore = this.brokerController.getMessageStore();

StatsItem statsItem = messageStore.getBrokerStatsManager().getStatsItem(requestHeader.getStatsName(), requestHeader.getStatsKey());
if (null == statsItem) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1371,6 +1371,7 @@ public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
cq.putMessagePositionInfoWrapper(dispatchRequest);
}

@Override
public BrokerStatsManager getBrokerStatsManager() {
return brokerStatsManager;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Set;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.store.stats.BrokerStatsManager;

/**
* This class defines contracting interfaces to implement, allowing third-party vendor to use customized message store.
Expand Down Expand Up @@ -358,4 +359,11 @@ QueryMessageResult queryMessage(final String topic, final String key, final int
* @return Consume queue.
*/
ConsumeQueue getConsumeQueue(String topic, int queueId);

/**
* Get BrokerStatsManager of the messageStore.
*
* @return BrokerStatsManager.
*/
BrokerStatsManager getBrokerStatsManager();
}

0 comments on commit 4a22c0c

Please sign in to comment.