Skip to content

Commit

Permalink
Add EventCacheCount as member of BinlogSyncerConfig to limit streamer…
Browse files Browse the repository at this point in the history
…'s event channel size (#830)

* lower memory usage by reducing event chan size in streamer

When consuming events from stream is blocked, the chan may use too much memory

* make streamer's event chan size configurable

When the consumption speed of the event chan in streamer cannot catch up with its production speed, leading to an accumulation of events, the current fixed channel size of 10240 might occupy significant memory, potentially triggering an Out Of Memory (OOM) condition. Making the size of the event chan configurable would allow for controlling the memory usage of the streamer.

* update cfg name StreamerChanSize to EventCacheSize

* EventCacheSize is renamed to EventCacheCount, to make the variable name more reflective of its actual meaning.
  • Loading branch information
zing22845 authored Oct 12, 2023
1 parent f0f8617 commit d5dd3d6
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 2 deletions.
10 changes: 9 additions & 1 deletion replication/binlogstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,17 @@ func (s *BinlogStreamer) closeWithError(err error) {
}

func NewBinlogStreamer() *BinlogStreamer {
return NewBinlogStreamerWithChanSize(10240)
}

func NewBinlogStreamerWithChanSize(chanSize int) *BinlogStreamer {
s := new(BinlogStreamer)

s.ch = make(chan *BinlogEvent, 10240)
if chanSize <= 0 {
chanSize = 10240
}

s.ch = make(chan *BinlogEvent, chanSize)
s.ech = make(chan error, 4)

return s
Expand Down
7 changes: 6 additions & 1 deletion replication/binlogsyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ type BinlogSyncerConfig struct {
RowsEventDecodeFunc func(*RowsEvent, []byte) error

DiscardGTIDSet bool

EventCacheCount int
}

// BinlogSyncer syncs binlog event from server.
Expand Down Expand Up @@ -166,6 +168,9 @@ func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer {
dialer := &net.Dialer{}
cfg.Dialer = dialer.DialContext
}
if cfg.EventCacheCount == 0 {
cfg.EventCacheCount = 10240
}

// Clear the Password to avoid outputing it in log.
pass := cfg.Password
Expand Down Expand Up @@ -393,7 +398,7 @@ func (b *BinlogSyncer) prepare() error {
func (b *BinlogSyncer) startDumpStream() *BinlogStreamer {
b.running = true

s := NewBinlogStreamer()
s := NewBinlogStreamerWithChanSize(b.cfg.EventCacheCount)

b.wg.Add(1)
go b.onStream(s)
Expand Down

0 comments on commit d5dd3d6

Please sign in to comment.