diff --git a/CHANGELOG.md b/CHANGELOG.md index d3d758058..2b6738da1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,5 +19,6 @@ * [ENHANCEMENT] Add spanlogger package. #42 * [ENHANCEMENT] Add runutil.CloseWithLogOnErr function. #58 * [ENHANCEMENT] Optimise memberlist receive path when used as a backing store for rings with a large number of members. #76 #77 +* [ENHANCEMENT] Memberlist: prepare the data to send on the write before starting counting the elapsed time for `-memberlist.packet-write-timeout`, in order to reduce chances we hit the timeout when sending a packet to other node. #89 * [BUGFIX] spanlogger: Support multiple tenant IDs. #59 * [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85 diff --git a/kv/memberlist/tcp_transport.go b/kv/memberlist/tcp_transport.go index afbd1b201..c7e9b8aa9 100644 --- a/kv/memberlist/tcp_transport.go +++ b/kv/memberlist/tcp_transport.go @@ -439,16 +439,15 @@ func (t *TCPTransport) writeTo(b []byte, addr string) error { } }() - if t.cfg.PacketWriteTimeout > 0 { - deadline := time.Now().Add(t.cfg.PacketWriteTimeout) - err := c.SetDeadline(deadline) - if err != nil { - return fmt.Errorf("setting deadline: %v", err) - } - } + // Compute the digest *before* setting the deadline on the connection (so that the time + // it takes to compute the digest is not taken in account). + // We use md5 as quick and relatively short hash, not in cryptographic context. + // It's also used to detect if the whole packet has been received on the receiver side. + digest := md5.Sum(b) - buf := bytes.Buffer{} - buf.WriteByte(byte(packet)) + // Prepare the header *before* setting the deadline on the connection. + headerBuf := bytes.Buffer{} + headerBuf.WriteByte(byte(packet)) // We need to send our address to the other side, otherwise other side can only see IP and port from TCP header. // But that doesn't match our node address (new TCP connection has new random port), which confuses memberlist. @@ -459,10 +458,18 @@ func (t *TCPTransport) writeTo(b []byte, addr string) error { return fmt.Errorf("local address too long") } - buf.WriteByte(byte(len(ourAddr))) - buf.WriteString(ourAddr) + headerBuf.WriteByte(byte(len(ourAddr))) + headerBuf.WriteString(ourAddr) - _, err = c.Write(buf.Bytes()) + if t.cfg.PacketWriteTimeout > 0 { + deadline := time.Now().Add(t.cfg.PacketWriteTimeout) + err := c.SetDeadline(deadline) + if err != nil { + return fmt.Errorf("setting deadline: %v", err) + } + } + + _, err = c.Write(headerBuf.Bytes()) if err != nil { return fmt.Errorf("sending local address: %v", err) } @@ -475,9 +482,7 @@ func (t *TCPTransport) writeTo(b []byte, addr string) error { return fmt.Errorf("sending data: short write") } - // Append digest. We use md5 as quick and relatively short hash, not in cryptographic context. - // This helped to find some bugs, so let's keep it. - digest := md5.Sum(b) + // Append digest. n, err = c.Write(digest[:]) if err != nil { return fmt.Errorf("digest: %v", err)