Skip to content

Commit

Permalink
fix NettyCodecAdapter decoder memory leak (#14538)
Browse files Browse the repository at this point in the history
* fix NettyCodecAdapter decoder memory leak

* fix format

* update comment
  • Loading branch information
JoeCqupt authored Aug 28, 2024
1 parent f3d1294 commit 1190033
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,26 +94,30 @@ private class InternalDecoder extends ByteToMessageDecoder {
protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {

ChannelBuffer message = new NettyBackedChannelBuffer(input);

NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);

// decode object.
do {
int saveReaderIndex = message.readerIndex();
Object msg = codec.decode(channel, message);
if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
message.readerIndex(saveReaderIndex);
break;
} else {
// is it possible to go here ?
if (saveReaderIndex == message.readerIndex()) {
throw new IOException("Decode without read data.");
}
if (msg != null) {
out.add(msg);
try {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);

// decode object.
do {
int saveReaderIndex = message.readerIndex();
Object msg = codec.decode(channel, message);
if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
message.readerIndex(saveReaderIndex);
break;
} else {
// is it possible to go here ?
if (saveReaderIndex == message.readerIndex()) {
throw new IOException("Decode without read data.");
}
if (msg != null) {
out.add(msg);
}
}
}
} while (message.readable());
} while (message.readable());
} catch (Throwable t) {
message.skipBytes(message.readableBytes());
throw t;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,26 @@
import org.apache.dubbo.common.URL;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.Codec2;
import org.apache.dubbo.remoting.Constants;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import io.netty.buffer.AbstractByteBufAllocator;
import io.netty.buffer.ByteBuf;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.MessageToByteEncoder;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;

/**
* {@link NettyCodecAdapter}
*/
Expand All @@ -42,4 +55,30 @@ void test() {
Assertions.assertTrue(decoder instanceof ByteToMessageDecoder);
Assertions.assertTrue(encoder instanceof MessageToByteEncoder);
}

@Test
void testDecodeException() throws IOException {
Codec2 codec2 = Mockito.mock(Codec2.class);
doThrow(new IOException("testDecodeIllegalPacket")).when(codec2).decode(any(), any());

URL url = Mockito.mock(URL.class);
doReturn("default").when(url).getParameter(eq(Constants.CODEC_KEY));

ChannelHandler handler = Mockito.mock(ChannelHandler.class);
NettyCodecAdapter nettyCodecAdapter = new NettyCodecAdapter(codec2, url, handler);
io.netty.channel.ChannelHandler decoder = nettyCodecAdapter.getDecoder();
EmbeddedChannel embeddedChannel = new EmbeddedChannel();
embeddedChannel.pipeline().addLast(decoder);

// simulate illegal data packet
ByteBuf input = AbstractByteBufAllocator.DEFAULT.buffer();
input.writeBytes("testDecodeIllegalPacket".getBytes(StandardCharsets.UTF_8));

DecoderException decoderException = Assertions.assertThrows(DecoderException.class, () -> {
embeddedChannel.writeInbound(input);
});
Assertions.assertTrue(decoderException.getCause() instanceof IOException);

Assertions.assertEquals(0, input.refCnt());
}
}

0 comments on commit 1190033

Please sign in to comment.