Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][client] Fix building broken batched message when publishing #24061

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

poorbarcode
Copy link
Contributor

@poorbarcode poorbarcode commented Mar 6, 2025

Motivation

1. The views of the issue:

  • The first single message can not be extracted and gets an error ava.lang.IllegalArgumentException: Invalid unknown tag type: 3
  • And it also leads to a message loss issue.
  • You can reproduce the issue by the new test testNoEnoughMemSend

2. The steps of issue occurs.

  • Enable batch send
  • Publish async msg-1
  • Flush batched messages
    • Since there is only 1 msg in the container, the batched message will be built as [{msg-metadata-1}, {msg-payload}].
  • Failed build OpSendMsg
    • The variable BatchMessageContainerImpl.batchedMessageMetadataAndPayload is [{msg-metadata-1}, {msg-payload-1}] now
    • (Highlight): Since {batch-msg-metadata} will not be appended into BatchMessageContainerImpl.batchedMessageMetadataAndPayload, the variable will be [{msg-payload-1}]
  • Publish async msg-1
  • Flush batched messages
    • In expected: since there is 2 msg in the container, the batched message will be built as [{batch-metadata}, {single-msg-metadata-1}, {msg-payload-1}, {single-msg-metadata-2}, {msg-payload-2}].
    • (Highlight) Issue: Since the variable BatchMessageContainerImpl.batchedMessageMetadataAndPayload already has some data that has not been cleared, the data actually is [{batch-metadata}, {msg-payload-1}, {single-msg-metadata-1}, {msg-payload-1}, {single-msg-metadata-2}, {msg-payload-2}]
    • (Highlight): BTW, since {msg-payload-1} has been read out at the first flushing, the second one will be empty, so the final data is [{batch-metadata}, {msg-payload-1},{single-msg-metadata-1}, {empty}, {single-msg-metadata-2}, {msg-payload-2}]

3. Explain why it also leads to a message loss issue.

The error will also lead the batch message to be [{batch-metadata}, {single-msg-metadata-1}, {msg-payload-1}, {single-msg-metadata-2}, {msg-payload-2}, {single-msg-metadata-1}, {empty}, {single-msg-metadata-2}, {empty}, {single-msg-metadata-3}, {msg-payload-3}]. Then the msg-3 will be discarded because the value of batch-metadata.numChunksFromMsg is 3, but there are 5 single message metadata.


4. Error that we encountered

  • bin/pulsar-admin topics get-message-by-id -l 2310013 -e 57 <topic name>
