Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add throttle to prevent cpu load #812

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

alkaaf
Copy link

@alkaaf alkaaf commented Aug 8, 2023

i hope this is usable. i need this when resume from certain bin log position

Copy link
Collaborator

@lance6716 lance6716 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And can you explain your problem first? I guess you are using this library and other processes in the same machine, and this library comsumes too much CPU. But a more common way is to use cgroup or something to limit the resource of the process using this library.

// throttle the event, prevent cpu flood
if b.cfg.ThrottleCap > 0 && len(s.ch) >= b.cfg.ThrottleCap {
b.cfg.Logger.Infof(`throttling binlog read`)
time.Sleep(b.cfg.ThrottleDuration)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should use

select {
case <-time.After...
case <-b.ctx.Done...
}

to find context cancel timely

@alkaaf
Copy link
Author

alkaaf commented Aug 8, 2023

I have experience using maxwell for etl from mysql to mongodb where reading result from maxwell i store in redis queue. there are some workers consume queue to store it in mongodb

This approach has a lot of overhead from the network side and also expensive (redis, worker, maxwell that using java). therefore I need a cheaper, concise, reliable and consistent approach where delay is not an issue.

the ability to continue from the last log position was something I was looking for from this library. the problem is that when this app fails/stops some time and when restarting the app, the channel gets flooded with events starting from a certain position. it causes event processing to slow down to even stall (yeah you are right that i do event processing too on the same machine even application). this is why limiting resources is not an option.

therefore I need something to delay the binlog read by a few milliseconds when the channel reaches a certain size. (in my case, when the channel size is 512 ,the read will be delayed by 1 second)

@lance6716
Copy link
Collaborator

lance6716 commented Aug 8, 2023

I'm still not very clear about your usage, but I'd ask do you think it's a good idea to add an option to control the channel size? So when channel is full, sending to channel will be blocked and no further CPU / network resources will be consumed.

func NewBinlogStreamer() *BinlogStreamer {
s := new(BinlogStreamer)
s.ch = make(chan *BinlogEvent, 10240)

Maybe you can set the channel size to 512, and the delay will not be as bad as 1 second.

@lance6716
Copy link
Collaborator

Hi, can you check #830 is helpful to your problem?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants