From fcfe9cf21ecb0dc64ea5f46c1e3c6d610b634c79 Mon Sep 17 00:00:00 2001 From: huz Date: Fri, 17 Jan 2014 00:31:33 -0800 Subject: [PATCH 1/8] FIXME use buffer unpacker FIX unpack copy of ByteBuffer to ByteArrayInputStream --- .../rpc/loop/netty/MessagePackStreamDecoder.java | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/src/main/java/org/msgpack/rpc/loop/netty/MessagePackStreamDecoder.java b/src/main/java/org/msgpack/rpc/loop/netty/MessagePackStreamDecoder.java index edfdb7e..6a0c82d 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/MessagePackStreamDecoder.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/MessagePackStreamDecoder.java @@ -40,24 +40,20 @@ public MessagePackStreamDecoder(MessagePack msgpack) { protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer source) throws Exception { // TODO #MN will modify the body with MessagePackBufferUnpacker. - ByteBuffer buffer = source.toByteBuffer(); + ByteBuffer buffer = source.toByteBuffer().duplicate(); if (!buffer.hasRemaining()) { return null; } source.markReaderIndex(); - byte[] bytes = buffer.array(); // FIXME buffer must has array - int offset = buffer.arrayOffset() + buffer.position(); - int length = buffer.arrayOffset() + buffer.limit(); - ByteArrayInputStream stream = new ByteArrayInputStream(bytes, offset, - length); - int startAvailable = stream.available(); try{ - Unpacker unpacker = msgpack.createUnpacker(stream); + final int posBeforeUnpack = buffer.position(); + Unpacker unpacker = msgpack.createBufferUnpacker(buffer); Value v = unpacker.readValue(); - source.skipBytes(startAvailable - stream.available()); + source.skipBytes(buffer.position() - posBeforeUnpack); return v; - }catch( EOFException e ){ + } + catch( EOFException e ){ // not enough buffers. // So retry reading source.resetReaderIndex(); From 7bda1c934b5497a4e18699eb1e2b17a1e62ed0c7 Mon Sep 17 00:00:00 2001 From: huz Date: Fri, 17 Jan 2014 00:42:20 -0800 Subject: [PATCH 2/8] FIXME use read ByteBuffer --- .../msgpack/rpc/loop/netty/MessagePackDecoder.java | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/msgpack/rpc/loop/netty/MessagePackDecoder.java b/src/main/java/org/msgpack/rpc/loop/netty/MessagePackDecoder.java index f977328..6a15beb 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/MessagePackDecoder.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/MessagePackDecoder.java @@ -48,17 +48,7 @@ protected Object decode(ChannelHandlerContext ctx, Channel channel, return null; } - byte[] bytes = buffer.array(); // FIXME buffer must has array - int offset = buffer.arrayOffset() + buffer.position(); - int length = buffer.arrayOffset() + buffer.limit(); - - Value v = messagePack.read(bytes, offset, length); - return v; - - // TODO MessagePack.unpack() - /* - * Unpacker pac = new Unpacker(); pac.wrap(bytes, offset, length); - * return pac.unpackObject(); - */ + return messagePack.read(buffer.slice()); } } + From aad5f2d10d945ffc17d120b5f6f5598b04cf96f6 Mon Sep 17 00:00:00 2001 From: huzhou Date: Wed, 22 Jan 2014 19:11:12 -0800 Subject: [PATCH 3/8] ~ using netty 4.0 --- pom.xml | 324 +++++++++--------- src/main/java/org/msgpack/rpc/Request.java | 3 + src/main/java/org/msgpack/rpc/Server.java | 3 +- .../msgpack/rpc/dispatcher/Dispatcher.java | 1 + .../rpc/loop/netty/ChannelAdaptor.java | 6 +- .../rpc/loop/netty/MessageHandler.java | 94 ++--- .../rpc/loop/netty/MessagePackDecoder.java | 116 ++++--- .../rpc/loop/netty/MessagePackEncoder.java | 116 +++---- .../loop/netty/MessagePackStreamDecoder.java | 118 ++++--- .../rpc/loop/netty/NettyEventLoop.java | 34 +- .../loop/netty/NettyTcpClientTransport.java | 230 ++++++++++--- .../loop/netty/NettyTcpServerTransport.java | 154 +++++++-- .../rpc/loop/netty/StreamPipelineFactory.java | 76 ++-- .../rpc/reflect/ReflectionProxyBuilder.java | 1 - .../rpc/transport/ClientTransport.java | 1 + .../rpc/transport/MessageSendable.java | 2 + .../PooledStreamClientTransport.java | 13 +- .../rpc/transport/RpcMessageHandler.java | 14 +- .../rpc/transport/ServerTransport.java | 2 + .../java/org/msgpack/rpc/BigDataTest.java | 71 ++-- 20 files changed, 826 insertions(+), 553 deletions(-) diff --git a/pom.xml b/pom.xml index ede0210..f06c4f4 100644 --- a/pom.xml +++ b/pom.xml @@ -1,178 +1,178 @@ - 4.0.0 + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + 4.0.0 - org.msgpack - msgpack-rpc - jar - 0.7.1-SNAPSHOT + org.msgpack + msgpack-rpc + jar + 0.7.1-SNAPSHOT - msgpack-rpc - http://msgpack.org/ + msgpack-rpc + http://msgpack.org/ - - - The Apache Software License, Version 2.0 - http://www.apache.org/licenses/LICENSE-2.0.txt - repo - - + + + The Apache Software License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + repo + + - - scm:git:git://github.com/msgpack/msgpack-rpc.git - scm:git:git://github.com/msgpack/msgpack-rpc.git - + + scm:git:git://github.com/msgpack/msgpack-rpc.git + scm:git:git://github.com/msgpack/msgpack-rpc.git + - - - - src/main/resources - - - - - src/test/resources - - + + + + src/main/resources + + + + + src/test/resources + + - - - maven-compiler-plugin - 2.3.2 - - 1.5 - 1.5 - - + + + maven-compiler-plugin + 2.3.2 + + 1.5 + 1.5 + + - - maven-eclipse-plugin - 2.5.1 - + + maven-eclipse-plugin + 2.5.1 + - - maven-release-plugin - 2.1 - - - deploy - scm:git:git://github.com/msgpack/msgpack-rpc.git - - - - + + maven-release-plugin + 2.1 + + + deploy + scm:git:git://github.com/msgpack/msgpack-rpc.git + + + + - - - - - org.apache.maven.plugins - maven-javadoc-plugin - 2.8.1 - - ${project.name} ${project.version} API - true - en_US - UTF-8 - - + + + + + org.apache.maven.plugins + maven-javadoc-plugin + 2.8.1 + + ${project.name} ${project.version} API + true + en_US + UTF-8 + + - - org.apache.maven.plugins - maven-jxr-plugin - 2.2 - + + org.apache.maven.plugins + maven-jxr-plugin + 2.2 + - - org.apache.maven.plugins - maven-surefire-report-plugin - 2.11 - - - + + org.apache.maven.plugins + maven-surefire-report-plugin + 2.11 + + + - - - repository.jboss.org - https://repository.jboss.org/nexus/content/groups/public/ - - false - - - + + + repository.jboss.org + https://repository.jboss.org/nexus/content/groups/public/ + + false + + + - - - org.msgpack - msgpack - 0.6.6 - compile - - - org.jboss.netty - netty - 3.2.1.Final - - - javax.servlet - servlet-api - - - commons-logging - commons-logging - - - compile - - - org.slf4j - slf4j-api - 1.6.1 - - - org.slf4j - slf4j-log4j12 - 1.6.1 - - - junit - junit - 4.8.2 - test - - + + + org.msgpack + msgpack + 0.6.6 + compile + + + io.netty + netty-all + 4.0.15.Final + + + javax.servlet + servlet-api + + + commons-logging + commons-logging + + + compile + + + org.slf4j + slf4j-api + 1.6.1 + + + org.slf4j + slf4j-log4j12 + 1.6.1 + + + junit + junit + 4.8.2 + test + + - - - false - msgpack.org - Repository at msgpack.org - file://${project.build.directory}/website/maven2/ - - - true - msgpack.org - Repository at msgpack.org - file://${project.build.directory}/website/maven2/ - - + + + false + msgpack.org + Repository at msgpack.org + file://${project.build.directory}/website/maven2/ + + + true + msgpack.org + Repository at msgpack.org + file://${project.build.directory}/website/maven2/ + + - - - release - - - - true - org.apache.maven.plugins - maven-deploy-plugin - 2.4 - - true - - - - - - - + + + release + + + + true + org.apache.maven.plugins + maven-deploy-plugin + 2.4 + + true + + + + + + + \ No newline at end of file diff --git a/src/main/java/org/msgpack/rpc/Request.java b/src/main/java/org/msgpack/rpc/Request.java index a0007e1..2e3dcfe 100644 --- a/src/main/java/org/msgpack/rpc/Request.java +++ b/src/main/java/org/msgpack/rpc/Request.java @@ -69,6 +69,9 @@ public synchronized void sendResponse(Object result, Object error) { if (channel == null) { return; } + + System.out.println("[response] to request:" + msgid); + ResponseMessage msg = new ResponseMessage(msgid, error, result); channel.sendMessage(msg); channel = null; diff --git a/src/main/java/org/msgpack/rpc/Server.java b/src/main/java/org/msgpack/rpc/Server.java index ba027d5..9502504 100644 --- a/src/main/java/org/msgpack/rpc/Server.java +++ b/src/main/java/org/msgpack/rpc/Server.java @@ -106,7 +106,8 @@ public void onRequest(MessageSendable channel, int msgid, String method, Value a Request request = new Request(channel, msgid, method, args); try { dp.dispatch(request); - } catch (RPCError e) { + } + catch (RPCError e) { // FIXME request.sendError(e.getCode(), e); } catch (Exception e) { diff --git a/src/main/java/org/msgpack/rpc/dispatcher/Dispatcher.java b/src/main/java/org/msgpack/rpc/dispatcher/Dispatcher.java index 3d9fdbb..2d5ccff 100644 --- a/src/main/java/org/msgpack/rpc/dispatcher/Dispatcher.java +++ b/src/main/java/org/msgpack/rpc/dispatcher/Dispatcher.java @@ -20,5 +20,6 @@ import org.msgpack.rpc.Request; public interface Dispatcher { + void dispatch(Request request) throws Exception; } diff --git a/src/main/java/org/msgpack/rpc/loop/netty/ChannelAdaptor.java b/src/main/java/org/msgpack/rpc/loop/netty/ChannelAdaptor.java index 41f2c5a..7044a10 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/ChannelAdaptor.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/ChannelAdaptor.java @@ -17,11 +17,11 @@ // package org.msgpack.rpc.loop.netty; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.Channels; +import io.netty.channel.Channel; import org.msgpack.rpc.transport.ClientTransport; class ChannelAdaptor implements ClientTransport { + private Channel channel; ChannelAdaptor(Channel channel) { @@ -29,7 +29,7 @@ class ChannelAdaptor implements ClientTransport { } public void sendMessage(Object msg) { - Channels.write(channel, msg); + channel.write(msg); } public void close() { diff --git a/src/main/java/org/msgpack/rpc/loop/netty/MessageHandler.java b/src/main/java/org/msgpack/rpc/loop/netty/MessageHandler.java index 5141b5c..6c321a5 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/MessageHandler.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/MessageHandler.java @@ -1,53 +1,53 @@ +//// +//// MessagePack-RPC for Java +//// +//// Copyright (C) 2010 FURUHASHI Sadayuki +//// +//// Licensed under the Apache License, Version 2.0 (the "License"); +//// you may not use this file except in compliance with the License. +//// You may obtain a copy of the License at +//// +//// http://www.apache.org/licenses/LICENSE-2.0 +//// +//// Unless required by applicable law or agreed to in writing, software +//// distributed under the License is distributed on an "AS IS" BASIS, +//// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +//// See the License for the specific language governing permissions and +//// limitations under the License. +//// +//package org.msgpack.rpc.loop.netty; // -// MessagePack-RPC for Java +//import org.jboss.netty.channel.ChannelHandlerContext; +//import org.jboss.netty.channel.ChannelStateEvent; +//import org.jboss.netty.channel.MessageEvent; +//import org.jboss.netty.channel.SimpleChannelUpstreamHandler; +//import org.msgpack.rpc.transport.RpcMessageHandler; +//import org.msgpack.type.Value; // -// Copyright (C) 2010 FURUHASHI Sadayuki +//class MessageHandler extends SimpleChannelUpstreamHandler { +// private RpcMessageHandler handler; +// private ChannelAdaptor adaptor; // -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at +// MessageHandler(RpcMessageHandler handler) { +// this.handler = handler; +// } // -// http://www.apache.org/licenses/LICENSE-2.0 +// @Override +// public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) +// throws Exception { +// this.adaptor = new ChannelAdaptor(e.getChannel()); +// ctx.sendUpstream(e); +// } // -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +// @Override +// public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { +// Object m = e.getMessage(); +// if (!(m instanceof Value)) { +// ctx.sendUpstream(e); +// return; +// } // -package org.msgpack.rpc.loop.netty; - -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.SimpleChannelUpstreamHandler; -import org.msgpack.rpc.transport.RpcMessageHandler; -import org.msgpack.type.Value; - -class MessageHandler extends SimpleChannelUpstreamHandler { - private RpcMessageHandler handler; - private ChannelAdaptor adaptor; - - MessageHandler(RpcMessageHandler handler) { - this.handler = handler; - } - - @Override - public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) - throws Exception { - this.adaptor = new ChannelAdaptor(e.getChannel()); - ctx.sendUpstream(e); - } - - @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { - Object m = e.getMessage(); - if (!(m instanceof Value)) { - ctx.sendUpstream(e); - return; - } - - Value msg = (Value) m; - handler.handleMessage(adaptor, msg); - } -} +// Value msg = (Value) m; +// handler.handleMessage(adaptor, msg); +// } +//} diff --git a/src/main/java/org/msgpack/rpc/loop/netty/MessagePackDecoder.java b/src/main/java/org/msgpack/rpc/loop/netty/MessagePackDecoder.java index 6a15beb..1d14a42 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/MessagePackDecoder.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/MessagePackDecoder.java @@ -1,54 +1,64 @@ +//// +//// MessagePack-RPC for Java +//// +//// Copyright (C) 2010 FURUHASHI Sadayuki +//// +//// Licensed under the Apache License, Version 2.0 (the "License"); +//// you may not use this file except in compliance with the License. +//// You may obtain a copy of the License at +//// +//// http://www.apache.org/licenses/LICENSE-2.0 +//// +//// Unless required by applicable law or agreed to in writing, software +//// distributed under the License is distributed on an "AS IS" BASIS, +//// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +//// See the License for the specific language governing permissions and +//// limitations under the License. +//// +//package org.msgpack.rpc.loop.netty; // -// MessagePack-RPC for Java -// -// Copyright (C) 2010 FURUHASHI Sadayuki -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -package org.msgpack.rpc.loop.netty; - -import java.nio.ByteBuffer; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.handler.codec.oneone.OneToOneDecoder; -import org.msgpack.MessagePack; -import org.msgpack.type.Value; - -public class MessagePackDecoder extends OneToOneDecoder { - - MessagePack messagePack; - - public MessagePackDecoder(MessagePack messagePack) { - super(); - this.messagePack = messagePack; - } - - @Override - protected Object decode(ChannelHandlerContext ctx, Channel channel, - Object msg) throws Exception { - if (!(msg instanceof ChannelBuffer)) { - return msg; - } - - ChannelBuffer source = (ChannelBuffer) msg; - - ByteBuffer buffer = source.toByteBuffer(); - if (!buffer.hasRemaining()) { - return null; - } - - return messagePack.read(buffer.slice()); - } -} - +//import java.nio.ByteBuffer; +//import org.jboss.netty.channel.Channel; +//import org.jboss.netty.channel.ChannelHandlerContext; +//import org.jboss.netty.buffer.ChannelBuffer; +//import org.jboss.netty.handler.codec.oneone.OneToOneDecoder; +//import org.msgpack.MessagePack; +//import org.msgpack.type.Value; +// +//public class MessagePackDecoder extends OneToOneDecoder { +// +// MessagePack messagePack; +// +// public MessagePackDecoder(MessagePack messagePack) { +// super(); +// this.messagePack = messagePack; +// } +// +// @Override +// protected Object decode(ChannelHandlerContext ctx, Channel channel, +// Object msg) throws Exception { +// if (!(msg instanceof ChannelBuffer)) { +// return msg; +// } +// +// ChannelBuffer source = (ChannelBuffer) msg; +// +// ByteBuffer buffer = source.toByteBuffer(); +// if (!buffer.hasRemaining()) { +// return null; +// } +// +// byte[] bytes = buffer.array(); // FIXME buffer must has array +// int offset = buffer.arrayOffset() + buffer.position(); +// int length = buffer.arrayOffset() + buffer.limit(); +// +// Value v = messagePack.read(bytes, offset, length); +// return v; +// +// // TODO MessagePack.unpack() +// /* +// * Unpacker pac = new Unpacker(); pac.wrap(bytes, offset, length); +// * return pac.unpackObject(); +// */ +// } +//} diff --git a/src/main/java/org/msgpack/rpc/loop/netty/MessagePackEncoder.java b/src/main/java/org/msgpack/rpc/loop/netty/MessagePackEncoder.java index 2a8cc27..c2e3c95 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/MessagePackEncoder.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/MessagePackEncoder.java @@ -1,59 +1,59 @@ +//// +//// MessagePack-RPC for Java +//// +//// Copyright (C) 2010 FURUHASHI Sadayuki +//// +//// Licensed under the Apache License, Version 2.0 (the "License"); +//// you may not use this file except in compliance with the License. +//// You may obtain a copy of the License at +//// +//// http://www.apache.org/licenses/LICENSE-2.0 +//// +//// Unless required by applicable law or agreed to in writing, software +//// distributed under the License is distributed on an "AS IS" BASIS, +//// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +//// See the License for the specific language governing permissions and +//// limitations under the License. +//// +//package org.msgpack.rpc.loop.netty; // -// MessagePack-RPC for Java -// -// Copyright (C) 2010 FURUHASHI Sadayuki -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -package org.msgpack.rpc.loop.netty; - -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBufferOutputStream; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.handler.codec.oneone.OneToOneEncoder; -import org.msgpack.MessagePack; - -public class MessagePackEncoder extends OneToOneEncoder { - private final int estimatedLength; - - private MessagePack messagePack; - - public MessagePackEncoder(MessagePack messagePack) { - this(1024, messagePack); - } - - public MessagePackEncoder(int estimatedLength, MessagePack messagePack) { - this.estimatedLength = estimatedLength; - this.messagePack = messagePack; - } - - @Override - protected Object encode(ChannelHandlerContext ctx, Channel channel, - Object msg) throws Exception { - if (msg instanceof ChannelBuffer) { - return msg; - } - - ChannelBufferOutputStream out = new ChannelBufferOutputStream( - ChannelBuffers.dynamicBuffer(estimatedLength, ctx.getChannel() - .getConfig().getBufferFactory())); - - // MessagePack.pack(out, msg); - messagePack.write(out, msg); - - ChannelBuffer result = out.buffer(); - return result; - } -} +//import org.jboss.netty.channel.Channel; +//import org.jboss.netty.channel.ChannelHandlerContext; +//import org.jboss.netty.buffer.ChannelBuffer; +//import org.jboss.netty.buffer.ChannelBufferOutputStream; +//import org.jboss.netty.buffer.ChannelBuffers; +//import org.jboss.netty.handler.codec.oneone.OneToOneEncoder; +//import org.msgpack.MessagePack; +// +//public class MessagePackEncoder extends OneToOneEncoder { +// private final int estimatedLength; +// +// private MessagePack messagePack; +// +// public MessagePackEncoder(MessagePack messagePack) { +// this(1024, messagePack); +// } +// +// public MessagePackEncoder(int estimatedLength, MessagePack messagePack) { +// this.estimatedLength = estimatedLength; +// this.messagePack = messagePack; +// } +// +// @Override +// protected Object encode(ChannelHandlerContext ctx, Channel channel, +// Object msg) throws Exception { +// if (msg instanceof ChannelBuffer) { +// return msg; +// } +// +// ChannelBufferOutputStream out = new ChannelBufferOutputStream( +// ChannelBuffers.dynamicBuffer(estimatedLength, ctx.getChannel() +// .getConfig().getBufferFactory())); +// +// // MessagePack.pack(out, msg); +// messagePack.write(out, msg); +// +// ChannelBuffer result = out.buffer(); +// return result; +// } +//} diff --git a/src/main/java/org/msgpack/rpc/loop/netty/MessagePackStreamDecoder.java b/src/main/java/org/msgpack/rpc/loop/netty/MessagePackStreamDecoder.java index 6a0c82d..716f666 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/MessagePackStreamDecoder.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/MessagePackStreamDecoder.java @@ -1,63 +1,67 @@ +//// +//// MessagePack-RPC for Java +//// +//// Copyright (C) 2010 FURUHASHI Sadayuki +//// +//// Licensed under the Apache License, Version 2.0 (the "License"); +//// you may not use this file except in compliance with the License. +//// You may obtain a copy of the License at +//// +//// http://www.apache.org/licenses/LICENSE-2.0 +//// +//// Unless required by applicable law or agreed to in writing, software +//// distributed under the License is distributed on an "AS IS" BASIS, +//// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +//// See the License for the specific language governing permissions and +//// limitations under the License. +//// +//package org.msgpack.rpc.loop.netty; // -// MessagePack-RPC for Java +//import java.io.ByteArrayInputStream; +//import java.io.EOFException; +//import java.io.IOException; +//import java.io.InputStream; +//import java.nio.ByteBuffer; +//import java.util.concurrent.atomic.AtomicInteger; // -// Copyright (C) 2010 FURUHASHI Sadayuki +//import org.jboss.netty.channel.Channel; +//import org.jboss.netty.channel.ChannelHandlerContext; +//import org.jboss.netty.buffer.ChannelBuffer; +//import org.jboss.netty.handler.codec.frame.FrameDecoder; +//import org.msgpack.MessagePack; +//import org.msgpack.type.Value; +//import org.msgpack.unpacker.Unpacker; // -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at +//public class MessagePackStreamDecoder extends FrameDecoder { +// protected MessagePack msgpack; // -// http://www.apache.org/licenses/LICENSE-2.0 +// public MessagePackStreamDecoder(MessagePack msgpack) { +// super(); +// this.msgpack = msgpack; +// } // -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +// @Override +// protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer source) throws Exception { // -package org.msgpack.rpc.loop.netty; - -import java.io.ByteArrayInputStream; -import java.io.EOFException; -import java.nio.ByteBuffer; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.handler.codec.frame.FrameDecoder; -import org.msgpack.MessagePack; -import org.msgpack.type.Value; -import org.msgpack.unpacker.Unpacker; - -public class MessagePackStreamDecoder extends FrameDecoder { - protected MessagePack msgpack; - - public MessagePackStreamDecoder(MessagePack msgpack) { - super(); - this.msgpack = msgpack; - } - - @Override - protected Object decode(ChannelHandlerContext ctx, Channel channel, - ChannelBuffer source) throws Exception { - // TODO #MN will modify the body with MessagePackBufferUnpacker. - ByteBuffer buffer = source.toByteBuffer().duplicate(); - if (!buffer.hasRemaining()) { - return null; - } - source.markReaderIndex(); - - try{ - final int posBeforeUnpack = buffer.position(); - Unpacker unpacker = msgpack.createBufferUnpacker(buffer); - Value v = unpacker.readValue(); - source.skipBytes(buffer.position() - posBeforeUnpack); - return v; - } - catch( EOFException e ){ - // not enough buffers. - // So retry reading - source.resetReaderIndex(); - return null; - } - } -} +// // TODO #MN will modify the body with MessagePackBufferUnpacker. +// ByteBuffer buffer = source.toByteBuffer().slice(); +// if (!buffer.hasRemaining()) { +// return null; +// } +// source.markReaderIndex(); +// +// try{ +// Unpacker unpacker = msgpack.createBufferUnpacker(buffer); +// Value v = unpacker.readValue(); +// source.skipBytes(buffer.position()); +// return v; +// } +// catch( EOFException e ){ +// // not enough buffers. +// // So retry reading +// System.out.println("~~~waste~~~"); +// source.resetReaderIndex(); +// return null; +// } +// } +//} diff --git a/src/main/java/org/msgpack/rpc/loop/netty/NettyEventLoop.java b/src/main/java/org/msgpack/rpc/loop/netty/NettyEventLoop.java index cf71a25..f6c4308 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/NettyEventLoop.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/NettyEventLoop.java @@ -19,10 +19,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; -import org.jboss.netty.channel.socket.ClientSocketChannelFactory; -import org.jboss.netty.channel.socket.ServerSocketChannelFactory; -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; -import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; import org.msgpack.MessagePack; import org.msgpack.rpc.Session; import org.msgpack.rpc.Server; @@ -39,27 +35,6 @@ public NettyEventLoop(ExecutorService workerExecutor, super(workerExecutor, ioExecutor, scheduledExecutor, messagePack); } - private ClientSocketChannelFactory clientFactory = null; - private ServerSocketChannelFactory serverFactory = null; - - public synchronized ClientSocketChannelFactory getClientFactory() { - if (clientFactory == null) { - clientFactory = new NioClientSocketChannelFactory(getIoExecutor(), - getWorkerExecutor()); // TODO: workerCount - } - return clientFactory; - } - - public synchronized ServerSocketChannelFactory getServerFactory() { - if (serverFactory == null) { - serverFactory = new NioServerSocketChannelFactory(getIoExecutor(), - getWorkerExecutor()); // TODO: workerCount - // messages will be dispatched to worker thread on server. - // see useThread(true) in NettyTcpClientTransport(). - } - return serverFactory; - } - protected ClientTransport openTcpTransport(TcpClientConfig config, Session session) { return new NettyTcpClientTransport(config, session, this); @@ -67,6 +42,13 @@ protected ClientTransport openTcpTransport(TcpClientConfig config, protected ServerTransport listenTcpTransport(TcpServerConfig config, Server server) { - return new NettyTcpServerTransport(config, server, this); + + try{ + return new NettyTcpServerTransport(config, server, this); + } + catch(Exception ex){ + ex.printStackTrace(); + return null; + } } } diff --git a/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpClientTransport.java b/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpClientTransport.java index b9115fd..b159805 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpClientTransport.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpClientTransport.java @@ -17,90 +17,152 @@ // package org.msgpack.rpc.loop.netty; -import java.util.Map; - -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelFutureListener; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.buffer.ChannelBufferOutputStream; -import org.jboss.netty.buffer.HeapChannelBufferFactory; -import org.jboss.netty.bootstrap.ClientBootstrap; +import java.io.EOFException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufOutputStream; +import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.handler.codec.MessageToByteEncoder; +import org.msgpack.MessagePack; +import org.msgpack.MessagePackable; import org.msgpack.rpc.Session; import org.msgpack.rpc.config.TcpClientConfig; import org.msgpack.rpc.transport.RpcMessageHandler; import org.msgpack.rpc.transport.PooledStreamClientTransport; +import org.msgpack.type.Value; +import org.msgpack.unpacker.Unpacker; + +class NettyTcpClientTransport extends PooledStreamClientTransport { -class NettyTcpClientTransport extends PooledStreamClientTransport { - private static final String TCP_NO_DELAY = "tcpNoDelay"; + private ByteBufOutputStream _bufOutput; + private final Bootstrap bootstrap; + private final ConcurrentLinkedQueue _channels; - private final ClientBootstrap bootstrap; + NettyTcpClientTransport(final TcpClientConfig config, + final Session session, + final NettyEventLoop loop) { - NettyTcpClientTransport(TcpClientConfig config, Session session, - NettyEventLoop loop) { // TODO check session.getAddress() instanceof IPAddress super(config, session); - RpcMessageHandler handler = new RpcMessageHandler(session); + final RpcMessageHandler handler = new RpcMessageHandler(session); + + final EventLoopGroup workerGroup = new NioEventLoopGroup(); + + bootstrap = new Bootstrap(); // (1) + bootstrap.group(workerGroup); // (2) + bootstrap.channel(NioSocketChannel.class); // (3) + bootstrap.option(ChannelOption.SO_KEEPALIVE, true); // (4) + bootstrap.handler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast( + new MessagePackDecoder(loop.getMessagePack()), + new MessageHandler(handler), + new MessagePackEncoder(loop.getMessagePack()), + new MessagePackableEncoder(loop.getMessagePack())); + } + }); - bootstrap = new ClientBootstrap(loop.getClientFactory()); - bootstrap.setPipelineFactory(new StreamPipelineFactory(loop.getMessagePack(), handler)); - Map options = config.getOptions(); - setIfNotPresent(options, TCP_NO_DELAY, Boolean.TRUE, bootstrap); - bootstrap.setOptions(options); + _channels = new ConcurrentLinkedQueue(); } private final ChannelFutureListener connectListener = new ChannelFutureListener() { + public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { - onConnectFailed(future.getChannel(), future.getCause()); + onConnectFailed(future.channel(), future.cause()); return; } - Channel c = future.getChannel(); - c.getCloseFuture().addListener(closeListener); + + Channel c = future.channel(); + + c.closeFuture().addListener(closeListener); + onConnected(c); } }; private final ChannelFutureListener closeListener = new ChannelFutureListener() { + public void operationComplete(ChannelFuture future) throws Exception { - Channel c = future.getChannel(); - onClosed(c); + + System.out.println("[client transport] channel closed!!!"); + + onClosed(future.channel()); } }; @Override - protected void startConnection() { - ChannelFuture f = bootstrap.connect(session.getAddress().getSocketAddress()); - f.addListener(connectListener); + protected ChannelFuture startConnection() { + return bootstrap.connect(session.getAddress().getSocketAddress()).addListener(connectListener); + } + + @Override + public void sendMessage(final Object msg) { + + if(_channels.isEmpty()){ + + startConnection().addListener(new ChannelFutureListener() { + + public void operationComplete(ChannelFuture future) throws Exception { + + sendMessageChannel(future.channel(), msg); + } + }); + } + else{ + sendMessageChannel(_channels.poll(), msg); + } } @Override - protected ChannelBufferOutputStream newPendingBuffer() { - return new ChannelBufferOutputStream( - ChannelBuffers.dynamicBuffer(HeapChannelBufferFactory.getInstance())); + protected OutputStream newPendingBuffer() { + + return _bufOutput = new ByteBufOutputStream(UnpooledByteBufAllocator.DEFAULT.heapBuffer(1024 * 1024)); } @Override - protected void resetPendingBuffer(ChannelBufferOutputStream b) { - b.buffer().clear(); + protected void resetPendingBuffer(OutputStream b) { + _bufOutput.buffer().resetReaderIndex(); + _bufOutput.buffer().resetWriterIndex(); } @Override - protected void flushPendingBuffer(ChannelBufferOutputStream b, Channel c) { - Channels.write(c, b.buffer()); - b.buffer().clear(); + protected void flushPendingBuffer(OutputStream b, Channel c) { + c.write(_bufOutput.buffer()); + _bufOutput.buffer().resetReaderIndex(); + _bufOutput.buffer().resetWriterIndex(); } @Override - protected void closePendingBuffer(ChannelBufferOutputStream b) { - b.buffer().clear(); + protected void closePendingBuffer(OutputStream b) { + _bufOutput.buffer().resetReaderIndex(); + _bufOutput.buffer().resetWriterIndex(); } @Override - protected void sendMessageChannel(Channel c, Object msg) { - Channels.write(c, msg); + protected ChannelFuture sendMessageChannel(Channel c, Object msg) { + + System.out.println("[client transport] send message"); + + return c.writeAndFlush(msg).addListener(new ChannelFutureListener() { + + public void operationComplete(ChannelFuture future) throws Exception { + + _channels.offer(future.channel()); + } + }); } @Override @@ -108,10 +170,88 @@ protected void closeChannel(Channel c) { c.close(); } - private static void setIfNotPresent(Map options, - String key, Object value, ClientBootstrap bootstrap) { - if (!options.containsKey(key)) { - bootstrap.setOption(key, value); + static class MessagePackDecoder extends ByteToMessageDecoder { + + private final MessagePack _msgpack; + + public MessagePackDecoder(final MessagePack msgpack){ + _msgpack = msgpack; + } + + @Override + protected void decode(final ChannelHandlerContext channelHandlerContext, + final ByteBuf byteBuf, + final List out) throws Exception { + + System.out.printf("[client transport] decode got bytebuf\n"); + + final ByteBuffer buffer = byteBuf.markReaderIndex().nioBuffer(); + + try{ + Unpacker unpacker = _msgpack.createBufferUnpacker(buffer); + out.add(unpacker.readValue()); + + System.out.printf("[client transport] decode done\n"); + + byteBuf.skipBytes(buffer.position()); + } + catch( EOFException e ){ + + byteBuf.resetReaderIndex(); + } + } + } + + static class MessagePackEncoder extends MessageToByteEncoder { + + private final MessagePack _msgpack; + + public MessagePackEncoder(final MessagePack msgpack){ + _msgpack = msgpack; + } + + @Override + protected void encode(ChannelHandlerContext ctx, Value msg, ByteBuf out) throws Exception { + + System.out.println("[client transport] encoding msg of Value"); + + _msgpack.createPacker(new ByteBufOutputStream(out)).write(msg); + } + } + + static class MessagePackableEncoder extends MessageToByteEncoder { + + private final MessagePack _msgpack; + + public MessagePackableEncoder(final MessagePack msgpack){ + _msgpack = msgpack; + } + + @Override + protected void encode(ChannelHandlerContext ctx, MessagePackable msg, ByteBuf out) throws Exception { + + System.out.println("[client transport] encoding msg of Value from packable encoder"); + + msg.writeTo(_msgpack.createPacker(new ByteBufOutputStream(out))); + } + } + + static class MessageHandler extends ChannelInboundHandlerAdapter { + + private final RpcMessageHandler _rpcHandler; + + public MessageHandler(final RpcMessageHandler rpcHandler){ + _rpcHandler = rpcHandler; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + + System.out.printf("[client transport] message handler got msg: " + msg.getClass().getName()); + + Value value = (Value) msg; + + _rpcHandler.handleMessage(new ChannelAdaptor(ctx.channel()), value); } } } diff --git a/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpServerTransport.java b/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpServerTransport.java index cab6cfe..8f6235f 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpServerTransport.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpServerTransport.java @@ -17,47 +17,159 @@ // package org.msgpack.rpc.loop.netty; -import java.util.Map; +import java.io.EOFException; +import java.nio.ByteBuffer; +import java.util.List; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.bootstrap.ServerBootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufOutputStream; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.handler.codec.MessageToByteEncoder; +import org.msgpack.MessagePack; +import org.msgpack.MessagePackable; import org.msgpack.rpc.Server; import org.msgpack.rpc.config.TcpServerConfig; import org.msgpack.rpc.transport.RpcMessageHandler; import org.msgpack.rpc.transport.ServerTransport; import org.msgpack.rpc.address.Address; +import org.msgpack.type.Value; +import org.msgpack.unpacker.Unpacker; class NettyTcpServerTransport implements ServerTransport { - private Channel listenChannel; - private final static String CHILD_TCP_NODELAY = "child.tcpNoDelay"; - private final static String REUSE_ADDRESS = "reuseAddress"; - NettyTcpServerTransport(TcpServerConfig config, Server server, NettyEventLoop loop) { + private ChannelFuture channelFuture; + + NettyTcpServerTransport(final TcpServerConfig config, + final Server server, + final NettyEventLoop loop) throws InterruptedException { + if (server == null) { throw new IllegalArgumentException("Server must not be null"); } Address address = config.getListenAddress(); - RpcMessageHandler handler = new RpcMessageHandler(server); + + final RpcMessageHandler handler = new RpcMessageHandler(server); handler.useThread(true); - ServerBootstrap bootstrap = new ServerBootstrap(loop.getServerFactory()); - bootstrap.setPipelineFactory(new StreamPipelineFactory(loop.getMessagePack(), handler)); - final Map options = config.getOptions(); - setIfNotPresent(options, CHILD_TCP_NODELAY, Boolean.TRUE, bootstrap); - setIfNotPresent(options, REUSE_ADDRESS, Boolean.TRUE, bootstrap); - bootstrap.setOptions(options); - this.listenChannel = bootstrap.bind(address.getSocketAddress()); + final EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) + final EventLoopGroup workerGroup = new NioEventLoopGroup(); + final ServerBootstrap b = new ServerBootstrap(); // (2) + b.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) // (3) + .childHandler(new ChannelInitializer() { // (4) + @Override + public void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast( + new MessagePackDecoder(loop.getMessagePack()), + new MessageHandler(handler), + new MessagePackEncoder(loop.getMessagePack()), + new MessagePackableEncoder(loop.getMessagePack())); + } + }) + .option(ChannelOption.SO_BACKLOG, 128) // (5) + .childOption(ChannelOption.SO_KEEPALIVE, true); // (6) + + // Bind and start to accept incoming connections. + channelFuture = b.bind(address.getSocketAddress()).sync(); // (7) } - public void close() { - listenChannel.close(); + static class MessagePackDecoder extends ByteToMessageDecoder { + + private final MessagePack _msgpack; + + public MessagePackDecoder(final MessagePack msgpack){ + _msgpack = msgpack; + } + + @Override + protected void decode(final ChannelHandlerContext channelHandlerContext, + final ByteBuf byteBuf, + final List out) throws Exception { + + System.out.println("[server transport] got bytebuf to decode"); + + final ByteBuffer buffer = byteBuf.markReaderIndex().nioBuffer(); + + try{ + Unpacker unpacker = _msgpack.createBufferUnpacker(buffer); + + out.add(unpacker.readValue()); + + System.out.println("[server transport] feed value to the next"); + + //byteBuf.skipBytes(buffer.position()); + } + catch( EOFException e ){ + + System.out.println("[server transport] not enough bytebuf"); + + byteBuf.resetReaderIndex(); + } + } } - private static void setIfNotPresent(Map options, - String key, Object value, ServerBootstrap bootstrap) { - if (!options.containsKey(key)) { - bootstrap.setOption(key, value); + static class MessagePackEncoder extends MessageToByteEncoder { + + private final MessagePack _msgpack; + + public MessagePackEncoder(final MessagePack msgpack){ + _msgpack = msgpack; + } + + @Override + protected void encode(ChannelHandlerContext ctx, Value msg, ByteBuf out) throws Exception { + + System.out.println("[server transport] encoding msg of Value"); + + _msgpack.createPacker(new ByteBufOutputStream(out)).write(msg); } } + + static class MessagePackableEncoder extends MessageToByteEncoder { + + private final MessagePack _msgpack; + + public MessagePackableEncoder(final MessagePack msgpack){ + _msgpack = msgpack; + } + + @Override + protected void encode(ChannelHandlerContext ctx, MessagePackable msg, ByteBuf out) throws Exception { + + System.out.println("[server transport] encoding msg of Value from packable encoder"); + + msg.writeTo(_msgpack.createPacker(new ByteBufOutputStream(out))); + + System.out.println("[server transport] encoding msg of Value from packable encoder [written]"); + } + } + + static class MessageHandler extends ChannelInboundHandlerAdapter { + + private final RpcMessageHandler _rpcHandler; + + public MessageHandler(final RpcMessageHandler rpcHandler){ + _rpcHandler = rpcHandler; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + + System.out.println("[server transport] got message decoded: " + msg.getClass().getName()); + + Value value = (Value) msg; + + _rpcHandler.handleMessage(new ChannelAdaptor(ctx.channel()), value); + } + } + + public void close() { + channelFuture.channel().close(); + } } diff --git a/src/main/java/org/msgpack/rpc/loop/netty/StreamPipelineFactory.java b/src/main/java/org/msgpack/rpc/loop/netty/StreamPipelineFactory.java index 64a1cae..205ab13 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/StreamPipelineFactory.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/StreamPipelineFactory.java @@ -1,42 +1,42 @@ +//// +//// MessagePack-RPC for Java +//// +//// Copyright (C) 2010 FURUHASHI Sadayuki +//// +//// Licensed under the Apache License, Version 2.0 (the "License"); +//// you may not use this file except in compliance with the License. +//// You may obtain a copy of the License at +//// +//// http://www.apache.org/licenses/LICENSE-2.0 +//// +//// Unless required by applicable law or agreed to in writing, software +//// distributed under the License is distributed on an "AS IS" BASIS, +//// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +//// See the License for the specific language governing permissions and +//// limitations under the License. +//// +//package org.msgpack.rpc.loop.netty; // -// MessagePack-RPC for Java +//import org.jboss.netty.channel.Channels; +//import org.jboss.netty.channel.ChannelPipeline; +//import org.jboss.netty.channel.ChannelPipelineFactory; +//import org.msgpack.MessagePack; +//import org.msgpack.rpc.transport.RpcMessageHandler; // -// Copyright (C) 2010 FURUHASHI Sadayuki +//class StreamPipelineFactory implements ChannelPipelineFactory { +// private RpcMessageHandler handler; +// private MessagePack messagePack; // -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at +// StreamPipelineFactory(MessagePack messagePack, RpcMessageHandler handler) { +// this.handler = handler; +// this.messagePack = messagePack; +// } // -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -package org.msgpack.rpc.loop.netty; - -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.msgpack.MessagePack; -import org.msgpack.rpc.transport.RpcMessageHandler; - -class StreamPipelineFactory implements ChannelPipelineFactory { - private RpcMessageHandler handler; - private MessagePack messagePack; - - StreamPipelineFactory(MessagePack messagePack, RpcMessageHandler handler) { - this.handler = handler; - this.messagePack = messagePack; - } - - public ChannelPipeline getPipeline() throws Exception { - ChannelPipeline p = Channels.pipeline(); - p.addLast("msgpack-decode-stream", new MessagePackStreamDecoder(messagePack)); - p.addLast("msgpack-encode", new MessagePackEncoder(messagePack)); - p.addLast("message", new MessageHandler(handler)); - return p; - } -} +// public ChannelPipeline getPipeline() throws Exception { +// ChannelPipeline p = Channels.pipeline(); +// p.addLast("msgpack-decode-stream", new MessagePackStreamDecoder(messagePack)); +// p.addLast("msgpack-encode", new MessagePackEncoder(messagePack)); +// p.addLast("message", new MessageHandler(handler)); +// return p; +// } +//} diff --git a/src/main/java/org/msgpack/rpc/reflect/ReflectionProxyBuilder.java b/src/main/java/org/msgpack/rpc/reflect/ReflectionProxyBuilder.java index 8d81c17..50d6c27 100644 --- a/src/main/java/org/msgpack/rpc/reflect/ReflectionProxyBuilder.java +++ b/src/main/java/org/msgpack/rpc/reflect/ReflectionProxyBuilder.java @@ -23,7 +23,6 @@ import java.lang.reflect.*; import org.msgpack.rpc.*; import org.msgpack.*; -import org.msgpack.rpc.loop.netty.MessagePackEncoder; import org.msgpack.template.*; import org.msgpack.type.Value; import org.msgpack.unpacker.Converter; diff --git a/src/main/java/org/msgpack/rpc/transport/ClientTransport.java b/src/main/java/org/msgpack/rpc/transport/ClientTransport.java index f37abcf..5c86e6d 100644 --- a/src/main/java/org/msgpack/rpc/transport/ClientTransport.java +++ b/src/main/java/org/msgpack/rpc/transport/ClientTransport.java @@ -20,6 +20,7 @@ import java.io.Closeable; public interface ClientTransport extends Closeable, MessageSendable { + public void sendMessage(Object obj); public void close(); diff --git a/src/main/java/org/msgpack/rpc/transport/MessageSendable.java b/src/main/java/org/msgpack/rpc/transport/MessageSendable.java index c09f23b..969e069 100644 --- a/src/main/java/org/msgpack/rpc/transport/MessageSendable.java +++ b/src/main/java/org/msgpack/rpc/transport/MessageSendable.java @@ -18,5 +18,7 @@ package org.msgpack.rpc.transport; public interface MessageSendable { + public void sendMessage(Object obj); + } diff --git a/src/main/java/org/msgpack/rpc/transport/PooledStreamClientTransport.java b/src/main/java/org/msgpack/rpc/transport/PooledStreamClientTransport.java index 6a3e029..90d32d3 100644 --- a/src/main/java/org/msgpack/rpc/transport/PooledStreamClientTransport.java +++ b/src/main/java/org/msgpack/rpc/transport/PooledStreamClientTransport.java @@ -22,8 +22,9 @@ import java.util.ArrayList; import java.util.List; -import org.jboss.netty.logging.InternalLogger; -import org.jboss.netty.logging.InternalLoggerFactory; +import io.netty.channel.ChannelFuture; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; import org.msgpack.rpc.Session; import org.msgpack.rpc.config.StreamClientConfig; import org.msgpack.MessagePack; @@ -71,9 +72,11 @@ public void sendMessage(Object msg) { if (pool.isEmpty()) { // may be already connected try { messagePack.write(getPendingBuffer(), msg); - } catch (IOException e) { + } + catch (IOException e) { // FIXME } + flushPendingBuffer(getPendingBuffer(), pool.get(0)); return; } } @@ -171,9 +174,9 @@ protected PendingBuffer getPendingBuffer() { protected abstract void closePendingBuffer(PendingBuffer b); - protected abstract void startConnection(); + protected abstract ChannelFuture startConnection(); - protected abstract void sendMessageChannel(Channel c, Object msg); + protected abstract ChannelFuture sendMessageChannel(Channel c, Object msg); protected abstract void closeChannel(Channel c); } diff --git a/src/main/java/org/msgpack/rpc/transport/RpcMessageHandler.java b/src/main/java/org/msgpack/rpc/transport/RpcMessageHandler.java index f26a5c9..15600c2 100644 --- a/src/main/java/org/msgpack/rpc/transport/RpcMessageHandler.java +++ b/src/main/java/org/msgpack/rpc/transport/RpcMessageHandler.java @@ -24,6 +24,7 @@ import org.msgpack.rpc.loop.EventLoop; public class RpcMessageHandler { + protected final Session session; protected final Server server; protected final EventLoop loop; @@ -42,7 +43,8 @@ public RpcMessageHandler(Session session, Server server) { this.server = server; if (session == null) { this.loop = server.getEventLoop(); - } else { + } + else { this.loop = session.getEventLoop(); } } @@ -77,6 +79,7 @@ public void handleMessage(MessageSendable channel, Value msg) { } private void handleMessageImpl(MessageSendable channel, Value msg) { + Value[] array = msg.asArrayValue().getElementArray(); // TODO check array.length @@ -88,20 +91,23 @@ private void handleMessageImpl(MessageSendable channel, Value msg) { Value args = array[3]; handleRequest(channel, msgid, method, args); - } else if (type == Messages.RESPONSE) { + } + else if (type == Messages.RESPONSE) { // RESPONSE int msgid = array[1].asIntegerValue().getInt(); Value error = array[2]; Value result = array[3]; handleResponse(channel, msgid, result, error); - } else if (type == Messages.NOTIFY) { + } + else if (type == Messages.NOTIFY) { // NOTIFY String method = array[1].asRawValue().getString(); Value args = array[2]; handleNotify(channel, method, args); - } else { + } + else { // FIXME error result throw new RuntimeException("unknown message type: " + type); } diff --git a/src/main/java/org/msgpack/rpc/transport/ServerTransport.java b/src/main/java/org/msgpack/rpc/transport/ServerTransport.java index 6dfc247..ec7cc80 100644 --- a/src/main/java/org/msgpack/rpc/transport/ServerTransport.java +++ b/src/main/java/org/msgpack/rpc/transport/ServerTransport.java @@ -20,5 +20,7 @@ import java.io.Closeable; public interface ServerTransport extends Closeable { + public void close(); + } diff --git a/src/test/java/org/msgpack/rpc/BigDataTest.java b/src/test/java/org/msgpack/rpc/BigDataTest.java index 383afa6..9c9949f 100644 --- a/src/test/java/org/msgpack/rpc/BigDataTest.java +++ b/src/test/java/org/msgpack/rpc/BigDataTest.java @@ -21,9 +21,13 @@ private static String getBigString() { } private static Value BIG_DATA = ValueFactory.createRawValue(getBigString()); + public static class BigDataDispatcher implements Dispatcher { - public void dispatch(Request request) { - assertEquals(BIG_DATA,request.getArguments().asArrayValue().get(0) ); + + public void dispatch(Request request) { + + assertEquals(BIG_DATA, request.getArguments().asArrayValue().get(0) ); + request.sendResult(BIG_DATA); } } @@ -41,7 +45,7 @@ public void testSyncBigDataLoad() throws Exception { svr.serve(new BigDataDispatcher()); svr.listen(19851); - int num = 5; + int num = 1; long start = System.currentTimeMillis(); for(int i=0; i < num; i++) { @@ -53,40 +57,43 @@ public void testSyncBigDataLoad() throws Exception { double result = num / ((double)(finish - start) / 1000); System.out.println("sync: "+result+" calls per sec"); - } finally { + } + finally { svr.close(); c.close(); loop.shutdown(); } } - @Test - public void testAsyncBigDataLoad() throws Exception { - EventLoop loop = EventLoop.start(); - Server svr = new Server(loop); - Client c = new Client("127.0.0.1", 19852, loop); - c.setRequestTimeout(100);// - - try { - svr.serve(new BigDataDispatcher()); - svr.listen(19852); - - int num = 10; - long start = System.currentTimeMillis(); - for(int i=0; i < num-1; i++) { - c.notifyApply("test", new Object[]{BIG_DATA}); - } - c.callApply("test", new Object[]{BIG_DATA}); - long finish = System.currentTimeMillis(); - - double result = num / ((double)(finish - start) / 1000); - System.out.println("async: "+result+" calls per sec"); - - } finally { - svr.close(); - c.close(); - loop.shutdown(); - } - } +// @Ignore +// @Test +// public void testAsyncBigDataLoad() throws Exception { +// EventLoop loop = EventLoop.start(); +// Server svr = new Server(loop); +// Client c = new Client("127.0.0.1", 19852, loop); +// c.setRequestTimeout(100);// +// +// try { +// svr.serve(new BigDataDispatcher()); +// svr.listen(19852); +// +// int num = 10; +// +// long start = System.currentTimeMillis(); +// for(int i=0; i < num-1; i++) { +// c.notifyApply("test", new Object[]{BIG_DATA}); +// } +// c.callApply("test", new Object[]{BIG_DATA}); +// long finish = System.currentTimeMillis(); +// +// double result = num / ((double)(finish - start) / 1000); +// System.out.println("async: "+result+" calls per sec"); +// +// } finally { +// svr.close(); +// c.close(); +// loop.shutdown(); +// } +// } } From 60b699b0a58a9d2e99450f8fd4bc6c90fa384377 Mon Sep 17 00:00:00 2001 From: huzhou Date: Wed, 22 Jan 2014 23:22:11 -0800 Subject: [PATCH 4/8] ~ netty 4.0 working!!! --- .gitignore | 5 +- pom.xml | 2 +- settings.xml | 4 + src/main/java/org/msgpack/rpc/Request.java | 5 +- .../loop/netty/NettyTcpClientTransport.java | 106 +++++++----------- .../loop/netty/NettyTcpServerTransport.java | 37 ++++-- .../java/org/msgpack/rpc/BigDataTest.java | 69 ++++++------ 7 files changed, 113 insertions(+), 115 deletions(-) create mode 100644 settings.xml diff --git a/.gitignore b/.gitignore index cf7b324..3b7bf2e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,6 @@ -.gitignore target .project .classpath .settings -.iml -*~ +*.iml +.idea \ No newline at end of file diff --git a/pom.xml b/pom.xml index f06c4f4..18f01a0 100644 --- a/pom.xml +++ b/pom.xml @@ -111,7 +111,7 @@ io.netty netty-all - 4.0.15.Final + 4.0.14.Final javax.servlet diff --git a/settings.xml b/settings.xml new file mode 100644 index 0000000..616c3f4 --- /dev/null +++ b/settings.xml @@ -0,0 +1,4 @@ + + + /Users/huzhou/maven.repo + \ No newline at end of file diff --git a/src/main/java/org/msgpack/rpc/Request.java b/src/main/java/org/msgpack/rpc/Request.java index 2e3dcfe..8ef4759 100644 --- a/src/main/java/org/msgpack/rpc/Request.java +++ b/src/main/java/org/msgpack/rpc/Request.java @@ -22,7 +22,9 @@ import org.msgpack.rpc.transport.MessageSendable; public class Request implements Callback { + private MessageSendable channel; // TODO #SF synchronized? + private int msgid; private String method; private Value args; @@ -66,12 +68,11 @@ public void sendError(Object error, Object data) { } public synchronized void sendResponse(Object result, Object error) { + if (channel == null) { return; } - System.out.println("[response] to request:" + msgid); - ResponseMessage msg = new ResponseMessage(msgid, error, result); channel.sendMessage(msg); channel = null; diff --git a/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpClientTransport.java b/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpClientTransport.java index b159805..b20aa05 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpClientTransport.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpClientTransport.java @@ -37,14 +37,17 @@ import org.msgpack.MessagePackable; import org.msgpack.rpc.Session; import org.msgpack.rpc.config.TcpClientConfig; +import org.msgpack.rpc.transport.ClientTransport; import org.msgpack.rpc.transport.RpcMessageHandler; import org.msgpack.rpc.transport.PooledStreamClientTransport; import org.msgpack.type.Value; import org.msgpack.unpacker.Unpacker; -class NettyTcpClientTransport extends PooledStreamClientTransport { +class NettyTcpClientTransport implements ClientTransport { private ByteBufOutputStream _bufOutput; + + private final Session _session; private final Bootstrap bootstrap; private final ConcurrentLinkedQueue _channels; @@ -53,8 +56,6 @@ class NettyTcpClientTransport extends PooledStreamClientTransport(); } - private final ChannelFutureListener connectListener = new ChannelFutureListener() { - - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - onConnectFailed(future.channel(), future.cause()); - return; - } - - Channel c = future.channel(); - - c.closeFuture().addListener(closeListener); - - onConnected(c); - } - }; - - private final ChannelFutureListener closeListener = new ChannelFutureListener() { - - public void operationComplete(ChannelFuture future) throws Exception { - - System.out.println("[client transport] channel closed!!!"); - - onClosed(future.channel()); - } - }; - - @Override protected ChannelFuture startConnection() { - return bootstrap.connect(session.getAddress().getSocketAddress()).addListener(connectListener); + return bootstrap.connect(_session.getAddress().getSocketAddress()); } - @Override public void sendMessage(final Object msg) { if(_channels.isEmpty()){ @@ -126,50 +100,31 @@ public void operationComplete(ChannelFuture future) throws Exception { } } - @Override - protected OutputStream newPendingBuffer() { - - return _bufOutput = new ByteBufOutputStream(UnpooledByteBufAllocator.DEFAULT.heapBuffer(1024 * 1024)); - } + public void close(){ - @Override - protected void resetPendingBuffer(OutputStream b) { - _bufOutput.buffer().resetReaderIndex(); - _bufOutput.buffer().resetWriterIndex(); - } + System.out.println("[client transport] closing channels:" + _channels.size()); - @Override - protected void flushPendingBuffer(OutputStream b, Channel c) { - c.write(_bufOutput.buffer()); - _bufOutput.buffer().resetReaderIndex(); - _bufOutput.buffer().resetWriterIndex(); - } + while(!_channels.isEmpty()){ + _channels.poll().close(); + } - @Override - protected void closePendingBuffer(OutputStream b) { - _bufOutput.buffer().resetReaderIndex(); - _bufOutput.buffer().resetWriterIndex(); } - @Override protected ChannelFuture sendMessageChannel(Channel c, Object msg) { - System.out.println("[client transport] send message"); + //System.out.println("[client transport] send message"); return c.writeAndFlush(msg).addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) throws Exception { + //System.out.println("[client transport] message sent!!!"); + _channels.offer(future.channel()); } }); } - @Override - protected void closeChannel(Channel c) { - c.close(); - } - static class MessagePackDecoder extends ByteToMessageDecoder { private final MessagePack _msgpack; @@ -183,20 +138,22 @@ protected void decode(final ChannelHandlerContext channelHandlerContext, final ByteBuf byteBuf, final List out) throws Exception { - System.out.printf("[client transport] decode got bytebuf\n"); + //System.out.printf("[client transport] decode got bytebuf\n"); - final ByteBuffer buffer = byteBuf.markReaderIndex().nioBuffer(); + final ByteBuffer buffer = byteBuf.markReaderIndex().nioBuffer().slice(); try{ Unpacker unpacker = _msgpack.createBufferUnpacker(buffer); out.add(unpacker.readValue()); - System.out.printf("[client transport] decode done\n"); + //System.out.printf("[client transport] decode done\n"); byteBuf.skipBytes(buffer.position()); } catch( EOFException e ){ + //System.out.println("[client transport] not enough bytebuf"); + byteBuf.resetReaderIndex(); } } @@ -213,9 +170,11 @@ public MessagePackEncoder(final MessagePack msgpack){ @Override protected void encode(ChannelHandlerContext ctx, Value msg, ByteBuf out) throws Exception { - System.out.println("[client transport] encoding msg of Value"); + //System.out.println("[client transport] encoding msg of Value"); _msgpack.createPacker(new ByteBufOutputStream(out)).write(msg); + + ctx.flush(); } } @@ -230,10 +189,20 @@ public MessagePackableEncoder(final MessagePack msgpack){ @Override protected void encode(ChannelHandlerContext ctx, MessagePackable msg, ByteBuf out) throws Exception { - System.out.println("[client transport] encoding msg of Value from packable encoder"); + //System.out.println("[client transport] encoding msg of Value from packable encoder"); msg.writeTo(_msgpack.createPacker(new ByteBufOutputStream(out))); } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + + super.write(ctx, msg, promise); + + ctx.flush(); + + //System.out.println("[server transport] encoding msg of Value from packable encoder [flushed]"); + } } static class MessageHandler extends ChannelInboundHandlerAdapter { @@ -247,11 +216,18 @@ public MessageHandler(final RpcMessageHandler rpcHandler){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { - System.out.printf("[client transport] message handler got msg: " + msg.getClass().getName()); + //System.out.printf("[client transport] message handler got msg: " + msg.getClass().getName()); Value value = (Value) msg; _rpcHandler.handleMessage(new ChannelAdaptor(ctx.channel()), value); } + + @Override + public void exceptionCaught(final ChannelHandlerContext ctx, Throwable cause) throws Exception { + + cause.printStackTrace(); + ctx.close(); + } } } diff --git a/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpServerTransport.java b/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpServerTransport.java index 8f6235f..db00a4a 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpServerTransport.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpServerTransport.java @@ -92,22 +92,22 @@ protected void decode(final ChannelHandlerContext channelHandlerContext, final ByteBuf byteBuf, final List out) throws Exception { - System.out.println("[server transport] got bytebuf to decode"); + //System.out.println("[server transport] got bytebuf to decode"); - final ByteBuffer buffer = byteBuf.markReaderIndex().nioBuffer(); + final ByteBuffer buffer = byteBuf.markReaderIndex().nioBuffer().slice(); try{ Unpacker unpacker = _msgpack.createBufferUnpacker(buffer); out.add(unpacker.readValue()); - System.out.println("[server transport] feed value to the next"); + //System.out.println("[server transport] feed value to the next"); - //byteBuf.skipBytes(buffer.position()); + byteBuf.skipBytes(buffer.position()); } catch( EOFException e ){ - System.out.println("[server transport] not enough bytebuf"); + //System.out.println("[server transport] not enough bytebuf"); byteBuf.resetReaderIndex(); } @@ -125,9 +125,11 @@ public MessagePackEncoder(final MessagePack msgpack){ @Override protected void encode(ChannelHandlerContext ctx, Value msg, ByteBuf out) throws Exception { - System.out.println("[server transport] encoding msg of Value"); + //System.out.println("[server transport] encoding msg of Value"); _msgpack.createPacker(new ByteBufOutputStream(out)).write(msg); + + ctx.flush(); } } @@ -142,11 +144,21 @@ public MessagePackableEncoder(final MessagePack msgpack){ @Override protected void encode(ChannelHandlerContext ctx, MessagePackable msg, ByteBuf out) throws Exception { - System.out.println("[server transport] encoding msg of Value from packable encoder"); + //System.out.println("[server transport] encoding msg of Value from packable encoder"); msg.writeTo(_msgpack.createPacker(new ByteBufOutputStream(out))); - System.out.println("[server transport] encoding msg of Value from packable encoder [written]"); + //System.out.println("[server transport] encoding msg of Value from packable encoder [written]"); + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + + super.write(ctx, msg, promise); + + ctx.flush(); + + //System.out.println("[server transport] encoding msg of Value from packable encoder [flushed]"); } } @@ -161,12 +173,19 @@ public MessageHandler(final RpcMessageHandler rpcHandler){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { - System.out.println("[server transport] got message decoded: " + msg.getClass().getName()); + //System.out.println("[server transport] got message decoded: " + msg.getClass().getName()); Value value = (Value) msg; _rpcHandler.handleMessage(new ChannelAdaptor(ctx.channel()), value); } + + @Override + public void exceptionCaught(final ChannelHandlerContext ctx, Throwable cause) throws Exception { + + cause.printStackTrace(); + ctx.close(); + } } public void close() { diff --git a/src/test/java/org/msgpack/rpc/BigDataTest.java b/src/test/java/org/msgpack/rpc/BigDataTest.java index 9c9949f..74ea827 100644 --- a/src/test/java/org/msgpack/rpc/BigDataTest.java +++ b/src/test/java/org/msgpack/rpc/BigDataTest.java @@ -12,9 +12,9 @@ public class BigDataTest extends TestCase { private static String getBigString() { - StringBuilder sb = new StringBuilder(1024 * 1024); // 1M + StringBuilder sb = new StringBuilder(1024); // 1M Random random = new Random(); - for(int i = 0;i < 1024 * 1024;i++){ + for(int i = 0;i < 1024;i++){ sb.append( (char)('a' + random.nextInt(26))); } return sb.toString(); @@ -34,18 +34,18 @@ public void dispatch(Request request) { @Test public void testSyncBigDataLoad() throws Exception { + MessagePack messagePack = new MessagePack(); EventLoop loop = EventLoop.start(messagePack); Server svr = new Server(loop); Client c = new Client("127.0.0.1", 19851, loop); - c.setRequestTimeout(10); - + c.setRequestTimeout(100); try { svr.serve(new BigDataDispatcher()); svr.listen(19851); - int num = 1; + int num = 5; long start = System.currentTimeMillis(); for(int i=0; i < num; i++) { @@ -65,35 +65,34 @@ public void testSyncBigDataLoad() throws Exception { } } -// @Ignore -// @Test -// public void testAsyncBigDataLoad() throws Exception { -// EventLoop loop = EventLoop.start(); -// Server svr = new Server(loop); -// Client c = new Client("127.0.0.1", 19852, loop); -// c.setRequestTimeout(100);// -// -// try { -// svr.serve(new BigDataDispatcher()); -// svr.listen(19852); -// -// int num = 10; -// -// long start = System.currentTimeMillis(); -// for(int i=0; i < num-1; i++) { -// c.notifyApply("test", new Object[]{BIG_DATA}); -// } -// c.callApply("test", new Object[]{BIG_DATA}); -// long finish = System.currentTimeMillis(); -// -// double result = num / ((double)(finish - start) / 1000); -// System.out.println("async: "+result+" calls per sec"); -// -// } finally { -// svr.close(); -// c.close(); -// loop.shutdown(); -// } -// } + @Test + public void testAsyncBigDataLoad() throws Exception { + EventLoop loop = EventLoop.start(); + Server svr = new Server(loop); + Client c = new Client("127.0.0.1", 19852, loop); + c.setRequestTimeout(100);// + + try { + svr.serve(new BigDataDispatcher()); + svr.listen(19852); + + int num = 10; + + long start = System.currentTimeMillis(); + for(int i=0; i < num-1; i++) { + c.notifyApply("test", new Object[]{BIG_DATA}); + } + c.callApply("test", new Object[]{BIG_DATA}); + long finish = System.currentTimeMillis(); + + double result = num / ((double)(finish - start) / 1000); + System.out.println("async: "+result+" calls per sec"); + + } finally { + svr.close(); + c.close(); + loop.shutdown(); + } + } } From a3112d13c7dacb8e0a58bc47f5b43ffca1941b8f Mon Sep 17 00:00:00 2001 From: huz Date: Thu, 23 Jan 2014 01:25:58 -0800 Subject: [PATCH 5/8] refactorings --- .../rpc/loop/netty/MessageHandler.java | 81 ++++-------- .../rpc/loop/netty/MessagePackDecoder.java | 104 ++++++--------- .../rpc/loop/netty/MessagePackEncoder.java | 84 ++++-------- .../loop/netty/MessagePackStreamDecoder.java | 67 ---------- .../loop/netty/MessagePackableEncoder.java | 32 +++++ .../loop/netty/NettyTcpClientTransport.java | 121 ------------------ .../loop/netty/NettyTcpServerTransport.java | 121 ------------------ .../rpc/loop/netty/StreamPipelineFactory.java | 42 ------ .../java/org/msgpack/rpc/BigDataTest.java | 20 ++- 9 files changed, 138 insertions(+), 534 deletions(-) delete mode 100644 src/main/java/org/msgpack/rpc/loop/netty/MessagePackStreamDecoder.java create mode 100644 src/main/java/org/msgpack/rpc/loop/netty/MessagePackableEncoder.java delete mode 100644 src/main/java/org/msgpack/rpc/loop/netty/StreamPipelineFactory.java diff --git a/src/main/java/org/msgpack/rpc/loop/netty/MessageHandler.java b/src/main/java/org/msgpack/rpc/loop/netty/MessageHandler.java index 6c321a5..56df020 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/MessageHandler.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/MessageHandler.java @@ -1,53 +1,28 @@ -//// -//// MessagePack-RPC for Java -//// -//// Copyright (C) 2010 FURUHASHI Sadayuki -//// -//// Licensed under the Apache License, Version 2.0 (the "License"); -//// you may not use this file except in compliance with the License. -//// You may obtain a copy of the License at -//// -//// http://www.apache.org/licenses/LICENSE-2.0 -//// -//// Unless required by applicable law or agreed to in writing, software -//// distributed under the License is distributed on an "AS IS" BASIS, -//// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -//// See the License for the specific language governing permissions and -//// limitations under the License. -//// -//package org.msgpack.rpc.loop.netty; -// -//import org.jboss.netty.channel.ChannelHandlerContext; -//import org.jboss.netty.channel.ChannelStateEvent; -//import org.jboss.netty.channel.MessageEvent; -//import org.jboss.netty.channel.SimpleChannelUpstreamHandler; -//import org.msgpack.rpc.transport.RpcMessageHandler; -//import org.msgpack.type.Value; -// -//class MessageHandler extends SimpleChannelUpstreamHandler { -// private RpcMessageHandler handler; -// private ChannelAdaptor adaptor; -// -// MessageHandler(RpcMessageHandler handler) { -// this.handler = handler; -// } -// -// @Override -// public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) -// throws Exception { -// this.adaptor = new ChannelAdaptor(e.getChannel()); -// ctx.sendUpstream(e); -// } -// -// @Override -// public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { -// Object m = e.getMessage(); -// if (!(m instanceof Value)) { -// ctx.sendUpstream(e); -// return; -// } -// -// Value msg = (Value) m; -// handler.handleMessage(adaptor, msg); -// } -//} +package org.msgpack.rpc.loop.netty; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import org.msgpack.rpc.transport.RpcMessageHandler; +import org.msgpack.type.Value; + +class MessageHandler extends ChannelInboundHandlerAdapter { + + private final RpcMessageHandler _rpcHandler; + + public MessageHandler(final RpcMessageHandler rpcHandler){ + _rpcHandler = rpcHandler; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + + _rpcHandler.handleMessage(new ChannelAdaptor(ctx.channel()), (Value) msg); + } + + @Override + public void exceptionCaught(final ChannelHandlerContext ctx, Throwable cause) throws Exception { + + cause.printStackTrace(); + ctx.close(); + } +} \ No newline at end of file diff --git a/src/main/java/org/msgpack/rpc/loop/netty/MessagePackDecoder.java b/src/main/java/org/msgpack/rpc/loop/netty/MessagePackDecoder.java index 1d14a42..8d34b90 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/MessagePackDecoder.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/MessagePackDecoder.java @@ -1,64 +1,40 @@ -//// -//// MessagePack-RPC for Java -//// -//// Copyright (C) 2010 FURUHASHI Sadayuki -//// -//// Licensed under the Apache License, Version 2.0 (the "License"); -//// you may not use this file except in compliance with the License. -//// You may obtain a copy of the License at -//// -//// http://www.apache.org/licenses/LICENSE-2.0 -//// -//// Unless required by applicable law or agreed to in writing, software -//// distributed under the License is distributed on an "AS IS" BASIS, -//// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -//// See the License for the specific language governing permissions and -//// limitations under the License. -//// -//package org.msgpack.rpc.loop.netty; -// -//import java.nio.ByteBuffer; -//import org.jboss.netty.channel.Channel; -//import org.jboss.netty.channel.ChannelHandlerContext; -//import org.jboss.netty.buffer.ChannelBuffer; -//import org.jboss.netty.handler.codec.oneone.OneToOneDecoder; -//import org.msgpack.MessagePack; -//import org.msgpack.type.Value; -// -//public class MessagePackDecoder extends OneToOneDecoder { -// -// MessagePack messagePack; -// -// public MessagePackDecoder(MessagePack messagePack) { -// super(); -// this.messagePack = messagePack; -// } -// -// @Override -// protected Object decode(ChannelHandlerContext ctx, Channel channel, -// Object msg) throws Exception { -// if (!(msg instanceof ChannelBuffer)) { -// return msg; -// } -// -// ChannelBuffer source = (ChannelBuffer) msg; -// -// ByteBuffer buffer = source.toByteBuffer(); -// if (!buffer.hasRemaining()) { -// return null; -// } -// -// byte[] bytes = buffer.array(); // FIXME buffer must has array -// int offset = buffer.arrayOffset() + buffer.position(); -// int length = buffer.arrayOffset() + buffer.limit(); -// -// Value v = messagePack.read(bytes, offset, length); -// return v; -// -// // TODO MessagePack.unpack() -// /* -// * Unpacker pac = new Unpacker(); pac.wrap(bytes, offset, length); -// * return pac.unpackObject(); -// */ -// } -//} +package org.msgpack.rpc.loop.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import org.msgpack.MessagePack; +import org.msgpack.unpacker.Unpacker; + +import java.io.EOFException; +import java.nio.ByteBuffer; +import java.util.List; + +class MessagePackDecoder extends ByteToMessageDecoder { + + private final MessagePack _msgpack; + + public MessagePackDecoder(final MessagePack msgpack){ + _msgpack = msgpack; + } + + @Override + protected void decode(final ChannelHandlerContext channelHandlerContext, + final ByteBuf byteBuf, + final List out) throws Exception { + + final ByteBuffer buffer = byteBuf.markReaderIndex().nioBuffer().slice(); + + try{ + Unpacker unpacker = _msgpack.createBufferUnpacker(buffer); + + out.add(unpacker.readValue()); + + byteBuf.skipBytes(buffer.position()); + } + catch(EOFException e){ + + byteBuf.resetReaderIndex(); + } + } +} \ No newline at end of file diff --git a/src/main/java/org/msgpack/rpc/loop/netty/MessagePackEncoder.java b/src/main/java/org/msgpack/rpc/loop/netty/MessagePackEncoder.java index c2e3c95..c1fb1cf 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/MessagePackEncoder.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/MessagePackEncoder.java @@ -1,59 +1,25 @@ -//// -//// MessagePack-RPC for Java -//// -//// Copyright (C) 2010 FURUHASHI Sadayuki -//// -//// Licensed under the Apache License, Version 2.0 (the "License"); -//// you may not use this file except in compliance with the License. -//// You may obtain a copy of the License at -//// -//// http://www.apache.org/licenses/LICENSE-2.0 -//// -//// Unless required by applicable law or agreed to in writing, software -//// distributed under the License is distributed on an "AS IS" BASIS, -//// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -//// See the License for the specific language governing permissions and -//// limitations under the License. -//// -//package org.msgpack.rpc.loop.netty; -// -//import org.jboss.netty.channel.Channel; -//import org.jboss.netty.channel.ChannelHandlerContext; -//import org.jboss.netty.buffer.ChannelBuffer; -//import org.jboss.netty.buffer.ChannelBufferOutputStream; -//import org.jboss.netty.buffer.ChannelBuffers; -//import org.jboss.netty.handler.codec.oneone.OneToOneEncoder; -//import org.msgpack.MessagePack; -// -//public class MessagePackEncoder extends OneToOneEncoder { -// private final int estimatedLength; -// -// private MessagePack messagePack; -// -// public MessagePackEncoder(MessagePack messagePack) { -// this(1024, messagePack); -// } -// -// public MessagePackEncoder(int estimatedLength, MessagePack messagePack) { -// this.estimatedLength = estimatedLength; -// this.messagePack = messagePack; -// } -// -// @Override -// protected Object encode(ChannelHandlerContext ctx, Channel channel, -// Object msg) throws Exception { -// if (msg instanceof ChannelBuffer) { -// return msg; -// } -// -// ChannelBufferOutputStream out = new ChannelBufferOutputStream( -// ChannelBuffers.dynamicBuffer(estimatedLength, ctx.getChannel() -// .getConfig().getBufferFactory())); -// -// // MessagePack.pack(out, msg); -// messagePack.write(out, msg); -// -// ChannelBuffer result = out.buffer(); -// return result; -// } -//} +package org.msgpack.rpc.loop.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufOutputStream; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; +import org.msgpack.MessagePack; +import org.msgpack.type.Value; + +class MessagePackEncoder extends MessageToByteEncoder { + + private final MessagePack _msgpack; + + public MessagePackEncoder(final MessagePack msgpack){ + _msgpack = msgpack; + } + + @Override + protected void encode(ChannelHandlerContext ctx, Value msg, ByteBuf out) throws Exception { + + _msgpack.createPacker(new ByteBufOutputStream(out)).write(msg); + + ctx.flush(); + } +} \ No newline at end of file diff --git a/src/main/java/org/msgpack/rpc/loop/netty/MessagePackStreamDecoder.java b/src/main/java/org/msgpack/rpc/loop/netty/MessagePackStreamDecoder.java deleted file mode 100644 index 716f666..0000000 --- a/src/main/java/org/msgpack/rpc/loop/netty/MessagePackStreamDecoder.java +++ /dev/null @@ -1,67 +0,0 @@ -//// -//// MessagePack-RPC for Java -//// -//// Copyright (C) 2010 FURUHASHI Sadayuki -//// -//// Licensed under the Apache License, Version 2.0 (the "License"); -//// you may not use this file except in compliance with the License. -//// You may obtain a copy of the License at -//// -//// http://www.apache.org/licenses/LICENSE-2.0 -//// -//// Unless required by applicable law or agreed to in writing, software -//// distributed under the License is distributed on an "AS IS" BASIS, -//// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -//// See the License for the specific language governing permissions and -//// limitations under the License. -//// -//package org.msgpack.rpc.loop.netty; -// -//import java.io.ByteArrayInputStream; -//import java.io.EOFException; -//import java.io.IOException; -//import java.io.InputStream; -//import java.nio.ByteBuffer; -//import java.util.concurrent.atomic.AtomicInteger; -// -//import org.jboss.netty.channel.Channel; -//import org.jboss.netty.channel.ChannelHandlerContext; -//import org.jboss.netty.buffer.ChannelBuffer; -//import org.jboss.netty.handler.codec.frame.FrameDecoder; -//import org.msgpack.MessagePack; -//import org.msgpack.type.Value; -//import org.msgpack.unpacker.Unpacker; -// -//public class MessagePackStreamDecoder extends FrameDecoder { -// protected MessagePack msgpack; -// -// public MessagePackStreamDecoder(MessagePack msgpack) { -// super(); -// this.msgpack = msgpack; -// } -// -// @Override -// protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer source) throws Exception { -// -// // TODO #MN will modify the body with MessagePackBufferUnpacker. -// ByteBuffer buffer = source.toByteBuffer().slice(); -// if (!buffer.hasRemaining()) { -// return null; -// } -// source.markReaderIndex(); -// -// try{ -// Unpacker unpacker = msgpack.createBufferUnpacker(buffer); -// Value v = unpacker.readValue(); -// source.skipBytes(buffer.position()); -// return v; -// } -// catch( EOFException e ){ -// // not enough buffers. -// // So retry reading -// System.out.println("~~~waste~~~"); -// source.resetReaderIndex(); -// return null; -// } -// } -//} diff --git a/src/main/java/org/msgpack/rpc/loop/netty/MessagePackableEncoder.java b/src/main/java/org/msgpack/rpc/loop/netty/MessagePackableEncoder.java new file mode 100644 index 0000000..f87633d --- /dev/null +++ b/src/main/java/org/msgpack/rpc/loop/netty/MessagePackableEncoder.java @@ -0,0 +1,32 @@ +package org.msgpack.rpc.loop.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufOutputStream; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.MessageToByteEncoder; +import org.msgpack.MessagePack; +import org.msgpack.MessagePackable; + +class MessagePackableEncoder extends MessageToByteEncoder { + + private final MessagePack _msgpack; + + public MessagePackableEncoder(final MessagePack msgpack){ + _msgpack = msgpack; + } + + @Override + protected void encode(ChannelHandlerContext ctx, MessagePackable msg, ByteBuf out) throws Exception { + + msg.writeTo(_msgpack.createPacker(new ByteBufOutputStream(out))); + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + + super.write(ctx, msg, promise); + + ctx.flush(); + } +} \ No newline at end of file diff --git a/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpClientTransport.java b/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpClientTransport.java index b20aa05..df67837 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpClientTransport.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpClientTransport.java @@ -17,36 +17,21 @@ // package org.msgpack.rpc.loop.netty; -import java.io.EOFException; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; import io.netty.bootstrap.Bootstrap; -import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufOutputStream; -import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.handler.codec.ByteToMessageDecoder; -import io.netty.handler.codec.MessageToByteEncoder; -import org.msgpack.MessagePack; -import org.msgpack.MessagePackable; import org.msgpack.rpc.Session; import org.msgpack.rpc.config.TcpClientConfig; import org.msgpack.rpc.transport.ClientTransport; import org.msgpack.rpc.transport.RpcMessageHandler; -import org.msgpack.rpc.transport.PooledStreamClientTransport; -import org.msgpack.type.Value; -import org.msgpack.unpacker.Unpacker; class NettyTcpClientTransport implements ClientTransport { - private ByteBufOutputStream _bufOutput; - private final Session _session; private final Bootstrap bootstrap; private final ConcurrentLinkedQueue _channels; @@ -124,110 +109,4 @@ public void operationComplete(ChannelFuture future) throws Exception { } }); } - - static class MessagePackDecoder extends ByteToMessageDecoder { - - private final MessagePack _msgpack; - - public MessagePackDecoder(final MessagePack msgpack){ - _msgpack = msgpack; - } - - @Override - protected void decode(final ChannelHandlerContext channelHandlerContext, - final ByteBuf byteBuf, - final List out) throws Exception { - - //System.out.printf("[client transport] decode got bytebuf\n"); - - final ByteBuffer buffer = byteBuf.markReaderIndex().nioBuffer().slice(); - - try{ - Unpacker unpacker = _msgpack.createBufferUnpacker(buffer); - out.add(unpacker.readValue()); - - //System.out.printf("[client transport] decode done\n"); - - byteBuf.skipBytes(buffer.position()); - } - catch( EOFException e ){ - - //System.out.println("[client transport] not enough bytebuf"); - - byteBuf.resetReaderIndex(); - } - } - } - - static class MessagePackEncoder extends MessageToByteEncoder { - - private final MessagePack _msgpack; - - public MessagePackEncoder(final MessagePack msgpack){ - _msgpack = msgpack; - } - - @Override - protected void encode(ChannelHandlerContext ctx, Value msg, ByteBuf out) throws Exception { - - //System.out.println("[client transport] encoding msg of Value"); - - _msgpack.createPacker(new ByteBufOutputStream(out)).write(msg); - - ctx.flush(); - } - } - - static class MessagePackableEncoder extends MessageToByteEncoder { - - private final MessagePack _msgpack; - - public MessagePackableEncoder(final MessagePack msgpack){ - _msgpack = msgpack; - } - - @Override - protected void encode(ChannelHandlerContext ctx, MessagePackable msg, ByteBuf out) throws Exception { - - //System.out.println("[client transport] encoding msg of Value from packable encoder"); - - msg.writeTo(_msgpack.createPacker(new ByteBufOutputStream(out))); - } - - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - - super.write(ctx, msg, promise); - - ctx.flush(); - - //System.out.println("[server transport] encoding msg of Value from packable encoder [flushed]"); - } - } - - static class MessageHandler extends ChannelInboundHandlerAdapter { - - private final RpcMessageHandler _rpcHandler; - - public MessageHandler(final RpcMessageHandler rpcHandler){ - _rpcHandler = rpcHandler; - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) { - - //System.out.printf("[client transport] message handler got msg: " + msg.getClass().getName()); - - Value value = (Value) msg; - - _rpcHandler.handleMessage(new ChannelAdaptor(ctx.channel()), value); - } - - @Override - public void exceptionCaught(final ChannelHandlerContext ctx, Throwable cause) throws Exception { - - cause.printStackTrace(); - ctx.close(); - } - } } diff --git a/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpServerTransport.java b/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpServerTransport.java index db00a4a..a6d308b 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpServerTransport.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpServerTransport.java @@ -17,28 +17,16 @@ // package org.msgpack.rpc.loop.netty; -import java.io.EOFException; -import java.nio.ByteBuffer; -import java.util.List; - import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufOutputStream; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.handler.codec.ByteToMessageDecoder; -import io.netty.handler.codec.MessageToByteEncoder; -import org.msgpack.MessagePack; -import org.msgpack.MessagePackable; import org.msgpack.rpc.Server; import org.msgpack.rpc.config.TcpServerConfig; import org.msgpack.rpc.transport.RpcMessageHandler; import org.msgpack.rpc.transport.ServerTransport; import org.msgpack.rpc.address.Address; -import org.msgpack.type.Value; -import org.msgpack.unpacker.Unpacker; class NettyTcpServerTransport implements ServerTransport { @@ -79,115 +67,6 @@ public void initChannel(SocketChannel ch) throws Exception { channelFuture = b.bind(address.getSocketAddress()).sync(); // (7) } - static class MessagePackDecoder extends ByteToMessageDecoder { - - private final MessagePack _msgpack; - - public MessagePackDecoder(final MessagePack msgpack){ - _msgpack = msgpack; - } - - @Override - protected void decode(final ChannelHandlerContext channelHandlerContext, - final ByteBuf byteBuf, - final List out) throws Exception { - - //System.out.println("[server transport] got bytebuf to decode"); - - final ByteBuffer buffer = byteBuf.markReaderIndex().nioBuffer().slice(); - - try{ - Unpacker unpacker = _msgpack.createBufferUnpacker(buffer); - - out.add(unpacker.readValue()); - - //System.out.println("[server transport] feed value to the next"); - - byteBuf.skipBytes(buffer.position()); - } - catch( EOFException e ){ - - //System.out.println("[server transport] not enough bytebuf"); - - byteBuf.resetReaderIndex(); - } - } - } - - static class MessagePackEncoder extends MessageToByteEncoder { - - private final MessagePack _msgpack; - - public MessagePackEncoder(final MessagePack msgpack){ - _msgpack = msgpack; - } - - @Override - protected void encode(ChannelHandlerContext ctx, Value msg, ByteBuf out) throws Exception { - - //System.out.println("[server transport] encoding msg of Value"); - - _msgpack.createPacker(new ByteBufOutputStream(out)).write(msg); - - ctx.flush(); - } - } - - static class MessagePackableEncoder extends MessageToByteEncoder { - - private final MessagePack _msgpack; - - public MessagePackableEncoder(final MessagePack msgpack){ - _msgpack = msgpack; - } - - @Override - protected void encode(ChannelHandlerContext ctx, MessagePackable msg, ByteBuf out) throws Exception { - - //System.out.println("[server transport] encoding msg of Value from packable encoder"); - - msg.writeTo(_msgpack.createPacker(new ByteBufOutputStream(out))); - - //System.out.println("[server transport] encoding msg of Value from packable encoder [written]"); - } - - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - - super.write(ctx, msg, promise); - - ctx.flush(); - - //System.out.println("[server transport] encoding msg of Value from packable encoder [flushed]"); - } - } - - static class MessageHandler extends ChannelInboundHandlerAdapter { - - private final RpcMessageHandler _rpcHandler; - - public MessageHandler(final RpcMessageHandler rpcHandler){ - _rpcHandler = rpcHandler; - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) { - - //System.out.println("[server transport] got message decoded: " + msg.getClass().getName()); - - Value value = (Value) msg; - - _rpcHandler.handleMessage(new ChannelAdaptor(ctx.channel()), value); - } - - @Override - public void exceptionCaught(final ChannelHandlerContext ctx, Throwable cause) throws Exception { - - cause.printStackTrace(); - ctx.close(); - } - } - public void close() { channelFuture.channel().close(); } diff --git a/src/main/java/org/msgpack/rpc/loop/netty/StreamPipelineFactory.java b/src/main/java/org/msgpack/rpc/loop/netty/StreamPipelineFactory.java deleted file mode 100644 index 205ab13..0000000 --- a/src/main/java/org/msgpack/rpc/loop/netty/StreamPipelineFactory.java +++ /dev/null @@ -1,42 +0,0 @@ -//// -//// MessagePack-RPC for Java -//// -//// Copyright (C) 2010 FURUHASHI Sadayuki -//// -//// Licensed under the Apache License, Version 2.0 (the "License"); -//// you may not use this file except in compliance with the License. -//// You may obtain a copy of the License at -//// -//// http://www.apache.org/licenses/LICENSE-2.0 -//// -//// Unless required by applicable law or agreed to in writing, software -//// distributed under the License is distributed on an "AS IS" BASIS, -//// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -//// See the License for the specific language governing permissions and -//// limitations under the License. -//// -//package org.msgpack.rpc.loop.netty; -// -//import org.jboss.netty.channel.Channels; -//import org.jboss.netty.channel.ChannelPipeline; -//import org.jboss.netty.channel.ChannelPipelineFactory; -//import org.msgpack.MessagePack; -//import org.msgpack.rpc.transport.RpcMessageHandler; -// -//class StreamPipelineFactory implements ChannelPipelineFactory { -// private RpcMessageHandler handler; -// private MessagePack messagePack; -// -// StreamPipelineFactory(MessagePack messagePack, RpcMessageHandler handler) { -// this.handler = handler; -// this.messagePack = messagePack; -// } -// -// public ChannelPipeline getPipeline() throws Exception { -// ChannelPipeline p = Channels.pipeline(); -// p.addLast("msgpack-decode-stream", new MessagePackStreamDecoder(messagePack)); -// p.addLast("msgpack-encode", new MessagePackEncoder(messagePack)); -// p.addLast("message", new MessageHandler(handler)); -// return p; -// } -//} diff --git a/src/test/java/org/msgpack/rpc/BigDataTest.java b/src/test/java/org/msgpack/rpc/BigDataTest.java index 74ea827..40d1cef 100644 --- a/src/test/java/org/msgpack/rpc/BigDataTest.java +++ b/src/test/java/org/msgpack/rpc/BigDataTest.java @@ -45,17 +45,19 @@ public void testSyncBigDataLoad() throws Exception { svr.serve(new BigDataDispatcher()); svr.listen(19851); - int num = 5; + //warmup + assertEquals(BIG_DATA, c.callApply("test", new Object[]{BIG_DATA})); + + int num = 10; long start = System.currentTimeMillis(); for(int i=0; i < num; i++) { - Value result = c.callApply("test", new Object[]{BIG_DATA}); - assertEquals(BIG_DATA, result); + assertEquals(BIG_DATA, c.callApply("test", new Object[]{BIG_DATA})); } long finish = System.currentTimeMillis(); double result = num / ((double)(finish - start) / 1000); - System.out.println("sync: "+result+" calls per sec"); + System.out.printf("sync: %f calls per sec, and avg: %fms per call", result, (double) (finish - start) / num); } finally { @@ -76,10 +78,13 @@ public void testAsyncBigDataLoad() throws Exception { svr.serve(new BigDataDispatcher()); svr.listen(19852); - int num = 10; + //warmup + assertEquals(BIG_DATA, c.callApply("test", new Object[]{BIG_DATA})); + + int num = 100; long start = System.currentTimeMillis(); - for(int i=0; i < num-1; i++) { + for(int i = 0; i < num - 1; i++) { c.notifyApply("test", new Object[]{BIG_DATA}); } c.callApply("test", new Object[]{BIG_DATA}); @@ -88,7 +93,8 @@ public void testAsyncBigDataLoad() throws Exception { double result = num / ((double)(finish - start) / 1000); System.out.println("async: "+result+" calls per sec"); - } finally { + } + finally { svr.close(); c.close(); loop.shutdown(); From 6003f2bde7c1f759025329d8e8f3b8279a704e26 Mon Sep 17 00:00:00 2001 From: huzhou Date: Thu, 23 Jan 2014 11:10:18 -0800 Subject: [PATCH 6/8] ~ refactorings --- .../rpc/loop/netty/ChannelAdaptor.java | 13 +++--- .../rpc/loop/netty/MessagePackEncoder.java | 2 - .../loop/netty/MessagePackableEncoder.java | 9 +---- .../loop/netty/NettyTcpClientTransport.java | 5 +-- .../loop/netty/NettyTcpServerTransport.java | 40 +++++++++---------- 5 files changed, 30 insertions(+), 39 deletions(-) diff --git a/src/main/java/org/msgpack/rpc/loop/netty/ChannelAdaptor.java b/src/main/java/org/msgpack/rpc/loop/netty/ChannelAdaptor.java index 7044a10..11a95a6 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/ChannelAdaptor.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/ChannelAdaptor.java @@ -22,17 +22,20 @@ class ChannelAdaptor implements ClientTransport { - private Channel channel; + private final Channel _channel; - ChannelAdaptor(Channel channel) { - this.channel = channel; + protected ChannelAdaptor(final Channel channel) { + + _channel = channel; } public void sendMessage(Object msg) { - channel.write(msg); + + _channel.writeAndFlush(msg); } public void close() { - channel.close(); + + _channel.close(); } } diff --git a/src/main/java/org/msgpack/rpc/loop/netty/MessagePackEncoder.java b/src/main/java/org/msgpack/rpc/loop/netty/MessagePackEncoder.java index c1fb1cf..2472e5e 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/MessagePackEncoder.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/MessagePackEncoder.java @@ -19,7 +19,5 @@ public MessagePackEncoder(final MessagePack msgpack){ protected void encode(ChannelHandlerContext ctx, Value msg, ByteBuf out) throws Exception { _msgpack.createPacker(new ByteBufOutputStream(out)).write(msg); - - ctx.flush(); } } \ No newline at end of file diff --git a/src/main/java/org/msgpack/rpc/loop/netty/MessagePackableEncoder.java b/src/main/java/org/msgpack/rpc/loop/netty/MessagePackableEncoder.java index f87633d..65ef611 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/MessagePackableEncoder.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/MessagePackableEncoder.java @@ -13,6 +13,7 @@ class MessagePackableEncoder extends MessageToByteEncoder { private final MessagePack _msgpack; public MessagePackableEncoder(final MessagePack msgpack){ + _msgpack = msgpack; } @@ -21,12 +22,4 @@ protected void encode(ChannelHandlerContext ctx, MessagePackable msg, ByteBuf ou msg.writeTo(_msgpack.createPacker(new ByteBufOutputStream(out))); } - - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - - super.write(ctx, msg, promise); - - ctx.flush(); - } } \ No newline at end of file diff --git a/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpClientTransport.java b/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpClientTransport.java index df67837..9bdfba3 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpClientTransport.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpClientTransport.java @@ -20,7 +20,6 @@ import java.util.concurrent.ConcurrentLinkedQueue; import io.netty.bootstrap.Bootstrap; -import io.netty.buffer.ByteBufOutputStream; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; @@ -43,10 +42,8 @@ class NettyTcpClientTransport implements ClientTransport { // TODO check session.getAddress() instanceof IPAddress final RpcMessageHandler handler = new RpcMessageHandler(session); - final EventLoopGroup workerGroup = new NioEventLoopGroup(); - bootstrap = new Bootstrap(); // (1) - bootstrap.group(workerGroup); // (2) + bootstrap.group(new NioEventLoopGroup(2)); // (2) bootstrap.channel(NioSocketChannel.class); // (3) bootstrap.option(ChannelOption.SO_KEEPALIVE, true); // (4) bootstrap.handler(new ChannelInitializer() { diff --git a/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpServerTransport.java b/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpServerTransport.java index a6d308b..81ee899 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpServerTransport.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpServerTransport.java @@ -30,7 +30,7 @@ class NettyTcpServerTransport implements ServerTransport { - private ChannelFuture channelFuture; + private final ChannelFuture channelFuture; NettyTcpServerTransport(final TcpServerConfig config, final Server server, @@ -40,28 +40,28 @@ class NettyTcpServerTransport implements ServerTransport { throw new IllegalArgumentException("Server must not be null"); } - Address address = config.getListenAddress(); - + final Address address = config.getListenAddress(); final RpcMessageHandler handler = new RpcMessageHandler(server); + handler.useThread(true); - final EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) - final EventLoopGroup workerGroup = new NioEventLoopGroup(); - final ServerBootstrap b = new ServerBootstrap(); // (2) - b.group(bossGroup, workerGroup) - .channel(NioServerSocketChannel.class) // (3) - .childHandler(new ChannelInitializer() { // (4) - @Override - public void initChannel(SocketChannel ch) throws Exception { - ch.pipeline().addLast( - new MessagePackDecoder(loop.getMessagePack()), - new MessageHandler(handler), - new MessagePackEncoder(loop.getMessagePack()), - new MessagePackableEncoder(loop.getMessagePack())); - } - }) - .option(ChannelOption.SO_BACKLOG, 128) // (5) - .childOption(ChannelOption.SO_KEEPALIVE, true); // (6) + final EventLoopGroup bossGroup = new NioEventLoopGroup(1); // (1) + final EventLoopGroup workerGroup = new NioEventLoopGroup(4); + final ServerBootstrap b = new ServerBootstrap() + .group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) // (3) + .childHandler(new ChannelInitializer() { // (4) + @Override + public void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast( + new MessagePackDecoder(loop.getMessagePack()), + new MessageHandler(handler), + new MessagePackEncoder(loop.getMessagePack()), + new MessagePackableEncoder(loop.getMessagePack())); + } + }) + .option(ChannelOption.SO_BACKLOG, 128) // (5) + .childOption(ChannelOption.SO_KEEPALIVE, true); // (6) // Bind and start to accept incoming connections. channelFuture = b.bind(address.getSocketAddress()).sync(); // (7) From e7b6ee3503c66b92f6c364cefa1a2278a1184d44 Mon Sep 17 00:00:00 2001 From: huzhou Date: Thu, 23 Jan 2014 15:20:13 -0800 Subject: [PATCH 7/8] ~ refactors --- .../loop/netty/NettyTcpClientTransport.java | 46 ++++++++++--------- .../loop/netty/NettyTcpServerTransport.java | 3 +- 2 files changed, 26 insertions(+), 23 deletions(-) diff --git a/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpClientTransport.java b/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpClientTransport.java index 9bdfba3..aa976ca 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpClientTransport.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpClientTransport.java @@ -33,7 +33,7 @@ class NettyTcpClientTransport implements ClientTransport { private final Session _session; private final Bootstrap bootstrap; - private final ConcurrentLinkedQueue _channels; + private final ConcurrentLinkedQueue _writables; NettyTcpClientTransport(final TcpClientConfig config, final Session session, @@ -42,23 +42,25 @@ class NettyTcpClientTransport implements ClientTransport { // TODO check session.getAddress() instanceof IPAddress final RpcMessageHandler handler = new RpcMessageHandler(session); - bootstrap = new Bootstrap(); // (1) - bootstrap.group(new NioEventLoopGroup(2)); // (2) - bootstrap.channel(NioSocketChannel.class); // (3) - bootstrap.option(ChannelOption.SO_KEEPALIVE, true); // (4) - bootstrap.handler(new ChannelInitializer() { - @Override - public void initChannel(SocketChannel ch) throws Exception { - ch.pipeline().addLast( - new MessagePackDecoder(loop.getMessagePack()), - new MessageHandler(handler), - new MessagePackEncoder(loop.getMessagePack()), - new MessagePackableEncoder(loop.getMessagePack())); - } - }); + bootstrap = new Bootstrap() + .group(new NioEventLoopGroup(2)) + .channel(NioSocketChannel.class) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.round((float)config.getConnectTimeout())) + .option(ChannelOption.TCP_NODELAY, !Boolean.FALSE.equals(config.getOption(ChannelOption.TCP_NODELAY.name()))) + .option(ChannelOption.SO_KEEPALIVE, !Boolean.FALSE.equals(config.getOption(ChannelOption.SO_KEEPALIVE.name()))) + .handler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast( + new MessagePackDecoder(loop.getMessagePack()), + new MessageHandler(handler), + new MessagePackEncoder(loop.getMessagePack()), + new MessagePackableEncoder(loop.getMessagePack())); + } + }); _session = session; - _channels = new ConcurrentLinkedQueue(); + _writables = new ConcurrentLinkedQueue(); } protected ChannelFuture startConnection() { @@ -67,7 +69,7 @@ protected ChannelFuture startConnection() { public void sendMessage(final Object msg) { - if(_channels.isEmpty()){ + if(_writables.isEmpty()){ startConnection().addListener(new ChannelFutureListener() { @@ -78,16 +80,16 @@ public void operationComplete(ChannelFuture future) throws Exception { }); } else{ - sendMessageChannel(_channels.poll(), msg); + sendMessageChannel(_writables.poll(), msg); } } public void close(){ - System.out.println("[client transport] closing channels:" + _channels.size()); + System.out.println("[client transport] closing channels:" + _writables.size()); - while(!_channels.isEmpty()){ - _channels.poll().close(); + while(!_writables.isEmpty()){ + _writables.poll().close(); } } @@ -102,7 +104,7 @@ public void operationComplete(ChannelFuture future) throws Exception { //System.out.println("[client transport] message sent!!!"); - _channels.offer(future.channel()); + _writables.offer(future.channel()); } }); } diff --git a/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpServerTransport.java b/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpServerTransport.java index 81ee899..00d7ba4 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpServerTransport.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpServerTransport.java @@ -61,7 +61,8 @@ public void initChannel(SocketChannel ch) throws Exception { } }) .option(ChannelOption.SO_BACKLOG, 128) // (5) - .childOption(ChannelOption.SO_KEEPALIVE, true); // (6) + .childOption(ChannelOption.TCP_NODELAY, !Boolean.FALSE.equals(config.getOption(ChannelOption.TCP_NODELAY.name()))) + .childOption(ChannelOption.SO_KEEPALIVE, !Boolean.FALSE.equals(config.getOption(ChannelOption.SO_KEEPALIVE.name()))); // Bind and start to accept incoming connections. channelFuture = b.bind(address.getSocketAddress()).sync(); // (7) From 6ca0a73724a4e936cbf34aaddb5f23e3cd6e9f63 Mon Sep 17 00:00:00 2001 From: huzhou Date: Thu, 23 Jan 2014 15:42:27 -0800 Subject: [PATCH 8/8] ~ limit num of channels to open (otherwise got too many open files) --- .../loop/netty/NettyTcpClientTransport.java | 40 ++++++++++++++----- .../loop/netty/NettyTcpServerTransport.java | 4 +- .../java/org/msgpack/rpc/BigDataTest.java | 2 +- 3 files changed, 33 insertions(+), 13 deletions(-) diff --git a/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpClientTransport.java b/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpClientTransport.java index aa976ca..709cbf4 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpClientTransport.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpClientTransport.java @@ -18,6 +18,7 @@ package org.msgpack.rpc.loop.netty; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; @@ -32,7 +33,8 @@ class NettyTcpClientTransport implements ClientTransport { private final Session _session; - private final Bootstrap bootstrap; + private final Bootstrap _bootstrap; + private final AtomicInteger _availables = new AtomicInteger(1024); private final ConcurrentLinkedQueue _writables; NettyTcpClientTransport(final TcpClientConfig config, @@ -42,10 +44,10 @@ class NettyTcpClientTransport implements ClientTransport { // TODO check session.getAddress() instanceof IPAddress final RpcMessageHandler handler = new RpcMessageHandler(session); - bootstrap = new Bootstrap() - .group(new NioEventLoopGroup(2)) + _bootstrap = new Bootstrap() + .group(new NioEventLoopGroup(/*2*/)) .channel(NioSocketChannel.class) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.round((float)config.getConnectTimeout())) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.round((float) config.getConnectTimeout())) .option(ChannelOption.TCP_NODELAY, !Boolean.FALSE.equals(config.getOption(ChannelOption.TCP_NODELAY.name()))) .option(ChannelOption.SO_KEEPALIVE, !Boolean.FALSE.equals(config.getOption(ChannelOption.SO_KEEPALIVE.name()))) .handler(new ChannelInitializer() { @@ -64,30 +66,48 @@ public void initChannel(SocketChannel ch) throws Exception { } protected ChannelFuture startConnection() { - return bootstrap.connect(_session.getAddress().getSocketAddress()); + + return _bootstrap.connect(_session.getAddress().getSocketAddress()); } public void sendMessage(final Object msg) { - if(_writables.isEmpty()){ + if(_writables.isEmpty() && _availables.getAndDecrement() > 0){ startConnection().addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) throws Exception { - sendMessageChannel(future.channel(), msg); + final Channel connected = future.channel(); + + sendMessageChannel(connected, msg); + + connected.closeFuture().addListener(new ChannelFutureListener() { + public void operationComplete(ChannelFuture channelFuture) throws Exception { + _availables.incrementAndGet(); + } + }); } }); } else{ - sendMessageChannel(_writables.poll(), msg); + + final Channel writable = _writables.poll(); + + if(writable != null){ + + sendMessageChannel(writable, msg); + } + else{ + + Thread.yield(); + sendMessage(msg); + } } } public void close(){ - System.out.println("[client transport] closing channels:" + _writables.size()); - while(!_writables.isEmpty()){ _writables.poll().close(); } diff --git a/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpServerTransport.java b/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpServerTransport.java index 00d7ba4..fffaf09 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpServerTransport.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpServerTransport.java @@ -45,8 +45,8 @@ class NettyTcpServerTransport implements ServerTransport { handler.useThread(true); - final EventLoopGroup bossGroup = new NioEventLoopGroup(1); // (1) - final EventLoopGroup workerGroup = new NioEventLoopGroup(4); + final EventLoopGroup bossGroup = new NioEventLoopGroup(/*1*/); // (1) + final EventLoopGroup workerGroup = new NioEventLoopGroup(/*4*/); final ServerBootstrap b = new ServerBootstrap() .group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // (3) diff --git a/src/test/java/org/msgpack/rpc/BigDataTest.java b/src/test/java/org/msgpack/rpc/BigDataTest.java index 40d1cef..7782123 100644 --- a/src/test/java/org/msgpack/rpc/BigDataTest.java +++ b/src/test/java/org/msgpack/rpc/BigDataTest.java @@ -81,7 +81,7 @@ public void testAsyncBigDataLoad() throws Exception { //warmup assertEquals(BIG_DATA, c.callApply("test", new Object[]{BIG_DATA})); - int num = 100; + int num = 10000; long start = System.currentTimeMillis(); for(int i = 0; i < num - 1; i++) {