[Bug] Send timeout happened immediately when batching is enabled #21884

Closed
BewareMyPower opened this issue Jan 11, 2024 · 2 comments · Fixed by #21889
Labels: type/bug

Search before asking

  • I searched in the issues and found nothing similar.

Version

KSN (the private version of the Kafka protocol handler) based on Pulsar 3.1.0.

Minimal reproduce step

Background for KSN

There is a partitioned topic public/__kafka/__consumer_offsets with 50 partitions. In KSN, 50 producers and 50 readers are created on this topic. Each producer is created like this:

        this.producerBuilder = client.newProducer(schema).enableBatching(true)
                .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
                .maxPendingMessages(maxPendingMessages)
                .sendTimeout(offsetConfig.offsetCommitTimeoutMs(), TimeUnit.MILLISECONDS)
                .blockIfQueueFull(true);

The send timeout is 5 seconds.

            final var future = producers.computeIfAbsent(partition, __ ->
                producerBuilder.clone().topic(getPartition(partition)).createAsync());

I used the Kafka CLI to consume a topic, which committed offsets to __consumer_offsets-partition-38 at an interval of 5 seconds. Each time an offset is committed, a message is sent via a Pulsar producer:

        producer.newMessage().keyBytes(key).value(value).eventTime(timestamp).sendAsync().whenComplete(
                (msgId, e) -> {
                    if (e == null) {
                        future.complete(msgId);
                    } else {
                        if (e instanceof PulsarClientException.AlreadyClosedException) {
                            // The producer is already closed, we don't need to close it again.
                            producers.remove(partition);
                        } else if (e instanceof PulsarClientException.TimeoutException te) {
                            log.warn("Timeout when sending: {}, seq id: {}", te.getMessage(), te.getSequenceId());
                        }
                        future.completeExceptionally(e);
                    }
                });

After some time, the offset commit request failed with a timeout error:

$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --group my-group
[2024-01-11 23:18:00,649] WARN [Consumer clientId=console-consumer, groupId=my-group] Offset commit failed on partition my-topic-0 at offset 0: The request timed out. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

From the broker-side logs, we can see:

2024-01-11T23:10:23,628+0800 [pulsar-ph-kafka-224-11] INFO  io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager - XYZ storeOffsetMessageAsync GroupMetadata{groupId=my-group, generation=22, protocolType=Optional[consumer], state=Stable, members={console-consumer-b18a2e15-6cc8-470a-bf43-d35808ecc6f9=MemberMetadata{memberId=console-consumer-b18a2e15-6cc8-470a-bf43-d35808ecc6f9, clientId=console-consumer, clientHost=/127.0.0.1:63244, sessionTimeoutMs=45000, rebalanceTimeoutMs=300000, supportedProtocols=java.util.stream.ReferencePipeline$Head@1031ff46}}} (38) key: 18 bytes
2024-01-11T23:10:23,630+0800 [pulsar-timer-118-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - [public/__kafka/__consumer_offsets-partition-38] [standalone-2804-81] Message send timed out. Failing 1 messages
2024-01-11T23:10:23,631+0800 [pulsar-timer-118-1] WARN  io.streamnative.pulsar.handlers.kop.coordinator.CompactedPartitionedTopic - Timeout when sending: The producer standalone-2804-81 can not send message to the topic public/__kafka/__consumer_offsets-partition-38 within given timeout, seq id: -1

What did you expect to see?

The time duration between XYZ storeOffsetMessageAsync and Timeout when sending should be close to 5 seconds.

What did you see instead?

Right before the sendAsync method was called, the timestamp was 23:10:23,628. However, when the message timed out, the timestamp was 23:10:23,631. The interval is only 3 milliseconds.

Anything else?

There is nothing strange before the timeout happened.

2024-01-11T23:09:53,619+0800 [pulsar-client-io-87-3] INFO  org.apache.pulsar.client.impl.ProducerImpl - [public/__kafka/__consumer_offsets-partition-38] [standalone-2804-81] Created producer on cnx [id: 0x4d70618b, L:/127.0.0.1:63232 - R:/127.0.0.1:6650]
2024-01-11T23:09:53,623+0800 [pulsar-client-io-87-3] INFO  io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager - XYZ storeOffsetMessageAsync successfully 38 key: 18 bytes
2024-01-11T23:09:58,622+0800 [pulsar-ph-kafka-224-11] INFO  io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager - XYZ storeOffsetMessageAsync GroupMetadata{groupId=my-group, generation=22, protocolType=Optional[consumer], state=Stable, members={console-consumer-b18a2e15-6cc8-470a-bf43-d35808ecc6f9=MemberMetadata{memberId=console-consumer-b18a2e15-6cc8-470a-bf43-d35808ecc6f9, clientId=console-consumer, clientHost=/127.0.0.1:63244, sessionTimeoutMs=45000, rebalanceTimeoutMs=300000, supportedProtocols=java.util.stream.ReferencePipeline$Head@7346deaf}}} (38) key: 18 bytes
2024-01-11T23:09:58,630+0800 [pulsar-client-io-87-3] INFO  io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager - XYZ storeOffsetMessageAsync successfully 38 key: 18 bytes
2024-01-11T23:10:03,620+0800 [pulsar-ph-kafka-224-11] INFO  io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager - XYZ storeOffsetMessageAsync GroupMetadata{groupId=my-group, generation=22, protocolType=Optional[consumer], state=Stable, members={console-consumer-b18a2e15-6cc8-470a-bf43-d35808ecc6f9=MemberMetadata{memberId=console-consumer-b18a2e15-6cc8-470a-bf43-d35808ecc6f9, clientId=console-consumer, clientHost=/127.0.0.1:63244, sessionTimeoutMs=45000, rebalanceTimeoutMs=300000, supportedProtocols=java.util.stream.ReferencePipeline$Head@624415e4}}} (38) key: 18 bytes
2024-01-11T23:10:03,625+0800 [pulsar-client-io-87-3] INFO  io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager - XYZ storeOffsetMessageAsync successfully 38 key: 18 bytes
2024-01-11T23:10:08,624+0800 [pulsar-ph-kafka-224-11] INFO  io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager - XYZ storeOffsetMessageAsync GroupMetadata{groupId=my-group, generation=22, protocolType=Optional[consumer], state=Stable, members={console-consumer-b18a2e15-6cc8-470a-bf43-d35808ecc6f9=MemberMetadata{memberId=console-consumer-b18a2e15-6cc8-470a-bf43-d35808ecc6f9, clientId=console-consumer, clientHost=/127.0.0.1:63244, sessionTimeoutMs=45000, rebalanceTimeoutMs=300000, supportedProtocols=java.util.stream.ReferencePipeline$Head@b35f1bd}}} (38) key: 18 bytes
2024-01-11T23:10:08,630+0800 [pulsar-client-io-87-3] INFO  io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager - XYZ storeOffsetMessageAsync successfully 38 key: 18 bytes
2024-01-11T23:10:13,621+0800 [pulsar-ph-kafka-224-11] INFO  io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager - XYZ storeOffsetMessageAsync GroupMetadata{groupId=my-group, generation=22, protocolType=Optional[consumer], state=Stable, members={console-consumer-b18a2e15-6cc8-470a-bf43-d35808ecc6f9=MemberMetadata{memberId=console-consumer-b18a2e15-6cc8-470a-bf43-d35808ecc6f9, clientId=console-consumer, clientHost=/127.0.0.1:63244, sessionTimeoutMs=45000, rebalanceTimeoutMs=300000, supportedProtocols=java.util.stream.ReferencePipeline$Head@33f29b11}}} (38) key: 18 bytes
2024-01-11T23:10:13,626+0800 [pulsar-client-io-87-3] INFO  io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager - XYZ storeOffsetMessageAsync successfully 38 key: 18 bytes
2024-01-11T23:10:17,592+0800 [SyncThread-9-1] INFO  org.apache.bookkeeper.bookie.Journal - garbage collected journal 18cf7e07713.txn
2024-01-11T23:10:18,624+0800 [pulsar-ph-kafka-224-11] INFO  io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager - XYZ storeOffsetMessageAsync GroupMetadata{groupId=my-group, generation=22, protocolType=Optional[consumer], state=Stable, members={console-consumer-b18a2e15-6cc8-470a-bf43-d35808ecc6f9=MemberMetadata{memberId=console-consumer-b18a2e15-6cc8-470a-bf43-d35808ecc6f9, clientId=console-consumer, clientHost=/127.0.0.1:63244, sessionTimeoutMs=45000, rebalanceTimeoutMs=300000, supportedProtocols=java.util.stream.ReferencePipeline$Head@76848866}}} (38) key: 18 bytes
2024-01-11T23:10:18,631+0800 [pulsar-client-io-87-3] INFO  io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager - XYZ storeOffsetMessageAsync successfully 38 key: 18 bytes

The logs above are all of the INFO logs before the timeout error. As you can see, messages were sent at a rate of one every 5 seconds, and the other messages all succeeded.

I printed the sequence id of the TimeoutException and found that it is -1:

2024-01-11T23:10:23,631+0800 [pulsar-timer-118-1] WARN  io.streamnative.pulsar.handlers.kop.coordinator.CompactedPartitionedTopic - Timeout when sending: The producer standalone-2804-81 can not send message to the topic public/__kafka/__consumer_offsets-partition-38 within given timeout, seq id: -1

It means it failed here:

failPendingBatchMessages(new PulsarClientException.TimeoutException(msg));

If the message had already been created as an OpSendMsg, the sequence id would be non-negative.
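For illustration, here is a minimal sketch of why the sequence id is -1 in this path. This is not the real ProducerImpl code; the class, fields, and method names are hypothetical and only mirror the "-1 means no OpSendMsg exists yet" behavior described above.

```java
// Hypothetical sketch only: names are made up; this is not Pulsar's ProducerImpl.
import java.util.ArrayDeque;
import java.util.Queue;

class SendTimeoutSequenceIdSketch {
    /** A message that has already been turned into a network-level send operation. */
    record OpSendMsg(long sequenceId) {}

    static final Queue<OpSendMsg> pendingMessages = new ArrayDeque<>();
    static int messagesInBatchContainer = 0;

    /** On timeout, report the oldest pending sequence id, or -1 if none exists yet. */
    static long timedOutSequenceId() {
        OpSendMsg oldest = pendingMessages.peek();
        if (oldest != null) {
            return oldest.sequenceId(); // already an OpSendMsg: a real sequence id
        }
        // The message is still only in the batch container; no OpSendMsg (and hence
        // no sequence id) exists yet, so the TimeoutException carries -1.
        return -1;
    }

    public static void main(String[] args) {
        messagesInBatchContainer = 1;                 // one message waiting in the batch
        System.out.println(timedOutSequenceId());     // prints -1, as seen in the log
    }
}
```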

full-logs.tar.gz

I suspect there is something wrong with the logic refactored in #14185.

Are you willing to submit a PR?

  • I'm willing to submit a PR!
BewareMyPower added the type/bug label on Jan 11, 2024
BewareMyPower (Contributor, Author) commented:

It's hard to reproduce in the IDE because the producer performance is low:

2024-01-11T23:52:55,784+0800 INFO  [main] o.a.p.c.a.MessageIdTest@74 - XYZ send 1 {}
2024-01-11T23:52:55,797+0800 INFO  [pulsar-client-io-1-5] o.a.p.c.a.MessageIdTest@80 - XYZ sent 1 to 140:169:-1 {}

Each message takes 10+ ms to complete.

BewareMyPower (Contributor, Author) commented:

I have figured out the root cause. I will push a PR soon.

BewareMyPower added a commit to BewareMyPower/pulsar that referenced this issue Jan 12, 2024
Fixes apache#21884

### Motivation

When `ProducerImpl#run` is called while `pendingMessages` is empty and
`batchMessageContainer` is not empty, the whole batch's creation time is
computed as `lastBatchSendNanoTime + batching max publish delay`.

Given a send timeout of 5 seconds, assume a batch was flushed at `t1`
and the next message was sent about 5 seconds later:
- t1: `ProducerImpl#batchFlushTask()` runs, so `lastBatchSendNanoTime` is now `t1`
- t1 + 0.0011 s: `ProducerImpl#run()` runs; there is no pending message and the batch container is empty
- t1 + 5.0010 s: `sendAsync()` adds the message to the batch container
- t1 + 5.0011 s: `ProducerImpl#run()` is called before `batchFlushTask()`

https://github.com/apache/pulsar/blob/176bdeacd309e8c1e49358987a1946abd30ba34a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2014-L2015

`createdAt` will be `t1 + 0.001 s` (the 1 ms batching max publish delay). However,
the current time is `t1 + 5.0011 s`, so the computed interval exceeds the 5-second
send timeout and a timeout is reported, even though the message was added to the
batch only a fraction of a millisecond earlier.
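A minimal sketch of the faulty timing math, under the assumption that the timeout check compares `now - createdAt` against the send timeout. The names and structure below are simplified and are not the real ProducerImpl code; only the arithmetic mirrors the scenario above.

```java
import java.util.concurrent.TimeUnit;

// Simplified timing math only; field and variable names are mine, not ProducerImpl's.
class BatchTimeoutMathSketch {
    static final long SEND_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(5);
    static final long BATCH_DELAY_NANOS = TimeUnit.MILLISECONDS.toNanos(1);

    public static void main(String[] args) {
        long t1 = 0L;                       // time of the last batch flush
        long lastBatchSendNanoTime = t1;

        // The next message is added to the batch container ~5.0010 s later, and the
        // send-timeout timer fires at ~t1 + 5.0011 s, before the next batch flush.
        long now = t1 + TimeUnit.MICROSECONDS.toNanos(5_001_100);

        // Behavior described in the issue: createdAt is derived from the previous
        // flush plus the batching delay, not from when the current message arrived.
        long createdAt = lastBatchSendNanoTime + BATCH_DELAY_NANOS;   // t1 + 0.001 s

        long elapsed = now - createdAt;     // ~5.0001 s for a message added just now
        System.out.println("timed out? " + (elapsed > SEND_TIMEOUT_NANOS)); // true
    }
}
```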

### Modification

Record the timestamp when the first message is added to the batch
container and use this timestamp, instead of `lastBatchSendNanoTime`,
to compute `createdAt`.
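A rough sketch of this idea follows; the field name `firstAddedNanoTime` and the class shape are my own illustration under the assumption above, not necessarily what the PR implements.

```java
// Illustrative sketch only; not the actual batch container changed by the PR.
class BatchContainerTimestampSketch {
    private int numMessages = 0;
    private long firstAddedNanoTime = 0L;   // when the first message of this batch arrived

    void add(byte[] payload) {
        if (numMessages == 0) {
            firstAddedNanoTime = System.nanoTime(); // record the real start of the batch
        }
        numMessages++;
    }

    /** Used by the send-timeout check instead of lastBatchSendNanoTime + batch delay. */
    long createdAt() {
        return firstAddedNanoTime;
    }

    void discardAfterFlush() {
        numMessages = 0;
        firstAddedNanoTime = 0L;
    }
}
```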

Add `testSendTimerCheckForBatchContainer` to cover this case.