2025-02-19T10:47:10,530+0000 [AsyncHttpClient-9-1] ERROR org.apache.pulsar.client.admin.internal.TopicsImpl - Exception occurred while trying to get BatchMsgId: 2310013:57:0
java.lang.IllegalArgumentException: Invalid unknonwn tag type: 3
	at org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:270) ~[io.streamnative-pulsar-common-3.3.1.8.jar:3.3.1.8]
	at org.apache.pulsar.common.api.proto.SingleMessageMetadata.parseFrom(SingleMessageMetadata.java:470) ~[io.streamnative-pulsar-common-3.3.1.8.jar:3.3.1.8]
	at org.apache.pulsar.common.protocol.Commands.deSerializeSingleMessageInBatch(Commands.java:1891) ~[io.streamnative-pulsar-common-3.3.1.8.jar:3.3.1.8]
	at org.apache.pulsar.client.admin.internal.TopicsImpl.getIndividualMsgsFromBatch(TopicsImpl.java:1496) ~[io.streamnative-pulsar-client-admin-original-3.3.1.8.jar:3.3.1.8]
	at org.apache.pulsar.client.admin.internal.TopicsImpl.getMessagesFromHttpResponse(TopicsImpl.java:1473) ~[io.streamnative-pulsar-client-admin-original-3.3.1.8.jar:3.3.1.8]
	at org.apache.pulsar.client.admin.internal.TopicsImpl.getMessagesFromHttpResponse(TopicsImpl.java:1270) ~[io.streamnative-pulsar-client-admin-original-3.3.1.8.jar:3.3.1.8]
	at org.apache.pulsar.client.admin.internal.TopicsImpl.access$200(TopicsImpl.java:100) ~[io.streamnative-pulsar-client-admin-original-3.3.1.8.jar:3.3.1.8]
	at org.apache.pulsar.client.admin.internal.TopicsImpl$21.completed(TopicsImpl.java:1012) ~[io.streamnative-pulsar-client-admin-original-3.3.1.8.jar:3.3.1.8]
	at org.apache.pulsar.client.admin.internal.TopicsImpl$21.completed(TopicsImpl.java:1008) ~[io.streamnative-pulsar-client-admin-original-3.3.1.8.jar:3.3.1.8]
	at org.glassfish.jersey.client.JerseyInvocation$1.completed(JerseyInvocation.java:873) ~[org.glassfish.jersey.core-jersey-client-2.42.jar:?]
	at org.glassfish.jersey.client.ClientRuntime.processResponse(ClientRuntime.java:232) ~[org.glassfish.jersey.core-jersey-client-2.42.jar:?]
	at org.glassfish.jersey.client.ClientRuntime.access$200(ClientRuntime.java:62) ~[org.glassfish.jersey.core-jersey-client-2.42.jar:?]
	at org.glassfish.jersey.client.ClientRuntime$2.lambda$response$0(ClientRuntime.java:176) ~[org.glassfish.jersey.core-jersey-client-2.42.jar:?]
	at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248) ~[org.glassfish.jersey.core-jersey-common-2.42.jar:?]
	at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244) ~[org.glassfish.jersey.core-jersey-common-2.42.jar:?]
	at org.glassfish.jersey.internal.Errors.process(Errors.java:292) ~[org.glassfish.jersey.core-jersey-common-2.42.jar:?]
	at org.glassfish.jersey.internal.Errors.process(Errors.java:274) ~[org.glassfish.jersey.core-jersey-common-2.42.jar:?]
	at org.glassfish.jersey.internal.Errors.process(Errors.java:244) ~[org.glassfish.jersey.core-jersey-common-2.42.jar:?]
	at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:288) ~[org.glassfish.jersey.core-jersey-common-2.42.jar:?]
	at org.glassfish.jersey.client.ClientRuntime$2.response(ClientRuntime.java:176) ~[org.glassfish.jersey.core-jersey-client-2.42.jar:?]
	at org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.lambda$apply$1(AsyncHttpConnector.java:317) ~[io.streamnative-pulsar-client-admin-original-3.3.1.8.jar:3.3.1.8]
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source) ~[?:?]
	at org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.lambda$retryOperation$4(AsyncHttpConnector.java:373) ~[io.streamnative-pulsar-client-admin-original-3.3.1.8.jar:3.3.1.8]
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.cancel(Unknown Source) ~[?:?]
	at org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.lambda$executeRequest$10(AsyncHttpConnector.java:442) ~[io.streamnative-pulsar-client-admin-original-3.3.1.8.jar:3.3.1.8]
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source) ~[?:?]
	at com.spotify.futures.ConcurrencyReducer.lambda$invoke$0(ConcurrencyReducer.java:173) ~[com.spotify-completable-futures-0.3.6.jar:0.3.6]
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source) ~[?:?]
	at org.asynchttpclient.netty.NettyResponseFuture.loadContent(NettyResponseFuture.java:222) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?]
	at org.asynchttpclient.netty.NettyResponseFuture.done(NettyResponseFuture.java:257) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?]
	at org.asynchttpclient.netty.handler.AsyncHttpClientHandler.finishUpdate(AsyncHttpClientHandler.java:241) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?]
	at org.asynchttpclient.netty.handler.HttpHandler.handleChunk(HttpHandler.java:114) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?]
	at org.asynchttpclient.netty.handler.HttpHandler.handleRead(HttpHandler.java:143) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?]
	at org.asynchttpclient.netty.handler.AsyncHttpClientHandler.channelRead(AsyncHttpClientHandler.java:78) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) ~[io.netty-netty-codec-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[io.netty-netty-codec-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[io.netty-netty-codec-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[io.netty-netty-common-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[io.netty-netty-common-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.113.Final.jar:4.1.113.Final]
	at java.base/java.lang.Thread.run(Unknown Source) [?:?]
2025-02-19T10:47:10,553+0000 [AsyncHttpClient-9-1] ERROR org.apache.pulsar.client.admin.internal.TopicsImpl - Exception occurred while trying to get BatchMsgId: 2310013:57:1
java.lang.IllegalArgumentException: Invalid unknonwn tag type: 6
	at org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:270) ~[io.streamnative-pulsar-common-3.3.1.8.jar:3.3.1.8]
	at org.apache.pulsar.common.api.proto.SingleMessageMetadata.parseFrom(SingleMessageMetadata.java:470) ~[io.streamnative-pulsar-common-3.3.1.8.jar:3.3.1.8]
	at org.apache.pulsar.common.protocol.Commands.deSerializeSingleMessageInBatch(Commands.java:1891) ~[io.streamnative-pulsar-common-3.3.1.8.jar:3.3.1.8]
	at org.apache.pulsar.client.admin.internal.TopicsImpl.getIndividualMsgsFromBatch(TopicsImpl.java:1496) ~[io.streamnative-pulsar-client-admin-original-3.3.1.8.jar:3.3.1.8]
	at org.apache.pulsar.client.admin.internal.TopicsImpl.getMessagesFromHttpResponse(TopicsImpl.java:1473) ~[io.streamnative-pulsar-client-admin-original-3.3.1.8.jar:3.3.1.8]
	at org.apache.pulsar.client.admin.internal.TopicsImpl.getMessagesFromHttpResponse(TopicsImpl.java:1270) ~[io.streamnative-pulsar-client-admin-original-3.3.1.8.jar:3.3.1.8]
	at org.apache.pulsar.client.admin.internal.TopicsImpl.access$200(TopicsImpl.java:100) ~[io.streamnative-pulsar-client-admin-original-3.3.1.8.jar:3.3.1.8]
	at org.apache.pulsar.client.admin.internal.TopicsImpl$21.completed(TopicsImpl.java:1012) ~[io.streamnative-pulsar-client-admin-original-3.3.1.8.jar:3.3.1.8]
	at org.apache.pulsar.client.admin.internal.TopicsImpl$21.completed(TopicsImpl.java:1008) ~[io.streamnative-pulsar-client-admin-original-3.3.1.8.jar:3.3.1.8]
	at org.glassfish.jersey.client.JerseyInvocation$1.completed(JerseyInvocation.java:873) ~[org.glassfish.jersey.core-jersey-client-2.42.jar:?]
	at org.glassfish.jersey.client.ClientRuntime.processResponse(ClientRuntime.java:232) ~[org.glassfish.jersey.core-jersey-client-2.42.jar:?]
	at org.glassfish.jersey.client.ClientRuntime.access$200(ClientRuntime.java:62) ~[org.glassfish.jersey.core-jersey-client-2.42.jar:?]
	at org.glassfish.jersey.client.ClientRuntime$2.lambda$response$0(ClientRuntime.java:176) ~[org.glassfish.jersey.core-jersey-client-2.42.jar:?]
	at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248) ~[org.glassfish.jersey.core-jersey-common-2.42.jar:?]
	at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244) ~[org.glassfish.jersey.core-jersey-common-2.42.jar:?]
	at org.glassfish.jersey.internal.Errors.process(Errors.java:292) ~[org.glassfish.jersey.core-jersey-common-2.42.jar:?]
	at org.glassfish.jersey.internal.Errors.process(Errors.java:274) ~[org.glassfish.jersey.core-jersey-common-2.42.jar:?]
	at org.glassfish.jersey.internal.Errors.process(Errors.java:244) ~[org.glassfish.jersey.core-jersey-common-2.42.jar:?]
	at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:288) ~[org.glassfish.jersey.core-jersey-common-2.42.jar:?]
	at org.glassfish.jersey.client.ClientRuntime$2.response(ClientRuntime.java:176) ~[org.glassfish.jersey.core-jersey-client-2.42.jar:?]
	at org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.lambda$apply$1(AsyncHttpConnector.java:317) ~[io.streamnative-pulsar-client-admin-original-3.3.1.8.jar:3.3.1.8]
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source) ~[?:?]
	at org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.lambda$retryOperation$4(AsyncHttpConnector.java:373) ~[io.streamnative-pulsar-client-admin-original-3.3.1.8.jar:3.3.1.8]
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.cancel(Unknown Source) ~[?:?]
	at org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.lambda$executeRequest$10(AsyncHttpConnector.java:442) ~[io.streamnative-pulsar-client-admin-original-3.3.1.8.jar:3.3.1.8]
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source) ~[?:?]
	at com.spotify.futures.ConcurrencyReducer.lambda$invoke$0(ConcurrencyReducer.java:173) ~[com.spotify-completable-futures-0.3.6.jar:0.3.6]
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source) ~[?:?]
	at org.asynchttpclient.netty.NettyResponseFuture.loadContent(NettyResponseFuture.java:222) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?]
	at org.asynchttpclient.netty.NettyResponseFuture.done(NettyResponseFuture.java:257) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?]
	at org.asynchttpclient.netty.handler.AsyncHttpClientHandler.finishUpdate(AsyncHttpClientHandler.java:241) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?]
	at org.asynchttpclient.netty.handler.HttpHandler.handleChunk(HttpHandler.java:114) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?]
	at org.asynchttpclient.netty.handler.HttpHandler.handleRead(HttpHandler.java:143) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?]
	at org.asynchttpclient.netty.handler.AsyncHttpClientHandler.channelRead(AsyncHttpClientHandler.java:78) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) ~[io.netty-netty-codec-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[io.netty-netty-codec-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[io.netty-netty-codec-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[io.netty-netty-common-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[io.netty-netty-common-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.113.Final.jar:4.1.113.Final]
	at java.base/java.lang.Thread.run(Unknown Source) [?:?]
2025-02-19T10:47:10,555+0000 [AsyncHttpClient-9-1] ERROR org.apache.pulsar.client.admin.internal.TopicsImpl - Exception occurred while trying to get BatchMsgId: 2310013:57:2
java.lang.IllegalArgumentException: Invalid unknonwn tag type: 4
	at org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:270) ~[io.streamnative-pulsar-common-3.3.1.8.jar:3.3.1.8]
	at org.apache.pulsar.common.api.proto.SingleMessageMetadata.parseFrom(SingleMessageMetadata.java:470) ~[io.streamnative-pulsar-common-3.3.1.8.jar:3.3.1.8]
	at org.apache.pulsar.common.protocol.Commands.deSerializeSingleMessageInBatch(Commands.java:1891) ~[io.streamnative-pulsar-common-3.3.1.8.jar:3.3.1.8]
	at org.apache.pulsar.client.admin.internal.TopicsImpl.getIndividualMsgsFromBatch(TopicsImpl.java:1496) ~[io.streamnative-pulsar-client-admin-original-3.3.1.8.jar:3.3.1.8]
	at org.apache.pulsar.client.admin.internal.TopicsImpl.getMessagesFromHttpResponse(TopicsImpl.java:1473) ~[io.streamnative-pulsar-client-admin-original-3.3.1.8.jar:3.3.1.8]
	at org.apache.pulsar.client.admin.internal.TopicsImpl.getMessagesFromHttpResponse(TopicsImpl.java:1270) ~[io.streamnative-pulsar-client-admin-original-3.3.1.8.jar:3.3.1.8]
	at org.apache.pulsar.client.admin.internal.TopicsImpl.access$200(TopicsImpl.java:100) ~[io.streamnative-pulsar-client-admin-original-3.3.1.8.jar:3.3.1.8]
	at org.apache.pulsar.client.admin.internal.TopicsImpl$21.completed(TopicsImpl.java:1012) ~[io.streamnative-pulsar-client-admin-original-3.3.1.8.jar:3.3.1.8]
	at org.apache.pulsar.client.admin.internal.TopicsImpl$21.completed(TopicsImpl.java:1008) ~[io.streamnative-pulsar-client-admin-original-3.3.1.8.jar:3.3.1.8]
	at org.glassfish.jersey.client.JerseyInvocation$1.completed(JerseyInvocation.java:873) ~[org.glassfish.jersey.core-jersey-client-2.42.jar:?]
	at org.glassfish.jersey.client.ClientRuntime.processResponse(ClientRuntime.java:232) ~[org.glassfish.jersey.core-jersey-client-2.42.jar:?]
	at org.glassfish.jersey.client.ClientRuntime.access$200(ClientRuntime.java:62) ~[org.glassfish.jersey.core-jersey-client-2.42.jar:?]
	at org.glassfish.jersey.client.ClientRuntime$2.lambda$response$0(ClientRuntime.java:176) ~[org.glassfish.jersey.core-jersey-client-2.42.jar:?]
	at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248) ~[org.glassfish.jersey.core-jersey-common-2.42.jar:?]
	at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244) ~[org.glassfish.jersey.core-jersey-common-2.42.jar:?]
	at org.glassfish.jersey.internal.Errors.process(Errors.java:292) ~[org.glassfish.jersey.core-jersey-common-2.42.jar:?]
	at org.glassfish.jersey.internal.Errors.process(Errors.java:274) ~[org.glassfish.jersey.core-jersey-common-2.42.jar:?]
	at org.glassfish.jersey.internal.Errors.process(Errors.java:244) ~[org.glassfish.jersey.core-jersey-common-2.42.jar:?]
	at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:288) ~[org.glassfish.jersey.core-jersey-common-2.42.jar:?]
	at org.glassfish.jersey.client.ClientRuntime$2.response(ClientRuntime.java:176) ~[org.glassfish.jersey.core-jersey-client-2.42.jar:?]
	at org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.lambda$apply$1(AsyncHttpConnector.java:317) ~[io.streamnative-pulsar-client-admin-original-3.3.1.8.jar:3.3.1.8]
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source) ~[?:?]
	at org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.lambda$retryOperation$4(AsyncHttpConnector.java:373) ~[io.streamnative-pulsar-client-admin-original-3.3.1.8.jar:3.3.1.8]
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.cancel(Unknown Source) ~[?:?]
	at org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.lambda$executeRequest$10(AsyncHttpConnector.java:442) ~[io.streamnative-pulsar-client-admin-original-3.3.1.8.jar:3.3.1.8]
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source) ~[?:?]
	at com.spotify.futures.ConcurrencyReducer.lambda$invoke$0(ConcurrencyReducer.java:173) ~[com.spotify-completable-futures-0.3.6.jar:0.3.6]
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source) ~[?:?]
	at org.asynchttpclient.netty.NettyResponseFuture.loadContent(NettyResponseFuture.java:222) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?]
	at org.asynchttpclient.netty.NettyResponseFuture.done(NettyResponseFuture.java:257) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?]
	at org.asynchttpclient.netty.handler.AsyncHttpClientHandler.finishUpdate(AsyncHttpClientHandler.java:241) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?]
	at org.asynchttpclient.netty.handler.HttpHandler.handleChunk(HttpHandler.java:114) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?]
	at org.asynchttpclient.netty.handler.HttpHandler.handleRead(HttpHandler.java:143) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?]
	at org.asynchttpclient.netty.handler.AsyncHttpClientHandler.channelRead(AsyncHttpClientHandler.java:78) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) ~[io.netty-netty-codec-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[io.netty-netty-codec-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[io.netty-netty-codec-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[io.netty-netty-transport-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[io.netty-netty-common-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[io.netty-netty-common-4.1.113.Final.jar:4.1.113.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.113.Final.jar:4.1.113.Final]
	at java.base/java.lang.Thread.run(Unknown Source) [?:?]

Modifications

  • fix the issue.

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: x

Copy link

github-actions bot commented Mar 6, 2025

@poorbarcode Please add the following content to your PR description and select a checkbox:

- [ ] `doc` <!-- Your PR contains doc changes -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [ ] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->

@github-actions github-actions bot added doc-not-needed Your PR changes do not impact docs and removed doc-label-missing labels Mar 6, 2025
Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good work tracing down this issue, Yubiao! I have some initial feedback as comments.

Comment on lines -165 to -168
for (int j = 0; j <= i; j++) {
MessageImpl<?> previousMsg = messages.get(j);
previousMsg.getDataBuffer().resetReaderIndex();
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wondering why it wouldn't be fine to reset the reader index of all already batched messages? I guess we don't have a test case to fully cover it where it matters?
Would it be possible to add a test case where multiple messages are sent and the failure happens on the last one? To ensure that all messages get sent successfully when the sending resumes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I described this in the Motivation section

(Highlight): BTW, since {msg-payload-1} has been read out at the first flushing, the second one will be empty, so the final data is [{batch-metadata}, {msg-payload-1},{single-msg-metadata-1}, {empty}, {single-msg-metadata-2}, {msg-payload-2}]

The message-payload will be read again once the previous Send-Command building fails; it needs to be reset after a failed Send-Command building.

I reset the message payload after reading it, which is a simple way to fix the issue of reading an empty payload. Another solution is as follows, which is more safe:

  • records message-payload.readerIndex when messages are added
  • only reset message-payload when failed building a Send-Command

Please show your view that prefer which soltion

Copy link
Member

@lhotari lhotari Mar 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@poorbarcode Please add a similar test case as what you have already added with the variation that instead of failing after every message, fail and flushAsync only after the 3rd message. That would reveal the problem if it exists.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

@codecov-commenter
Copy link

codecov-commenter commented Mar 6, 2025

Codecov Report

Attention: Patch coverage is 58.33333% with 5 lines in your changes missing coverage. Please review.

Project coverage is 74.16%. Comparing base (bbc6224) to head (add2c58).
Report is 952 commits behind head on master.

Files with missing lines Patch % Lines
...sar/client/impl/BatchMessageKeyBasedContainer.java 0.00% 4 Missing ⚠️
.../pulsar/client/impl/BatchMessageContainerImpl.java 80.00% 0 Missing and 1 partial ⚠️
Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #24061      +/-   ##
============================================
+ Coverage     73.57%   74.16%   +0.59%     
+ Complexity    32624    32025     -599     
============================================
  Files          1877     1862      -15     
  Lines        139502   144231    +4729     
  Branches      15299    16432    +1133     
============================================
+ Hits         102638   106970    +4332     
+ Misses        28908    28816      -92     
- Partials       7956     8445     +489     
Flag Coverage Δ
inttests 26.71% <16.66%> (+2.13%) ⬆️
systests 23.12% <16.66%> (-1.20%) ⬇️
unittests 73.68% <58.33%> (+0.84%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
...va/org/apache/pulsar/client/impl/ProducerImpl.java 84.48% <100.00%> (+0.89%) ⬆️
...va/org/apache/pulsar/common/protocol/Commands.java 90.95% <100.00%> (+0.35%) ⬆️
.../pulsar/client/impl/BatchMessageContainerImpl.java 87.30% <80.00%> (+6.40%) ⬆️
...sar/client/impl/BatchMessageKeyBasedContainer.java 60.37% <0.00%> (-4.93%) ⬇️

... and 1053 files with indirect coverage changes

🚀 New features to boost your workflow:
  • Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@poorbarcode poorbarcode requested a review from lhotari March 7, 2025 03:33
@poorbarcode poorbarcode requested review from lhotari and removed request for lhotari March 10, 2025 03:03
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
doc-not-needed Your PR changes do not impact docs ready-to-test release/3.0.11 release/3.3.6 release/4.0.4 type/bug The PR fixed a bug or issue reported a bug
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants