
negative unackedMessages in consumers stats #6510

Closed
ilyam8 opened this issue Mar 8, 2020 · 5 comments
Labels
release/2.5.1 type/bug The PR fixed a bug or issue reported a bug

@ilyam8
Contributor

ilyam8 commented Mar 8, 2020

Describe the bug

Pulsar Docker image apachepulsar/pulsar:2.5.0, standalone mode.

I noticed that the pulsar_subscription_unacked_messages value in the Prometheus stats is negative for non-persistent topics.

According to my (not extensive) tests, the bug appears only when the topic type is non-persistent.

Related: #5929


This is what I get:

pulsar_subscription_unacked_messages{cluster="standalone",namespace="sample/ns1",topic="non-persistent://sample/ns1/demo-1",subscription="my-consumer-11"} -4739 1583659567277
{
    "non-persistent://sample/ns1/demo-1": {
        "publishers": [
            {
                "msgRateIn": 8.998,
                "msgThroughputIn": 449.904,
                "averageMsgSize": 50,
                "address": "/172.17.0.1:34560",
                "producerId": 0,
                "producerName": "my-producer-11",
                "connectedSince": "2020-03-08T09:17:45.526Z",
                "clientVersion": "pulsar-client-go",
                "metadata": {}
            }
        ],
        "replication": {},
        "subscriptions": {
            "my-consumer-11": {
                "consumers": [
                    {
                        "address": "/172.17.0.1:34560",
                        "consumerName": "",
                        "availablePermits": 1,
                        "connectedSince": "2020-03-08T09:17:45.526Z",
                        "msgRateOut": 8.998,
                        "msgThroughputOut": 449.901,
                        "msgRateRedeliver": 0,
                        "unackedMessages": -5925,
                        "blockedConsumerOnUnackedMsgs": false,
                        "clientVersion": "pulsar-client-go",
                        "metadata": {}
                    }
                ],
                "msgBacklog": 0,
                "msgRateExpired": 0,
                "msgRateOut": 8.998,
                "msgThroughputOut": 449.901,
                "msgRateRedeliver": 0,
                "type": "Shared",
                "msgDropRate": 0
            }
        },
        "producerCount": 1,
        "averageMsgSize": 49.999,
        "msgRateIn": 8.998,
        "msgRateOut": 8.998,
        "msgThroughputIn": 449.904,
        "msgThroughputOut": 449.901
    }
}
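The stats above can also be checked programmatically. The following sketch decodes JSON shaped like the output shown (a map from topic name to stats, with `subscriptions` → `consumers`) and lists every consumer whose unackedMessages is below zero. The struct types cover only the fields used here and, like the helper negativeUnacked, are hypothetical; they are not types from the Pulsar admin API or any Pulsar client library.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
)

// Hypothetical types mirroring only the fields of the stats JSON used below.
// Unknown fields in the input are ignored by encoding/json.
type consumerStats struct {
	Address         string `json:"address"`
	UnackedMessages int64  `json:"unackedMessages"`
}

type subscriptionStats struct {
	Consumers []consumerStats `json:"consumers"`
}

type topicStats struct {
	Subscriptions map[string]subscriptionStats `json:"subscriptions"`
}

// negativeUnacked returns "topic subscription address value" for every
// consumer whose unackedMessages counter is below zero.
func negativeUnacked(raw []byte) ([]string, error) {
	var topics map[string]topicStats
	if err := json.Unmarshal(raw, &topics); err != nil {
		return nil, err
	}
	var out []string
	for topic, ts := range topics {
		for sub, ss := range ts.Subscriptions {
			for _, c := range ss.Consumers {
				if c.UnackedMessages < 0 {
					out = append(out, fmt.Sprintf("%s %s %s %d", topic, sub, c.Address, c.UnackedMessages))
				}
			}
		}
	}
	sort.Strings(out) // map iteration order is random; sort for stable output
	return out, nil
}

func main() {
	// Trimmed-down copy of the stats reported in this issue.
	raw := []byte(`{"non-persistent://sample/ns1/demo-1": {"subscriptions": {"my-consumer-11":
		{"consumers": [{"address": "/172.17.0.1:34560", "unackedMessages": -5925}]}}}}`)
	found, err := negativeUnacked(raw)
	if err != nil {
		panic(err)
	}
	for _, line := range found {
		fmt.Println(line)
	}
}
```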

To Reproduce
Steps to reproduce the behavior:

See the code below. It creates a producer and a consumer for each of the following topics:

  • non-persistent://sample/ns1/demo-1 # has negative unackedMessages in consumers stats
  • persistent://sample/ns1/demo-1 # no problem

And that is pretty much it.

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	comcast "github.com/Comcast/pulsar-client-go"
)

var pulsarPool = comcast.NewManagedClientPool()

func newPulsarProducer(name, topic string) *comcast.ManagedProducer {
	return comcast.NewManagedProducer(pulsarPool, comcast.ManagedProducerConfig{
		ManagedClientConfig: comcast.ManagedClientConfig{
			ClientConfig: comcast.ClientConfig{
				Addr: "pulsar://localhost:6650",
			},
		},
		Topic: topic,
		Name:  name,
	})
}

func newPulsarConsumer(name, topic string) *comcast.ManagedConsumer {
	return comcast.NewManagedConsumer(pulsarPool, comcast.ManagedConsumerConfig{
		ManagedClientConfig: comcast.ManagedClientConfig{
			ClientConfig: comcast.ClientConfig{
				Addr: "pulsar://localhost:6650",
			},
		},
		Topic: topic,
		Name:  name,
	})
}

func startPulsarProducer(p *comcast.ManagedProducer, wg *sync.WaitGroup, stop chan struct{}) {
	defer func() {
		ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
		defer cancel()
		_ = p.Close(ctx)
		wg.Done()
	}()
	produce := func(msg string) error {
		ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
		defer cancel()
		_, err := p.Send(ctx, []byte(msg))
		return err
	}

loop:
	for i := 0; ; i++ {
		select {
		case <-stop:
			break loop
		default:
			msg := fmt.Sprintf("message-%d", i)
			if err := produce(msg); err != nil {
				log.Printf("producer error: %v\n", err)
				break loop
			}
		}
		time.Sleep(time.Millisecond * 100)
	}
}

func startPulsarConsumer(c *comcast.ManagedConsumer, wg *sync.WaitGroup) {
	defer func() {
		ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
		defer cancel()
		_ = c.Unsubscribe(ctx)
		_ = c.Close(ctx)
		wg.Done()
	}()
	consume := func() error {
		ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
		defer cancel()

		msg, err := c.Receive(ctx)
		if err != nil {
			return err
		}
		log.Println("received:", msg.Topic, string(msg.Payload), *msg.Msg.ConsumerId)

		return c.Ack(ctx, msg)
	}

	for {
		if err := consume(); err != nil {
			log.Printf("consumer error: %v\n", err)
			break
		}
	}
}

func doPulsar() {
	stopCh := make(chan struct{})
	go func() {
		signalChan := make(chan os.Signal, 1)
		signal.Notify(signalChan, syscall.SIGINT)
		<-signalChan
		close(stopCh)
		log.Println("SIGINT received. Terminating...")
	}()

	topic1 := "non-persistent://sample/ns1/demo-1"
	p11 := newPulsarProducer("my-producer-11", topic1)
	c11 := newPulsarConsumer("my-consumer-11", topic1)

	topic2 := "persistent://sample/ns1/demo-1"
	p21 := newPulsarProducer("my-producer-21", topic2)
	c21 := newPulsarConsumer("my-consumer-21", topic2)

	var wg sync.WaitGroup
	wg.Add(4)
	go startPulsarProducer(p11, &wg, stopCh)
	go startPulsarConsumer(c11, &wg)

	go startPulsarProducer(p21, &wg, stopCh)
	go startPulsarConsumer(c21, &wg)

	wg.Wait()
}

func main() {
	doPulsar()
}

Expected behavior

unackedMessages / pulsar_subscription_unacked_messages values should never be negative.
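This expectation can be checked on the scraper side by reading the value of each pulsar_subscription_unacked_messages sample. A minimal sketch, assuming sample lines in the Prometheus text exposition format (`name{labels} value [timestamp]`) like the one reported above; sampleValue is a hypothetical helper, not part of any Pulsar or Prometheus client API, and it does not handle `# HELP` / `# TYPE` comment lines.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// sampleValue extracts the value from one Prometheus text-format sample line.
func sampleValue(line string) (float64, error) {
	rest := line
	if i := strings.LastIndex(line, "}"); i >= 0 {
		// Labeled sample: the last '}' closes the label set; "value [timestamp]" follows.
		rest = line[i+1:]
	} else if j := strings.IndexByte(line, ' '); j >= 0 {
		// Unlabeled sample: the metric name ends at the first space.
		rest = line[j+1:]
	}
	fields := strings.Fields(rest)
	if len(fields) == 0 {
		return 0, fmt.Errorf("no value in sample line %q", line)
	}
	return strconv.ParseFloat(fields[0], 64)
}

func main() {
	line := `pulsar_subscription_unacked_messages{cluster="standalone",namespace="sample/ns1",topic="non-persistent://sample/ns1/demo-1",subscription="my-consumer-11"} -4739 1583659567277`
	v, err := sampleValue(line)
	if err != nil {
		panic(err)
	}
	if v < 0 {
		fmt.Printf("invariant violated: unacked messages = %v\n", v)
	}
}
```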

Desktop (please complete the following information):

  • ProductName: Mac OS X, ProductVersion: 10.15.3
@jiazhai
Member

jiazhai commented Mar 11, 2020

@codelipenghui, regarding #5929: would you please help with this issue?

@codelipenghui
Contributor

codelipenghui commented Mar 16, 2020

@ilyam8 I think #5929 can fix this issue, which happens because maxUnackedMessages is always 0 for a non-persistent topic's consumer. #5929 will be released in 2.5.1.
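How a limit of 0 could turn into a negative counter can be illustrated with a toy model. The sketch assumes, without having been checked against the broker source or the #5929 diff, that the delivery path increments the counter only inside a `maxUnackedMessages > 0 && ...` guard while acks always decrement it. Under that assumption a limit of 0 means deliveries are never counted and every ack drives the value further below zero. All type, field, and method names are hypothetical.

```go
package main

import "fmt"

// consumer is a toy, single-goroutine model of per-consumer unacked accounting.
// It is NOT the Pulsar broker's code; all names here are hypothetical.
type consumer struct {
	maxUnackedMessages int // 0 means "no limit" (reported as always 0 for non-persistent consumers)
	unackedMessages    int
	blocked            bool
}

// addAndGet adjusts the counter and returns its new value.
func (c *consumer) addAndGet(n int) int {
	c.unackedMessages += n
	return c.unackedMessages
}

// onDeliverBuggy checks the limit first. With maxUnackedMessages == 0 the &&
// short-circuits, so addAndGet never runs and deliveries are not counted.
func (c *consumer) onDeliverBuggy(n int) {
	if c.maxUnackedMessages > 0 && c.addAndGet(n) >= c.maxUnackedMessages {
		c.blocked = true
	}
}

// onDeliverFixed always counts the delivery and only consults the limit when one is set.
func (c *consumer) onDeliverFixed(n int) {
	if c.addAndGet(n) >= c.maxUnackedMessages && c.maxUnackedMessages > 0 {
		c.blocked = true
	}
}

// onAck always decrements, regardless of the limit.
func (c *consumer) onAck(n int) {
	c.addAndGet(-n)
}

func main() {
	buggy, fixed := &consumer{}, &consumer{}
	for i := 0; i < 3; i++ { // deliver and ack three messages, one at a time
		buggy.onDeliverBuggy(1)
		buggy.onAck(1)
		fixed.onDeliverFixed(1)
		fixed.onAck(1)
	}
	fmt.Println(buggy.unackedMessages, fixed.unackedMessages) // prints "-3 0"
}
```

Evaluating the increment unconditionally, as in onDeliverFixed, keeps the value equal to deliveries minus acks; whether #5929 takes this exact approach is not stated in this thread.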

@ilyam8
Contributor Author

ilyam8 commented Mar 16, 2020

Oh, I didn't notice the 2.5.1 label on #5929; I thought it was in 2.5.0.

OK, so this issue is a duplicate of #5755; feel free to close it. Thanks!

@codelipenghui
Contributor

OK, thanks for your feedback. Closing this issue via #5929.

@kuskmen

kuskmen commented Jul 18, 2023

Hi, we are running Pulsar 2.9.x and are currently seeing negative values for the pulsar_subscription_unacked_messages metric. We know that a consumer of the given subscription is in fact "holding" messages until AckTimeout is reached (an issue on our side that we are looking to fix). Could this also be why pulsar_subscription_unacked_messages keeps growing in the wrong (negative) direction, and can we cross-reference the two to conclude that this value should return to zero once our issue is resolved? Or is this a bug on your side, and the value should never go below 0? Note that when we restart the consumer, everything goes back to normal.

P.S. The backlog stays constantly between 0 and 5.

For instance, observe this chart:
[image]

4 participants