Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,37 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
}
}

private void finishCall(ResponseHeader responseHeader, ByteBufInputStream in, Call call)
throws IOException {
Message value;
if (call.responseDefaultType != null) {
Message.Builder builder = call.responseDefaultType.newBuilderForType();
if (!builder.mergeDelimitedFrom(in)) {
// The javadoc of mergeDelimitedFrom says returning false means the stream reaches EOF
// before reading any bytes out, so here we need to manually finish create the EOFException
// and finish the call
call.setException(new EOFException("EOF while reading response with type: "
+ call.responseDefaultType.getClass().getName()));
return;
}
value = builder.build();
} else {
value = null;
}
CellScanner cellBlockScanner;
if (responseHeader.hasCellBlockMeta()) {
int size = responseHeader.getCellBlockMeta().getLength();
// Maybe we could read directly from the ByteBuf.
// The problem here is that we do not know when to release it.
byte[] cellBlock = new byte[size];
in.readFully(cellBlock);
cellBlockScanner = cellBlockBuilder.createCellScanner(this.codec, this.compressor, cellBlock);
} else {
cellBlockScanner = null;
}
call.setResponse(value, cellBlockScanner);
}

private void readResponse(ChannelHandlerContext ctx, ByteBuf buf) throws IOException {
int totalSize = buf.readInt();
ByteBufInputStream in = new ByteBufInputStream(buf);
Expand Down Expand Up @@ -166,31 +197,17 @@ private void readResponse(ChannelHandlerContext ctx, ByteBuf buf) throws IOExcep
call.setException(remoteExc);
return;
}
Message value;
if (call.responseDefaultType != null) {
Message.Builder builder = call.responseDefaultType.newBuilderForType();
if (!builder.mergeDelimitedFrom(in)) {
// The javadoc of mergeDelimitedFrom says returning false means the stream reaches EOF
// before reading any bytes out, so here we need to manually throw the EOFException out
throw new EOFException(
"EOF while reading response with type: " + call.responseDefaultType.getClass().getName());
}
value = builder.build();
} else {
value = null;
}
CellScanner cellBlockScanner;
if (responseHeader.hasCellBlockMeta()) {
int size = responseHeader.getCellBlockMeta().getLength();
// Maybe we could read directly from the ByteBuf.
// The problem here is that we do not know when to release it.
byte[] cellBlock = new byte[size];
buf.readBytes(cellBlock);
cellBlockScanner = cellBlockBuilder.createCellScanner(this.codec, this.compressor, cellBlock);
} else {
cellBlockScanner = null;
try {
finishCall(responseHeader, in, call);
} catch (IOException e) {
// As the call has been removed from id2Call map, if we hit an exception here, the
// exceptionCaught method can not help us finish the call, so here we need to catch the
// exception and finish it
// And in netty, the decoding the frame based, when reaching here we have already read a full
// frame, so hitting exception here does not mean the stream decoding is broken, thus we do
// not need to throw the exception out and close the connection.
call.setException(e);
}
call.setResponse(value, cellBlockScanner);
}

@Override
Expand Down