-
Notifications
You must be signed in to change notification settings - Fork 3.9k
Closed
Description
gRPC-java===1.63.0
java: 17.0.9 2023-10-17 LTS 64-Bit
When I use gRPC streaming responses, I handle it like this:
observers.forEach((context, observer) -> {
if (!context.isCancelled() && observer.isReady()) {
observer.onNext(data);
}
});This method gets triggered about 1500 times per second. Under such concurrent circumstances, there are issues with gRPC failing to send messages.
Stacktrace and logs
2024-05-13 09:44:10.412 [grpc-nio-worker-ELG-1-7] WARN io.grpc.netty.shaded.io.netty.util.ReferenceCountUtil.safeRelease - Failed to release a message: UnpooledSlicedByteBuf(freed)
io.grpc.netty.shaded.io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
at io.grpc.netty.shaded.io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:83)
at io.grpc.netty.shaded.io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:148)
at io.grpc.netty.shaded.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:101)
at io.grpc.netty.shaded.io.netty.buffer.CompositeByteBuf$Component.free(CompositeByteBuf.java:1959)
at io.grpc.netty.shaded.io.netty.buffer.CompositeByteBuf.deallocate(CompositeByteBuf.java:2264)
at io.grpc.netty.shaded.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:111)
at io.grpc.netty.shaded.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:101)
at io.grpc.netty.shaded.io.netty.buffer.AbstractDerivedByteBuf.release0(AbstractDerivedByteBuf.java:98)
at io.grpc.netty.shaded.io.netty.buffer.AbstractDerivedByteBuf.release(AbstractDerivedByteBuf.java:94)
at io.grpc.netty.shaded.io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:90)
at io.grpc.netty.shaded.io.netty.util.ReferenceCountUtil.safeRelease(ReferenceCountUtil.java:116)
at io.grpc.netty.shaded.io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:280)
at io.grpc.netty.shaded.io.netty.channel.ChannelOutboundBuffer.removeBytes(ChannelOutboundBuffer.java:361)
at io.grpc.netty.shaded.io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:438)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:931)
at io.grpc.netty.shaded.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:354)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:895)
at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:921)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:907)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:893)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.flush(Http2ConnectionHandler.java:197)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:925)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:907)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:893)
at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:967)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannel.flush(AbstractChannel.java:254)
at io.grpc.netty.shaded.io.grpc.netty.WriteQueue.flush(WriteQueue.java:143)
at io.grpc.netty.shaded.io.grpc.netty.WriteQueue.access$000(WriteQueue.java:35)
at io.grpc.netty.shaded.io.grpc.netty.WriteQueue$1.run(WriteQueue.java:47)
at io.grpc.netty.shaded.io.netty.util.concurrent.AbstractEventExecutor.runTask$$$capture(AbstractEventExecutor.java:173)
at io.grpc.netty.shaded.io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java)
at io.grpc.netty.shaded.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
Client error:
On error,listening end
io.grpc.StatusRuntimeException: CANCELLED: Failed to read message.
at io.grpc.Status.asRuntimeException(Status.java:533)
at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:481)
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:564)
at io.grpc.internal.ClientCallImpl.access$100(ClientCallImpl.java:72)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:729)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:710)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: io.grpc.StatusRuntimeException: INTERNAL: Invalid protobuf byte sequence
at io.grpc.Status.asRuntimeException(Status.java:525)
at io.grpc.protobuf.lite.ProtoLiteUtils$MessageMarshaller.parse(ProtoLiteUtils.java:240)
at io.grpc.protobuf.lite.ProtoLiteUtils$MessageMarshaller.parse(ProtoLiteUtils.java:134)
at io.grpc.MethodDescriptor.parseResponse(MethodDescriptor.java:284)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInternal(ClientCallImpl.java:657)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext(ClientCallImpl.java:644)
... 5 common frames omitted
Caused by: com.google.protobuf.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field. This could mean either that the input has been truncated or that an embedded message misreported its own length.
at com.google.protobuf.InvalidProtocolBufferException.truncatedMessage(InvalidProtocolBufferException.java:115)
at com.google.protobuf.CodedInputStream$ArrayDecoder.readStringRequireUtf8(CodedInputStream.java:822)
at com.google.protobuf.StringValue$Builder.mergeFrom(StringValue.java:386)
at com.google.protobuf.StringValue$1.parsePartialFrom(StringValue.java:517)
at com.google.protobuf.StringValue$1.parsePartialFrom(StringValue.java:509)
at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:86)
at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:48)
at io.grpc.protobuf.lite.ProtoLiteUtils$MessageMarshaller.parseFrom(ProtoLiteUtils.java:245)
at io.grpc.protobuf.lite.ProtoLiteUtils$MessageMarshaller.parse(ProtoLiteUtils.java:237)
... 9 common frames omitted
If I control the sending frequency with a queue, then this exception does not occur.
Metadata
Metadata
Assignees
Labels
No labels