
Add EventCacheCount as member of BinlogSyncerConfig to limit streamer's event channel size #830

Merged 5 commits into go-mysql-org:master on Oct 12, 2023

Conversation

zing22845 (Contributor):

#829
I have tested this on my own MySQL instance with an 8 GB binlog: with the default channel size of 10240, memory usage occasionally approached 2 GB during binlog transmission.
Additionally, I wrote a test program to calculate the theoretical maximum memory usage under different channel sizes. The theoretical results match the measurements.

This is the theoretical usage calculation function (imports assumed: context, io, os, path, time, the go-mysql replication package, and pingcap/errors for errors.Trace/errors.Errorf):

```go
func (c *MySQLClient) GetMaxWindowSize(ctx context.Context, streamerLength uint16) (es *EventStatistic, err error) {
es = new(EventStatistic)
es.StreamerChannelLength = streamerLength
startTime := time.Now()

// get streamer
_, streamer, err := c.GetBinlogStreamer(true, streamerLength)
if err != nil {
	return nil, err
}
// defer syncer.Close()
defer func() {
	es.Duration = time.Since(startTime)
}()

// move window
var window []uint32
var currentWindowSize uint32
var ev *replication.BinlogEvent

var filename string
var offset uint32

var f *os.File
defer func() {
	if f != nil {
		f.Close()
	}
}()

// get event time
parser := replication.NewBinlogParser()
parser.SetRawMode(true)
for {
	eventCTX, cancel := context.WithTimeout(ctx, 5*time.Second)
	ev, err = streamer.GetEvent(eventCTX)
	cancel()
	if err == context.DeadlineExceeded {
		return es, nil
	}
	if ev == nil {
		return es, nil
	}
	es.Count++
	// calc the event size
	eventSize := ev.Header.EventSize
	es.TotalSize += uint64(eventSize)

	// add event to the window
	window = append(window, eventSize)
	currentWindowSize += eventSize

	// move event out of window
	if uint16(len(window)) > streamerLength {
		currentWindowSize -= window[0]
		window = window[1:]
	}

	if currentWindowSize > es.MaxStreamerChannelSize {
		es.MaxStreamerChannelSize = currentWindowSize
	}

	offset = ev.Header.LogPos

	if ev.Header.EventType == replication.ROTATE_EVENT {
		rotateEvent := ev.Event.(*replication.RotateEvent)
		filename = string(rotateEvent.NextLogName)

		if ev.Header.Timestamp == 0 || offset == 0 {
			// fake rotate event
			continue
		}
	} else if ev.Header.EventType == replication.FORMAT_DESCRIPTION_EVENT {
		// FormatDescriptionEvent is the first event in a binlog file, so close the old file and create a new one

		if f != nil {
			f.Close()
		}

		if len(filename) == 0 {
		return nil, errors.Errorf("empty binlog filename for FormatDescriptionEvent")
		}

		f, err = os.OpenFile(path.Join("./", filename), os.O_CREATE|os.O_WRONLY, 0644)
		if err != nil {
			return nil, errors.Trace(err)
		}

		// write binlog header fe'bin'
		if _, err = f.Write(replication.BinLogFileHeader); err != nil {
			return nil, errors.Trace(err)
		}
	}

	if n, err := f.Write(ev.RawData); err != nil {
		return nil, errors.Trace(err)
	} else if n != len(ev.RawData) {
		return nil, errors.Trace(io.ErrShortWrite)
	}
	if es.Count > 1 && ev.Header.EventType == replication.ROTATE_EVENT {
		return es, nil
	}
}

}
```

When consuming events from the stream is blocked, the chan may use too much memory
When the consumption speed of the event chan in streamer cannot catch up with its production speed, leading to an accumulation of events, the current fixed channel size of 10240 might occupy significant memory, potentially triggering an Out Of Memory (OOM) condition. Making the size of the event chan configurable would allow for controlling the memory usage of the streamer.
@lance6716 (Collaborator) left a comment:


rest LGTM.

This PR is good enough. And in the future, if you find it's hard to set a fixed cache count because binlog events vary in size, we can introduce another, memory-based limit. For example, in GetEvent and parseEvent we could maintain the sum of approximate memory consumption in the cache queue, and block the streamer from reading the binlog when it costs too much memory.

```diff
@@ -122,6 +122,8 @@ type BinlogSyncerConfig struct {
 	RowsEventDecodeFunc func(*RowsEvent, []byte) error
 
 	DiscardGTIDSet bool
+
+	EventCacheSize int
```
@lance6716 (Collaborator):

The name "XXXSize" makes me think it uses a unit of memory (EventCacheSize = 1024 would mean the cache cannot exceed 1 KB); maybe "EventCacheCount" is a better name.

zing22845 (Contributor, Author):

Agreed! I'll work on the fixes.

zing22845 (Contributor, Author):

Directly limiting memory usage requires dynamically calculating the memory occupancy of the event cache during streaming, which might reduce overall efficiency.
Unless network conditions are very poor, even setting EventCacheCount to a single digit performs little worse than 10240, while the event cache's memory stays bounded by the size of a few events; that is enough to balance efficiency against resource usage.
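After this PR, the knob is set like any other BinlogSyncerConfig field. A hedged usage sketch (a config fragment, not a complete program; the host, credentials, and the value 8 are placeholders):

```go
cfg := replication.BinlogSyncerConfig{
	ServerID:        100,
	Flavor:          "mysql",
	Host:            "127.0.0.1",
	Port:            3306,
	User:            "root",
	Password:        "",
	EventCacheCount: 8, // cap the streamer's event channel at 8 events
}
syncer := replication.NewBinlogSyncer(cfg)
defer syncer.Close()
```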

@lance6716 lance6716 changed the title Add EventCacheSize as member of BinlogSyncerConfig to limit streamer's event channel size Add EventCacheCount as member of BinlogSyncerConfig to limit streamer's event channel size Oct 12, 2023
@lance6716 lance6716 merged commit d5dd3d6 into go-mysql-org:master Oct 12, 2023
13 checks passed