From d01b04f362f3337588e543a089ec7c9979a73007 Mon Sep 17 00:00:00 2001 From: Li Zhanhui Date: Wed, 8 Jun 2022 11:37:17 +0800 Subject: [PATCH 1/2] Let name server generate valid JSON response when process topic route queries --- .../processor/DefaultRequestProcessor.java | 24 +++++----- remoting/pom.xml | 7 +++ .../protocol/RemotingSerializable.java | 13 ++++++ .../protocol/RemotingSerializableTest.java | 44 ++++++++++++++++++- 4 files changed, 77 insertions(+), 11 deletions(-) diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java index 6df8c65bf3f..b8c136f9a8d 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java @@ -16,10 +16,8 @@ */ package org.apache.rocketmq.namesrv.processor; +import com.alibaba.fastjson.serializer.SerializerFeature; import io.netty.channel.ChannelHandlerContext; -import java.io.UnsupportedEncodingException; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MQVersion.Version; @@ -27,19 +25,17 @@ import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.help.FAQUrl; -import org.apache.rocketmq.common.protocol.body.ClusterInfo; -import org.apache.rocketmq.common.protocol.body.TopicList; -import org.apache.rocketmq.common.protocol.header.namesrv.AddWritePermOfBrokerRequestHeader; -import org.apache.rocketmq.common.protocol.header.namesrv.AddWritePermOfBrokerResponseHeader; -import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.common.namesrv.NamesrvUtil; import org.apache.rocketmq.common.namesrv.RegisterBrokerResult; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody; import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; +import org.apache.rocketmq.common.protocol.body.TopicList; import org.apache.rocketmq.common.protocol.header.GetTopicsByClusterRequestHeader; +import org.apache.rocketmq.common.protocol.header.namesrv.AddWritePermOfBrokerRequestHeader; +import org.apache.rocketmq.common.protocol.header.namesrv.AddWritePermOfBrokerResponseHeader; import org.apache.rocketmq.common.protocol.header.namesrv.DeleteKVConfigRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.DeleteTopicFromNamesrvRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigRequestHeader; @@ -55,6 +51,8 @@ import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerResponseHeader; import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.namesrv.NamesrvController; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingCommandException; @@ -62,6 +60,10 @@ import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import java.io.UnsupportedEncodingException; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicLong; + public class DefaultRequestProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor { private static InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); @@ -362,7 +364,9 @@ public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, topicRouteData.setOrderTopicConf(orderTopicConf); } - byte[] content = topicRouteData.encode(); + byte[] content = topicRouteData.encode(SerializerFeature.BrowserCompatible, + SerializerFeature.QuoteFieldNames, SerializerFeature.SkipTransientField, + SerializerFeature.MapSortField); response.setBody(content); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); diff --git a/remoting/pom.xml b/remoting/pom.xml index 32320e6d050..1474e04ddae 100644 --- a/remoting/pom.xml +++ b/remoting/pom.xml @@ -40,5 +40,12 @@ ${project.groupId} rocketmq-logging + + + com.google.code.gson + gson + 2.9.0 + test + diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java index 4a32f65ff66..182b5e7d08a 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java @@ -17,6 +17,8 @@ package org.apache.rocketmq.remoting.protocol; import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.serializer.SerializerFeature; + import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; @@ -52,6 +54,17 @@ public byte[] encode() { return null; } + /** + * Allow call-site to apply specific features according to their requirements. + * + * @param features Features to apply + * @return serialized data. + */ + public byte[] encode(SerializerFeature...features) { + final String json = JSON.toJSONString(this, features); + return json.getBytes(CHARSET_UTF8); + } + public String toJson() { return toJson(false); } diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingSerializableTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingSerializableTest.java index 3e8b7a90ae6..b70e23acecd 100644 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingSerializableTest.java +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingSerializableTest.java @@ -16,9 +16,19 @@ */ package org.apache.rocketmq.remoting.protocol; +import com.alibaba.fastjson.serializer.SerializerFeature; +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.TypeAdapter; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.nio.charset.Charset; import java.util.Arrays; +import java.util.HashMap; import java.util.List; -import org.junit.Test; +import java.util.Map; import static org.assertj.core.api.Assertions.assertThat; @@ -80,6 +90,38 @@ public void setStringList(List stringList) { "}"); } + @Test + public void testEncode() { + class Foo extends RemotingSerializable { + Map map = new HashMap<>(); + + Foo() { + map.put(0L, "Test"); + } + + public Map getMap() { + return map; + } + } + Foo foo = new Foo(); + String invalid = new String(foo.encode(), Charset.defaultCharset()); + String valid = new String(foo.encode(SerializerFeature.BrowserCompatible, SerializerFeature.QuoteFieldNames, + SerializerFeature.MapSortField), Charset.defaultCharset()); + + Gson gson = new Gson(); + final TypeAdapter strictAdapter = gson.getAdapter(JsonElement.class); + try { + strictAdapter.fromJson(invalid); + Assert.fail("Should have thrown"); + } catch (IOException ignore) { + } + + try { + strictAdapter.fromJson(valid); + } catch (IOException ignore) { + Assert.fail("Should not throw"); + } + } } class Sample { From 4d112dc989f02018b0000155e4ab95059f3a15a7 Mon Sep 17 00:00:00 2001 From: Li Zhanhui Date: Thu, 9 Jun 2022 09:50:00 +0800 Subject: [PATCH 2/2] Revert sorting imports --- .../processor/DefaultRequestProcessor.java | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java index b8c136f9a8d..48a0342960d 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java @@ -18,6 +18,9 @@ import com.alibaba.fastjson.serializer.SerializerFeature; import io.netty.channel.ChannelHandlerContext; +import java.io.UnsupportedEncodingException; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MQVersion.Version; @@ -25,17 +28,19 @@ import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.help.FAQUrl; +import org.apache.rocketmq.common.protocol.body.ClusterInfo; +import org.apache.rocketmq.common.protocol.body.TopicList; +import org.apache.rocketmq.common.protocol.header.namesrv.AddWritePermOfBrokerRequestHeader; +import org.apache.rocketmq.common.protocol.header.namesrv.AddWritePermOfBrokerResponseHeader; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.common.namesrv.NamesrvUtil; import org.apache.rocketmq.common.namesrv.RegisterBrokerResult; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; -import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody; import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; -import org.apache.rocketmq.common.protocol.body.TopicList; import org.apache.rocketmq.common.protocol.header.GetTopicsByClusterRequestHeader; -import org.apache.rocketmq.common.protocol.header.namesrv.AddWritePermOfBrokerRequestHeader; -import org.apache.rocketmq.common.protocol.header.namesrv.AddWritePermOfBrokerResponseHeader; import org.apache.rocketmq.common.protocol.header.namesrv.DeleteKVConfigRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.DeleteTopicFromNamesrvRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigRequestHeader; @@ -51,8 +56,6 @@ import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerResponseHeader; import org.apache.rocketmq.common.protocol.route.TopicRouteData; -import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.namesrv.NamesrvController; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingCommandException; @@ -60,10 +63,6 @@ import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import java.io.UnsupportedEncodingException; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicLong; - public class DefaultRequestProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor { private static InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);