Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class NettyRpcFrameDecoder extends ByteToMessageDecoder {
final NettyServerRpcConnection connection;

private boolean requestTooBig;
private boolean requestTooBigSent;
private String requestTooBigMessage;

public NettyRpcFrameDecoder(int maxFrameLength, NettyServerRpcConnection connection) {
Expand All @@ -55,8 +56,12 @@ public NettyRpcFrameDecoder(int maxFrameLength, NettyServerRpcConnection connect

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (requestTooBigSent) {
in.skipBytes(in.readableBytes());
return;
}
if (requestTooBig) {
handleTooBigRequest(in);
handleTooBigRequest(ctx, in);
return;
}

Expand All @@ -80,7 +85,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
NettyRpcServer.LOG.warn(requestTooBigMessage);

if (connection.connectionHeaderRead) {
handleTooBigRequest(in);
handleTooBigRequest(ctx, in);
return;
}
ctx.channel().close();
Expand All @@ -98,7 +103,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
out.add(in.readRetainedSlice(frameLengthInt));
}

private void handleTooBigRequest(ByteBuf in) throws IOException {
private void handleTooBigRequest(ChannelHandlerContext ctx, ByteBuf in) throws IOException {
in.skipBytes(FRAME_LENGTH_FIELD_LENGTH);
in.markReaderIndex();
int preIndex = in.readerIndex();
Expand Down Expand Up @@ -143,6 +148,10 @@ private void handleTooBigRequest(ByteBuf in) throws IOException {
// instead of calling reqTooBig.sendResponseIfReady()
reqTooBig.param = null;
connection.channel.writeAndFlush(reqTooBig).addListener(ChannelFutureListener.CLOSE);
in.skipBytes(in.readableBytes());
requestTooBigSent = true;
// disable auto read as we do not care newer data from this channel any more
ctx.channel().config().setAutoRead(false);
}

private RPCProtos.RequestHeader getHeader(ByteBuf in, int headerSize) throws IOException {
Expand Down