Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix inefficient use of buffers that reduces the potential throughput of basicPublish #142

Merged
merged 2 commits into from
Dec 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,14 +237,23 @@ func (ch *Channel) sendOpen(msg message) (err error) {
return ch.sendClosed(msg)
}

if err = ch.connection.send(&methodFrame{
// We use sendUnflushed() in this method as sending the message requires
// sending multiple Frames (methodFrame, headerFrame, N x bodyFrame).
// Flushing after each Frame is inefficient, as it negates much of the
// benefit of using a buffered writer and results in more syscalls than
// necessary. Flushing buffers after every frame can have a significant
// performance impact when sending (e.g. basicPublish) small messages,
// so sendUnflushed() performs an *Unflushed* write, but is otherwise
// equivalent to the send() method. We later use the separate flush
// method to explicitly flush the buffer after all Frames are written.
if err = ch.connection.sendUnflushed(&methodFrame{
ChannelId: ch.id,
Method: content,
}); err != nil {
return
}

if err = ch.connection.send(&headerFrame{
if err = ch.connection.sendUnflushed(&headerFrame{
ChannelId: ch.id,
ClassId: class,
Size: uint64(len(body)),
Expand All @@ -259,13 +268,17 @@ func (ch *Channel) sendOpen(msg message) (err error) {
j = len(body)
}

if err = ch.connection.send(&bodyFrame{
if err = ch.connection.sendUnflushed(&bodyFrame{
ChannelId: ch.id,
Body: body[i:j],
}); err != nil {
return
}
}

// Flush the buffer only after all the Frames that comprise the Message
// have been written to maximise benefits of using a buffered writer.
err = ch.connection.flush()
} else {
// If the channel is closed, use Channel.sendClosed()
if ch.IsClosed() {
Expand Down
60 changes: 60 additions & 0 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,66 @@ func (c *Connection) send(f frame) error {
return err
}

// sendUnflushed performs an *Unflushed* write. It is otherwise equivalent to
// send(), and we provide a separate flush() function to explicitly flush the
// buffer after all Frames are written.
//
// Why is this a thing?
//
// send() method uses writer.WriteFrame(), which will write the Frame then
// flush the buffer. For cases like the sendOpen() method on Channel, which
// sends multiple Frames (methodFrame, headerFrame, N x bodyFrame), flushing
// after each Frame is inefficient as it negates much of the benefit of using a
// buffered writer, and results in more syscalls than necessary. Flushing buffers
// after every frame can have a significant performance impact when sending
// (basicPublish) small messages, so this method performs an *Unflushed* write
// but is otherwise equivalent to send() method, and we provide a separate
// flush method to explicitly flush the buffer after all Frames are written.
func (c *Connection) sendUnflushed(f frame) error {
if c.IsClosed() {
return ErrClosed
}

c.sendM.Lock()
err := f.write(c.writer.w)
c.sendM.Unlock()

if err != nil {
// shutdown could be re-entrant from signaling notify chans
go c.shutdown(&Error{
Code: FrameError,
Reason: err.Error(),
})
}

return err
}

// This method is intended to be used with sendUnflushed() to explicitly flush
// the buffer after all required Frames have been written to the buffer.
func (c *Connection) flush() (err error) {
if buf, ok := c.writer.w.(*bufio.Writer); ok {
err = buf.Flush()

// Moving send notifier to flush increases basicPublish for the small message
// case. As sendUnflushed + flush is used for the case of sending semantically
// related Frames (e.g. a Message like basicPublish) there is no real advantage
// to sending per Frame vice per "group of related Frames" and for the case of
// small messages time.Now() is (relatively) expensive.
if err == nil {
// Broadcast we sent a frame, reducing heartbeats, only
// if there is something that can receive - like a non-reentrant
// call or if the heartbeater isn't running
select {
case c.sends <- time.Now():
default:
}
}
}

return
}

func (c *Connection) shutdown(err *Error) {
atomic.StoreInt32(&c.closed, 1)

Expand Down