Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 57 additions & 44 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,18 +302,21 @@ type pinfo struct {

// outbound holds pending data for a socket.
type outbound struct {
nb net.Buffers // Pending buffers for send, each has fixed capacity as per nbPool below.
wnb net.Buffers // Working copy of "nb", reused on each flushOutbound call, partial writes may leave entries here for next iteration.
pb int64 // Total pending/queued bytes.
fsp int32 // Flush signals that are pending per producer from readLoop's pcd.
sg *sync.Cond // To signal writeLoop that there is data to flush.
wdl time.Duration // Snapshot of write deadline.
mp int64 // Snapshot of max pending for client.
lft time.Duration // Last flush time for Write.
stc chan struct{} // Stall chan we create to slow down producers on overrun, e.g. fan-in.
cw *s2.Writer
nb [][]byte // Pending buffers for send, each has fixed capacity as per nbPool below.
wnb [nbMaxVectorSize][]byte // Working copy of "nb", reused on each flushOutbound call, partial writes may leave entries here for next iteration.
wnbn int // How many elements are used in wnb?
pb int64 // Total pending/queued bytes.
fsp int32 // Flush signals that are pending per producer from readLoop's pcd.
sg *sync.Cond // To signal writeLoop that there is data to flush.
wdl time.Duration // Snapshot of write deadline.
mp int64 // Snapshot of max pending for client.
lft time.Duration // Last flush time for Write.
stc chan struct{} // Stall chan we create to slow down producers on overrun, e.g. fan-in.
cw *s2.Writer
}

const nbMaxVectorSize = 1024 // == IOV_MAX on Linux/Darwin and most other Unices (except Solaris/AIX)

const nbPoolSizeSmall = 512 // Underlying array size of small buffer
const nbPoolSizeMedium = 4096 // Underlying array size of medium buffer
const nbPoolSizeLarge = 65536 // Underlying array size of large buffer
Expand Down Expand Up @@ -1536,8 +1539,9 @@ func (c *client) flushOutbound() bool {
if c.isClosed() {
for i := range c.out.wnb {
nbPoolPut(c.out.wnb[i])
c.out.wnb[i] = nil
}
c.out.wnb = nil
c.out.wnbn = 0
}
c.flags.clear(flushOutbound)
}()
Expand All @@ -1563,7 +1567,11 @@ func (c *client) flushOutbound() bool {
// will just frame up "nb" and append it onto whatever is left on "wnb".
// "nb" will be set to nil so that we can manipulate "collapsed" outside
// of the client's lock, which is interesting in case of compression.
c.out.nb = nil
if consumed := copy(c.out.wnb[c.out.wnbn:], collapsed); consumed > 0 {
c.out.nb = c.out.nb[:copy(c.out.nb[0:], c.out.nb[consumed:])]
c.out.wnbn += consumed
}
vectors := c.out.wnbn

// In case it goes away after releasing the lock.
nc := c.nc
Expand All @@ -1590,7 +1598,7 @@ func (c *client) flushOutbound() bool {
bb := bytes.Buffer{}

cw.Reset(&bb)
for _, buf := range collapsed {
for _, buf := range c.out.wnb[:c.out.wnbn] {
if _, err = cw.Write(buf); err != nil {
break
}
Expand All @@ -1605,31 +1613,38 @@ func (c *client) flushOutbound() bool {
c.markConnAsClosed(WriteError)
return false
}
collapsed = append(net.Buffers(nil), bb.Bytes())
attempted = int64(len(collapsed[0]))
var i int
for i = 0; bb.Len() > 0; i++ {
if i < nbMaxVectorSize {
c.out.wnb[i] = nbPoolGet(bb.Len())
c.out.wnb[i] = c.out.wnb[i][:cap(c.out.wnb[i])]
} else {
// We've optimistically tried to use reusable buffers up to
// this point but we're now at the vector limit and the size
// after compression is still bigger than the wnb, so allocate
// a new single buffer to fit the rest in the last vector.
c.out.wnb[i] = make([]byte, bb.Len())
}
n, err := bb.Read(c.out.wnb[i])
if err != nil {
c.Errorf("Error queuing compressed data: %v", err)
// We need to grab the lock now before marking as closed and exiting
c.mu.Lock()
c.markConnAsClosed(WriteError)
return false
}
c.out.wnb[i] = c.out.wnb[i][:n]
}
c.out.wnbn = i
}

// This is safe to do outside of the lock since "collapsed" is no longer
// referenced in c.out.nb (which can be modified in queueOutboud() while
// the lock is released).
c.out.wnb = append(c.out.wnb, collapsed...)
var _orig [1024][]byte
orig := append(_orig[:0], c.out.wnb...)

// Since WriteTo is lopping things off the beginning, we need to remember
// the start position of the underlying array so that we can get back to it.
// Otherwise we'll always "slide forward" and that will result in reallocs.
startOfWnb := c.out.wnb[0:]

// flush here
start := time.Now()

// FIXME(dlc) - writev will do multiple IOs past 1024 on
// most platforms, need to account for that with deadline?
nc.SetWriteDeadline(start.Add(wdl))

// Actual write to the socket.
n, err := c.out.wnb.WriteTo(nc)
nc.SetWriteDeadline(start.Add(wdl))
var wnb net.Buffers = c.out.wnb[:c.out.wnbn]
n, err := wnb.WriteTo(nc)
nc.SetWriteDeadline(time.Time{})

lft := time.Since(start)
Expand All @@ -1649,8 +1664,9 @@ func (c *client) flushOutbound() bool {
// buffers have been lopped off the beginning, so in order to return
// them to the pool, we need to look at the difference between "orig"
// and "wnb".
for i := 0; i < len(orig)-len(c.out.wnb); i++ {
nbPoolPut(orig[i])
for i := 0; i < c.out.wnbn-len(wnb); i++ {
nbPoolPut(c.out.wnb[i])
c.out.wnb[i] = nil
}

// At this point it's possible that "nb" has been modified by another
Expand All @@ -1660,13 +1676,9 @@ func (c *client) flushOutbound() bool {
// remains up to the beginning of the array to prevent reallocating.
// Anything left in "wnb" has already been framed for WebSocket conns
// so leave them alone for the next call to flushOutbound.
c.out.wnb = append(startOfWnb[:0], c.out.wnb...)

// If we've written everything but the underlying array of our working
// buffer has grown excessively then free it — the GC will tidy it up
// and we can allocate a new one next time.
if len(c.out.wnb) == 0 && cap(c.out.wnb) > nbPoolSizeLarge*8 {
c.out.wnb = nil
c.out.wnbn = copy(c.out.wnb[0:], wnb)
for i := c.out.wnbn; i < len(c.out.wnb); i++ {
c.out.wnb[i] = nil
}

// Ignore ErrShortWrite errors, they will be handled as partials.
Expand All @@ -1675,7 +1687,7 @@ func (c *client) flushOutbound() bool {
// Handle timeout error (slow consumer) differently
if ne, ok := err.(net.Error); ok && ne.Timeout() {
gotWriteTimeout = true
if closed := c.handleWriteTimeout(n, attempted, len(orig)); closed {
if closed := c.handleWriteTimeout(n, attempted, vectors); closed {
return true
}
} else {
Expand Down Expand Up @@ -1714,7 +1726,7 @@ func (c *client) flushOutbound() bool {
}
// Check if the connection is recovering from being a slow consumer.
if !gotWriteTimeout && c.flags.isSet(isSlowConsumer) {
c.Noticef("Slow Consumer Recovered: Flush took %.3fs with %d chunks of %d total bytes.", time.Since(start).Seconds(), len(orig), attempted)
c.Noticef("Slow Consumer Recovered: Flush took %.3fs with %d chunks of %d total bytes.", time.Since(start).Seconds(), vectors, attempted)
c.flags.clear(isSlowConsumer)
}

Expand Down Expand Up @@ -5251,8 +5263,9 @@ func (c *client) flushAndClose(minimalFlush bool) {
if !c.flags.isSet(flushOutbound) {
for i := range c.out.wnb {
nbPoolPut(c.out.wnb[i])
c.out.wnb[i] = nil
}
c.out.wnb = nil
c.out.wnbn = 0
}
// This seem to be important (from experimentation) for the GC to release
// the connection.
Expand Down