@@ -146,8 +146,16 @@ private ByteBuf decodeNext() {
remaining -= next.readableBytes();
frame.addComponent(next).writerIndex(frame.writerIndex() + next.readableBytes());
}
// Because the ByteBuf created is far smaller than its capacity in most cases,
// we can reduce memory consumption by consolidation
ByteBuf retained = null;
if (frameSize >= 1024 * 1024) {
retained = frame.consolidate();
} else {
retained = frame;
}
assert remaining == 0;
return frame;
return retained;
}

/**
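
For context on the branch added above, a minimal sketch (outside the Spark code, with illustrative sizes) of what CompositeByteBuf.consolidate() buys: it copies the readable bytes of all components into one freshly allocated buffer and releases the originals, so the unused capacity of partially written source buffers can be reclaimed.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;

public class ConsolidateSketch {
  public static void main(String[] args) {
    // Two 1 MB buffers that each carry only a few readable bytes,
    // mimicking partially written network buffers.
    ByteBuf first = Unpooled.buffer(1024 * 1024);
    first.writeLong(42L);                        // 8 readable bytes
    ByteBuf second = Unpooled.buffer(1024 * 1024);
    second.writeInt(7);                          // 4 readable bytes

    // Same pattern as decodeNext(): add each component and bump the writer index.
    CompositeByteBuf frame = Unpooled.compositeBuffer();
    frame.addComponent(first).writerIndex(frame.writerIndex() + first.readableBytes());
    frame.addComponent(second).writerIndex(frame.writerIndex() + second.readableBytes());

    System.out.println(frame.capacity());               // 12: only the readable bytes
    System.out.println(frame.component(0).capacity());  // 1048576: the 1 MB allocation is still held

    // consolidate() copies the 12 readable bytes into a single new buffer and
    // releases the two original components, freeing the ~2 MB of slack.
    frame.consolidate();
  }
}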
@@ -22,6 +22,7 @@
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
@@ -47,6 +48,35 @@ public void testFrameDecoding() throws Exception {
verifyAndCloseDecoder(decoder, ctx, data);
}

@Test
public void testConsolidationForDecodingNonFullyWrittenByteBuf() throws Exception {
Contributor

If I understand correctly, this is testing that consolidation is reducing the amount of memory needed to hold a frame? But since you're writing just 1 MB to the decoder, that's not triggering consolidation, is it?

Playing with CompositeByteBuf, it adjusts the internal capacity based on the readable bytes of the components, but the component buffers remain unchanged, so they still hold on to the original amount of memory:

scala> cb.numComponents()
res4: Int = 2

scala> cb.capacity()
res5: Int = 8

scala> cb.component(0).capacity()
res6: Int = 1048576

So I'm not sure this test is testing anything useful.

Also it would be nice not to use so many magic numbers.

Author

@vanzin I think the test should be refined, but I have a question about your test.
CompositeByteBuf.capacity returns the last component's endOffset, so I think using the capacity for testing is OK.
https://github.com/netty/netty/blob/8fecbab2c56d3f49d0353d58ee1681f3e6d3feca/buffer/src/main/java/io/netty/buffer/CompositeByteBuf.java#L730
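
A quick Java rendering of the point in this thread (sizes illustrative, mirroring the REPL output above): capacity() is the last component's endOffset, i.e. it tracks the readable bytes that were added, while each component keeps its original allocation until consolidate() runs.

CompositeByteBuf cb = Unpooled.compositeBuffer();
ByteBuf src = Unpooled.buffer(1024 * 1024);   // 1 MB allocation
src.writeLong(8L);                            // but only 8 readable bytes
cb.addComponent(src).writerIndex(cb.writerIndex() + src.readableBytes());

System.out.println(cb.capacity());               // 8, the last component's endOffset
System.out.println(cb.component(0).capacity());  // 1048576, the original allocation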

Contributor

Maybe my question wasn't clear. I'm asking what part of the Spark code this test is testing.

As far as I can see, it's testing netty code, and these are not netty unit tests.

Author

@vanzin I think this test largely duplicates testConsolidationPerf; we can just remove it. I will update soon. Sorry for that.

TransportFrameDecoder decoder = new TransportFrameDecoder();
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
ArrayList<ByteBuf> retained = Lists.newArrayList();
when(ctx.fireChannelRead(any())).thenAnswer(in -> {
ByteBuf buf = (ByteBuf) in.getArguments()[0];
retained.add(buf);
return null;
});
ByteBuf data1 = Unpooled.buffer(1024 * 1024);
data1.writeLong(1024 * 1024 + 8);
data1.writeByte(127);
ByteBuf data2 = Unpooled.buffer(1024 * 1024);
for (int i = 0; i < 1024 * 1024 - 1; i++) {
data2.writeByte(128);
}
int originalCapacity = data1.capacity() + data2.capacity();
try {
decoder.channelRead(ctx, data1);
decoder.channelRead(ctx, data2);
assertEquals(1, retained.size());
assertTrue(retained.get(0).capacity() < originalCapacity);
} finally {
release(data1);
release(data2);
}
}
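
On the magic-numbers comment above, one possible shape for the same values as named constants (names are illustrative, not from this PR):

// Hypothetical constants mirroring the values used in the test above.
private static final int FRAME_LENGTH_SIZE = 8;                // long-sized length field
private static final int CONSOLIDATE_THRESHOLD = 1024 * 1024;  // 1 MB, the threshold used in decodeNext()
private static final int BODY_SIZE = CONSOLIDATE_THRESHOLD;    // frame body written by the test

ByteBuf data1 = Unpooled.buffer(BODY_SIZE);
data1.writeLong(BODY_SIZE + FRAME_LENGTH_SIZE);  // the length field counts the header plus the body, as above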

@Test
public void testInterception() throws Exception {
int interceptedReads = 3;
Expand Down