Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

set deadline before stats write/read #918

Merged
merged 8 commits into from
May 28, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion stats/config/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package config
import (
"flag"
"strings"
"time"

"github.com/grafana/metrictank/stats"
"github.com/raintank/worldping-api/pkg/log"
Expand All @@ -14,13 +15,15 @@ var prefix string
var addr string
var interval int
var bufferSize int
var timeout time.Duration

func ConfigSetup() {
inStats := flag.NewFlagSet("stats", flag.ExitOnError)
inStats.BoolVar(&enabled, "enabled", true, "enable sending graphite messages for instrumentation")
inStats.StringVar(&prefix, "prefix", "metrictank.stats.default.$instance", "stats prefix (will add trailing dot automatically if needed)")
inStats.StringVar(&addr, "addr", "localhost:2003", "graphite address")
inStats.IntVar(&interval, "interval", 1, "interval at which to send statistics")
inStats.DurationVar(&timeout, "timeout", time.Second*10, "timeout after which a write is considered not successful")
inStats.IntVar(&bufferSize, "buffer-size", 20000, "how many messages (holding all measurements from one interval. rule of thumb: a message is ~25kB) to buffer up in case graphite endpoint is unavailable. With the default of 20k you will use max about 500MB and bridge 5 hours of downtime when needed")
globalconf.Register("stats", inStats)
}
Expand All @@ -36,7 +39,7 @@ func ConfigProcess(instance string) {
func Start() {
if enabled {
stats.NewMemoryReporter()
stats.NewGraphite(prefix, addr, interval, bufferSize)
stats.NewGraphite(prefix, addr, interval, bufferSize, timeout)
} else {
stats.NewDevnull()
log.Warn("running metrictank without instrumentation.")
Expand Down
36 changes: 34 additions & 2 deletions stats/out_graphite.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package stats

import (
"bytes"
"io"
"net"
"time"

Expand All @@ -25,10 +26,11 @@ type Graphite struct {
prefix []byte
addr string

timeout time.Duration
toGraphite chan []byte
}

func NewGraphite(prefix, addr string, interval int, bufferSize int) {
func NewGraphite(prefix, addr string, interval, bufferSize int, timeout time.Duration) {
if len(prefix) != 0 && prefix[len(prefix)-1] != '.' {
prefix = prefix + "."
}
Expand All @@ -44,6 +46,7 @@ func NewGraphite(prefix, addr string, interval int, bufferSize int) {
prefix: []byte(prefix),
addr: addr,
toGraphite: make(chan []byte, bufferSize),
timeout: timeout,
}
go g.writer()
go g.reporter(interval)
Expand Down Expand Up @@ -80,7 +83,6 @@ func (g *Graphite) reporter(interval int) {
}

// writer connects to graphite and submits all pending data to it
// TODO: conn.Write() returns no error for a while when the remote endpoint is down, the reconnect happens with a delay. this can also cause lost data for a second or two.
func (g *Graphite) writer() {
var conn net.Conn
var err error
Expand All @@ -92,6 +94,7 @@ func (g *Graphite) writer() {
conn, err = net.Dial("tcp", g.addr)
if err == nil {
log.Info("stats now connected to %s", g.addr)
go g.checkEOF(conn)
} else {
log.Warn("stats dialing %s failed: %s. will retry", g.addr, err.Error())
}
Expand All @@ -105,6 +108,7 @@ func (g *Graphite) writer() {
var ok bool
for !ok {
conn = assureConn()
conn.SetDeadline(time.Now().Add(g.timeout))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we remove the TODO for this function? looks like we can

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this needs to just be conn.SetWriteDeadline(). Calling SetDeadline() will cause the checkEOF() read to timeout

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right, updated

pre := time.Now()
_, err = conn.Write(buf)
if err == nil {
Expand All @@ -118,3 +122,31 @@ func (g *Graphite) writer() {
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there's a race condition here. note how the write routine can set conn to nil, but checkEOF requires it to be non-nil.
particularly, the conn.Close() will activate the Read in checkEOF which will get an error, and try to call Close() on a pointer that can be nil.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think you're right... that means i'll need to put a lock around conn

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that should do it: 113554f

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good but why do we need the changes to how the conn variable is being set?

Copy link
Member

@woodsaj woodsaj May 28, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

having conn = assureConn() is redundent.

The assureConn func is working with the exact same conn that we are going to write to as they are all in the same scope.

}
}

// normally the remote end should never write anything back
// but we know when we get EOF that the other end closed the conn
// if not for this, we can happily write and flush without getting errors (in Go) but getting RST tcp packets back (!)
// props to Tv` for this trick.
func (g *Graphite) checkEOF(conn net.Conn) {
b := make([]byte, 1024)
for {
num, err := conn.Read(b)
if err == io.EOF {
log.Info("Graphite.checkEOF: remote closed conn. closing conn")
conn.Close()
return
}

// just in case i misunderstand something or the remote behaves badly
if num != 0 {
log.Warn("Graphite.checkEOF: read unexpected data from peer: %s\n", b[:num])
continue
}

if err != io.EOF {
log.Warn("Graphite.checkEOF: %s\n", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also log "closing conn" for clarity and symmetry with the other error case.

Copy link
Contributor Author

@replay replay May 22, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated again 108f75e

conn.Close()
return
}
}
}