From d5dd3d6953fdace6b93b712c7d72eb7d58c35826 Mon Sep 17 00:00:00 2001 From: zing22845 <60884519+zing22845@users.noreply.github.com> Date: Thu, 12 Oct 2023 11:12:15 +0800 Subject: [PATCH] Add EventCacheCount as member of BinlogSyncerConfig to limit streamer's event channel size (#830) * lower memory usage by reducing event chan size in streamer When consuming events from stream is blocked, the chan may use too much memory * make streamer's event chan size configurable When the consumption speed of the event chan in streamer cannot catch up with its production speed, leading to an accumulation of events, the current fixed channel size of 10240 might occupy significant memory, potentially triggering an Out Of Memory (OOM) condition. Making the size of the event chan configurable would allow for controlling the memory usage of the streamer. * update cfg name StreamerChanSize to EventCacheSize * EventCacheSize is renamed to EventCacheCount, to make the variable name more reflective of its actual meaning. --- replication/binlogstreamer.go | 10 +++++++++- replication/binlogsyncer.go | 7 ++++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/replication/binlogstreamer.go b/replication/binlogstreamer.go index 61254fba0..72bc7ddd0 100644 --- a/replication/binlogstreamer.go +++ b/replication/binlogstreamer.go @@ -85,9 +85,17 @@ func (s *BinlogStreamer) closeWithError(err error) { } func NewBinlogStreamer() *BinlogStreamer { + return NewBinlogStreamerWithChanSize(10240) +} + +func NewBinlogStreamerWithChanSize(chanSize int) *BinlogStreamer { s := new(BinlogStreamer) - s.ch = make(chan *BinlogEvent, 10240) + if chanSize <= 0 { + chanSize = 10240 + } + + s.ch = make(chan *BinlogEvent, chanSize) s.ech = make(chan error, 4) return s diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index 9c70740fc..a5ca96359 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -122,6 +122,8 @@ type BinlogSyncerConfig struct { RowsEventDecodeFunc func(*RowsEvent, []byte) error DiscardGTIDSet bool + + EventCacheCount int } // BinlogSyncer syncs binlog event from server. @@ -166,6 +168,9 @@ func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer { dialer := &net.Dialer{} cfg.Dialer = dialer.DialContext } + if cfg.EventCacheCount == 0 { + cfg.EventCacheCount = 10240 + } // Clear the Password to avoid outputing it in log. pass := cfg.Password @@ -393,7 +398,7 @@ func (b *BinlogSyncer) prepare() error { func (b *BinlogSyncer) startDumpStream() *BinlogStreamer { b.running = true - s := NewBinlogStreamer() + s := NewBinlogStreamerWithChanSize(b.cfg.EventCacheCount) b.wg.Add(1) go b.onStream(s)