Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure flush happens and correctly lock connection for a series of unflushed writes #161

Merged
merged 4 commits into from
Jan 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,18 @@ func (ch *Channel) sendOpen(msg message) (err error) {
return ch.sendClosed(msg)
}

ch.connection.startSendUnflushed()

// Flush the buffer only after all the Frames that comprise the Message
// have been written to maximise benefits of using a buffered writer.
defer func() {
if endError := ch.connection.endSendUnflushed(); endError != nil {
if err == nil {
err = endError
}
}
}()

// We use sendUnflushed() in this method as sending the message requires
// sending multiple Frames (methodFrame, headerFrame, N x bodyFrame).
// Flushing after each Frame is inefficient, as it negates much of the
Expand Down Expand Up @@ -275,10 +287,6 @@ func (ch *Channel) sendOpen(msg message) (err error) {
return
}
}

// Flush the buffer only after all the Frames that comprise the Message
// have been written to maximise benefits of using a buffered writer.
err = ch.connection.flush()
} else {
// If the channel is closed, use Channel.sendClosed()
if ch.IsClosed() {
Expand Down
17 changes: 14 additions & 3 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,19 @@ func (c *Connection) send(f frame) error {
return err
}

// This method is intended to be used with sendUnflushed() to start a sequence
// of sendUnflushed() calls
func (c *Connection) startSendUnflushed() {
c.sendM.Lock()
}

// This method is intended to be used with sendUnflushed() to end a sequence
// of sendUnflushed() calls and flush the connection
func (c *Connection) endSendUnflushed() error {
defer c.sendM.Unlock()
return c.flush()
}

// sendUnflushed performs an *Unflushed* write. It is otherwise equivalent to
// send(), and we provide a separate flush() function to explicitly flush the
// buffer after all Frames are written.
Expand All @@ -468,9 +481,7 @@ func (c *Connection) sendUnflushed(f frame) error {
return ErrClosed
}

c.sendM.Lock()
err := f.write(c.writer.w)
c.sendM.Unlock()
err := c.writer.WriteFrameNoFlush(f)
lukebakken marked this conversation as resolved.
Show resolved Hide resolved

if err != nil {
// shutdown could be re-entrant from signaling notify chans
Expand Down
5 changes: 5 additions & 0 deletions write.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ import (
"time"
)

func (w *writer) WriteFrameNoFlush(frame frame) (err error) {
err = frame.write(w.w)
return
}

func (w *writer) WriteFrame(frame frame) (err error) {
if err = frame.write(w.w); err != nil {
return
Expand Down