Can't ack messages on exclusive subscriptions with partitioned topic #6552

Closed
st0ckface opened this issue Mar 18, 2020 · 0 comments · Fixed by #6558
Labels: release/2.5.1, type/bug

@st0ckface

Describe the bug
When a consumer uses an exclusive subscription on a partitioned topic, acked messages are not reflected in the broker stats.

To Reproduce
Steps to reproduce the behavior:

  1. Create a consumer with an exclusive subscription to a partitioned topic
  2. Ack each message individually as it is received
  3. Notice that "unackedMessages" continually climbs in the topic stats via the admin client
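import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;

// The class name is a placeholder; the original snippet omitted the enclosing class.
public class AckTest {

    static PulsarClient getJPulsarClient() throws Exception {
        return PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .connectionsPerBroker(20)
                .ioThreads(8)
                .listenerThreads(8)
                .build();
    }

    // Subscribes with an Exclusive subscription, the mode that triggers the problem.
    static Consumer<byte[]> getJPulsarConsumer(PulsarClient client, String topic, String subscription) throws Exception {
        return client.newConsumer(Schema.BYTES)
                .subscriptionName(subscription)
                .subscriptionType(SubscriptionType.Exclusive)
                .topic(topic)
                .consumerName("ackTestConsumer")
                .subscribe();
    }

    public static void main(String... args) throws Exception {
        PulsarClient client = getJPulsarClient();
        String topic = "public/default/870|EXTERNID_NG|2020-03-17";
        String subscription = "ackTestSubscription";
        Consumer<byte[]> consumer = getJPulsarConsumer(client, topic, subscription);

        int maxMessages = 1000000;
        List<CompletableFuture<Object>> futures = new ArrayList<>();
        for (int i = 0; i < maxMessages; i++) {
            // Ack each message individually as soon as it is received.
            futures.add(consumer.receiveAsync().thenApplyAsync(m -> {
                try {
                    consumer.acknowledge(m.getMessageId());
                } catch (PulsarClientException e) {
                    e.printStackTrace();
                }
                return null;
            }));
        }

        // Block until every receive-and-ack has completed.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).get();
    }
}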
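
Step 3 of the reproduction (watching "unackedMessages" in the topic stats) could be sketched roughly as below. This is only a sketch, not part of the original report: it reads the partitioned-topic stats over the admin REST API with the JDK's HTTP client (Java 11+) instead of the Java admin client. The admin address `http://localhost:8080`, the class name `UnackedStatsProbe`, and both helper methods are assumptions, and the regex extraction is deliberately crude, so it returns every occurrence of a field (subscription level and consumer level alike).

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class UnackedStatsProbe {

    // Builds the admin REST URL for the stats of a persistent partitioned topic.
    // adminUrl is an assumption; the report only gives the broker service URL.
    static String partitionedStatsUrl(String adminUrl, String tenant, String namespace, String topic) {
        // The topic name in the report contains '|', which must be percent-encoded in a URL path.
        String encoded = URLEncoder.encode(topic, StandardCharsets.UTF_8).replace("+", "%20");
        return adminUrl + "/admin/v2/persistent/" + tenant + "/" + namespace + "/" + encoded + "/partitioned-stats";
    }

    // Crude extraction: every integer value of the given JSON field, in document order.
    static List<Long> fieldValues(String json, String field) {
        Pattern pattern = Pattern.compile("\"" + Pattern.quote(field) + "\"\\s*:\\s*(-?\\d+)");
        Matcher matcher = pattern.matcher(json);
        List<Long> values = new ArrayList<>();
        while (matcher.find()) {
            values.add(Long.parseLong(matcher.group(1)));
        }
        return values;
    }

    public static void main(String[] args) throws Exception {
        String url = partitionedStatsUrl("http://localhost:8080", "public", "default",
                "870|EXTERNID_NG|2020-03-17");
        HttpClient http = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();

        // Poll a few times while the consumer is running and print the raw counters.
        for (int i = 0; i < 10; i++) {
            String body = http.send(request, HttpResponse.BodyHandlers.ofString()).body();
            System.out.println("unackedMessages=" + fieldValues(body, "unackedMessages")
                    + " msgBacklog=" + fieldValues(body, "msgBacklog"));
            Thread.sleep(5_000);
        }
    }
}
```

Run it alongside the consumer above; with the behavior described in this issue, the printed "unackedMessages" values would be expected to keep growing between polls.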

Expected behavior
The "unackedMessages" metric should hover at or below the receiverQueueSize, if not at 0. The backlog should also be decreasing.
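
The expectation above can be turned into a small check over successive stats readings. The sketch below is illustrative only: the class `UnackedTrend`, its methods, and the sample numbers in `main` are made up for the example and are not measurements from this issue. The bound to pass in is whatever receiver queue size the consumer was configured with (1000 is the Java client's default).

```java
import java.util.List;

final class UnackedTrend {

    // True if every sample stays at or below the receiver queue size plus a tolerance.
    static boolean staysBounded(List<Long> samples, long receiverQueueSize, long tolerance) {
        for (long sample : samples) {
            if (sample > receiverQueueSize + tolerance) {
                return false;
            }
        }
        return true;
    }

    // True if the samples never decrease and end strictly higher than they started,
    // which matches the "continually climbs" symptom described in the report.
    static boolean climbs(List<Long> samples) {
        if (samples.size() < 2) {
            return false;
        }
        for (int i = 1; i < samples.size(); i++) {
            if (samples.get(i) < samples.get(i - 1)) {
                return false;
            }
        }
        return samples.get(samples.size() - 1) > samples.get(0);
    }

    public static void main(String[] args) {
        // Hypothetical readings, for illustration only.
        List<Long> healthy = List.of(900L, 400L, 950L);
        List<Long> suspicious = List.of(100L, 2500L, 9000L);

        System.out.println("healthy bounded: " + staysBounded(healthy, 1000, 0));
        System.out.println("suspicious climbs: " + climbs(suspicious));
    }
}
```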

Screenshots
(Three screenshots attached: "Screen Shot 2020-03-18 at 5 52 35 PM", "Screen Shot 2020-03-18 at 5 52 18 PM", and "image".)

Desktop (please complete the following information):

  • OS: N/A

Additional context
If the subscription type is changed to Shared, the topic stats show the expected behavior (messages are reported as acked).

@st0ckface added the type/bug label Mar 18, 2020
@codelipenghui self-assigned this Mar 19, 2020
@codelipenghui added this to the 2.6.0 milestone Mar 19, 2020
sijie pushed a commit that referenced this issue Mar 19, 2020
…over subscription mode. (#6558)

Fixes #6552

### Motivation

#6552 was introduced by #5929, so this PR stops increasing the unacked message count for consumers with the Exclusive/Failover subscription mode.
tuteng pushed a commit to AmateurEvents/pulsar that referenced this issue Mar 21, 2020
…over subscription mode. (apache#6558)

(cherry picked from commit 2449696)
tuteng pushed a commit that referenced this issue Apr 6, 2020
…over subscription mode. (#6558)

(cherry picked from commit 2449696)
tuteng pushed a commit that referenced this issue Apr 13, 2020
…over subscription mode. (#6558)

(cherry picked from commit 2449696)
jiazhai pushed a commit to jiazhai/pulsar that referenced this issue May 18, 2020
…over subscription mode. (apache#6558)

(cherry picked from commit 2449696)
huangdx0726 pushed a commit to huangdx0726/pulsar that referenced this issue Aug 24, 2020
…over subscription mode. (apache#6558)
