
fix consumer fetch message number maps to read entry number bug and expose avgMessagesPerEntry metric #6719

Merged · 6 commits · May 29, 2020

Conversation

hangc0276
Contributor

Motivation

When a consumer sends a fetch request to the broker, the request carries the number of messages the broker should push back to the consumer. However, when the producer uses the batching feature, the broker stores data in BookKeeper and in the broker cache per entry, not per individual message. There is therefore a gap between the requested number of messages and the number of entries to read when serving a consumer fetch request.

The current strategy uses the following formula:

```
messagesToRead = Math.min(availablePermits, readBatchSize);
```

`availablePermits` is the number of messages the consumer requested, and `readBatchSize` is the maximum number of entries to read, set in `broker.conf`.

Assuming `availablePermits` is 1000, `readBatchSize` is 500, and each entry contains 1000 messages, `messagesToRead` will be 500 according to this formula. The broker will then read 500 entries, i.e. `500 * 1000 = 500,000` messages, from BookKeeper or the broker cache and push all 500,000 messages to the consumer at once, even though the consumer only asked for 1000. This forces the consumer to spend far too much memory buffering fetched messages, especially when `readBatchSize` is increased to raise BookKeeper read throughput.

Changes

I add a variable `avgMessagesPerEntry` to record the average number of messages stored in one entry. It is updated whenever the broker pushes messages to the consumer, using the following formula:

```
avgMessagesPerEntry = avgMessagesPerEntry * avgPercent + (1 - avgPercent) * newValue
```

`avgMessagesPerEntry` carries the historical average number of messages per entry, and `newValue` is the number of messages per entry in the data the broker just read from the cache or from BookKeeper. `avgPercent` is a final value of 0.9 that controls how quickly the historical average decays when a new value arrives. The initial value of `avgMessagesPerEntry` is 1000.

When serving a consumer fetch request, the requested message count is mapped to an entry count with the following formula:

```
messagesToRead = Math.min((int) Math.ceil(availablePermits * 1.0 / avgMessagesPerEntry), readBatchSize);
```

I also expose the `avgMessagesPerEntry` value in the consumer stats metrics JSON.
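The update and mapping formulas above can be sketched as follows. This is a minimal illustration, not the actual Pulsar broker code; the class and method names are made up.

```java
// Sketch of the moving average and entry-count mapping described in the PR.
// Hypothetical class: the real logic lives inside the Pulsar dispatcher.
public class AvgMessagesPerEntryTracker {
    private static final double AVG_PERCENT = 0.9; // decay factor from the description
    private double avgMessagesPerEntry = 1000;     // initial value from the description

    // Update the moving average after the broker reads `entries` entries
    // containing `messages` messages in total.
    public void update(int messages, int entries) {
        double newValue = (double) messages / entries;
        avgMessagesPerEntry = avgMessagesPerEntry * AVG_PERCENT
                + (1 - AVG_PERCENT) * newValue;
    }

    // Map the consumer's requested message count to a number of entries
    // to read, capped at the configured readBatchSize.
    public int messagesToRead(int availablePermits, int readBatchSize) {
        return Math.min((int) Math.ceil(availablePermits * 1.0 / avgMessagesPerEntry),
                readBatchSize);
    }
}
```

With the initial average of 1000, a request for 1000 messages maps to a single entry read instead of the 500 entries the old formula would request.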

@sijie
Member

sijie commented Apr 12, 2020

/pulsarbot run-failure-checks

@sijie
Member

sijie commented Apr 12, 2020

@hangc0276 nice contribution!

@sijie sijie added the area/broker and type/enhancement labels Apr 12, 2020
@sijie sijie added this to the 2.6.0 milestone Apr 12, 2020
@jiazhai
Member

jiazhai commented Apr 13, 2020

/pulsarbot run-failure-checks

@codelipenghui
Contributor

@hangc0276 Can you rebase the branch? There are some integration test related fixes in the master.

@hangc0276
Contributor Author

@hangc0276 Can you rebase the branch? There are some integration test related fixes in the master.

I have merged the master branch code, but the tests still fail. I will check the failed test cases in detail.

@codelipenghui
Contributor

/pulsarbot run-failure-checks

@hangc0276
Contributor Author

/pulsarbot run-failure-checks

2 similar comments
@hangc0276
Contributor Author

/pulsarbot run-failure-checks

@sijie
Member

sijie commented Apr 21, 2020

/pulsarbot run-failure-checks

@codelipenghui
Contributor

@hangc0276 If you have time, please help take a look at the failed tests and resolve the conflicts. And I think you can add a flag in the broker and disable it by default.

@hangc0276
Contributor Author

@hangc0276 If you have time, please help take a look at the failed tests and resolve the conflicts. And I think you can add a flag in the broker and disable it by default.

@sijie @jiazhai @codelipenghui I added a flag to disable the precise dispatcher flow control, which resolves the test case conflicts, and added a test case. Please take a look again. Thanks.

@hangc0276
Contributor Author

/pulsarbot run-failure-checks

@hangc0276
Contributor Author

/pulsarbot run-failure-checks

2 similar comments
@hangc0276
Contributor Author

/pulsarbot run-failure-checks

@hangc0276
Contributor Author

/pulsarbot run-failure-checks

@codelipenghui
Contributor

@hangc0276 Looks good to me.

@hangc0276
Contributor Author

/pulsarbot run-failure-checks

5 similar comments
@hangc0276
Contributor Author

/pulsarbot run-failure-checks

@hangc0276
Contributor Author

/pulsarbot run-failure-checks

@codelipenghui
Contributor

/pulsarbot run-failure-checks

@hangc0276
Contributor Author

/pulsarbot run-failure-checks

@hangc0276
Contributor Author

/pulsarbot run-failure-checks


@codelipenghui codelipenghui merged commit 4883e1b into apache:master May 29, 2020
Huanli-Meng pushed a commit to Huanli-Meng/pulsar that referenced this pull request Jun 1, 2020
…xpose avgMessagesPerEntry metric (apache#6719)

Huanli-Meng pushed a commit to Huanli-Meng/pulsar that referenced this pull request Jun 1, 2020
…xpose avgMessagesPerEntry metric (apache#6719)

Huanli-Meng pushed a commit to Huanli-Meng/pulsar that referenced this pull request Jun 12, 2020
…xpose avgMessagesPerEntry metric (apache#6719)

huangdx0726 pushed a commit to huangdx0726/pulsar that referenced this pull request Aug 24, 2020
…xpose avgMessagesPerEntry metric (apache#6719)

@lordcheng10
Contributor

lordcheng10 commented Feb 10, 2022

Why is the initial value of avgMessagesPerEntry fixed at 1000, instead of using the actual number of messages in the first pushed entry as the initial value?
If an entry actually contains only 1 message, then with the current logic it takes more than 20 iterations before avgMessagesPerEntry gets anywhere near the real value; until then, each read fetches only about 1 message, which will seriously hurt throughput. @hangc0276
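The convergence concern raised here can be checked numerically. The following standalone sketch (not code from the PR) iterates the update formula starting from the initial value of 1000, with entries that actually hold one message each; after 20 updates the average is still above 120, i.e. far from 1.

```java
// Numeric illustration of the convergence concern: starting from the
// initial value 1000 with decay factor 0.9, how fast does the average
// approach the true value of 1 message per entry?
public class DecayDemo {
    static final double AVG_PERCENT = 0.9;

    // Average after n updates, starting from `initial`, when every
    // observed entry holds exactly `actual` messages.
    static double avgAfter(int n, double initial, double actual) {
        double avg = initial;
        for (int i = 0; i < n; i++) {
            avg = avg * AVG_PERCENT + (1 - AVG_PERCENT) * actual;
        }
        return avg;
    }

    public static void main(String[] args) {
        for (int n : new int[] {10, 20, 30}) {
            System.out.printf("after %d updates: %.1f%n", n, avgAfter(n, 1000, 1));
        }
    }
}
```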

Labels
area/broker, type/enhancement
Projects
None yet
Development


5 participants