Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion server/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,15 @@ var maxSubLimitReportThreshold = defaultMaxSubLimitReportThreshold
// Account are subject namespace definitions. By default no messages are shared between accounts.
// You can share via Exports and Imports of Streams and Services.
type Account struct {
stats
// Total stats for the account.
stats struct {
sync.Mutex
stats // Totals
gw stats // Gateways
rt stats // Routes
ln stats // Leafnodes
}

gwReplyMapping
Name string
Nkey string
Expand Down
67 changes: 57 additions & 10 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1446,14 +1446,25 @@ func (c *client) readLoop(pre []byte) {
// Updates stats for client and server that were collected
// from parsing through the buffer.
if c.in.msgs > 0 {
atomic.AddInt64(&c.inMsgs, int64(c.in.msgs))
atomic.AddInt64(&c.inBytes, int64(c.in.bytes))
inMsgs := int64(c.in.msgs)
inBytes := int64(c.in.bytes)

atomic.AddInt64(&c.inMsgs, inMsgs)
atomic.AddInt64(&c.inBytes, inBytes)

if acc != nil {
atomic.AddInt64(&acc.inMsgs, int64(c.in.msgs))
atomic.AddInt64(&acc.inBytes, int64(c.in.bytes))
acc.stats.Lock()
acc.stats.inMsgs += inMsgs
acc.stats.inBytes += inBytes
if c.kind == LEAF {
acc.stats.ln.inMsgs += int64(inMsgs)
acc.stats.ln.inBytes += int64(inBytes)
}
acc.stats.Unlock()
}
atomic.AddInt64(&s.inMsgs, int64(c.in.msgs))
atomic.AddInt64(&s.inBytes, int64(c.in.bytes))

atomic.AddInt64(&s.inMsgs, inMsgs)
atomic.AddInt64(&s.inBytes, inBytes)
}

// Signal to writeLoop to flush to socket.
Expand Down Expand Up @@ -1806,7 +1817,9 @@ func (c *client) handleWriteTimeout(written, attempted int64, numChunks int) boo
c.srv.scStats.leafs.Add(1)
}
if c.acc != nil {
atomic.AddInt64(&c.acc.slowConsumers, 1)
c.acc.stats.Lock()
c.acc.stats.slowConsumers++
c.acc.stats.Unlock()
}
c.Noticef("Slow Consumer %s: WriteDeadline of %v exceeded with %d chunks of %d total bytes.",
scState, c.out.wdl, numChunks, attempted)
Expand Down Expand Up @@ -2356,7 +2369,9 @@ func (c *client) queueOutbound(data []byte) {
atomic.AddInt64(&c.srv.slowConsumers, 1)
c.srv.scStats.clients.Add(1)
if c.acc != nil {
atomic.AddInt64(&c.acc.slowConsumers, 1)
c.acc.stats.Lock()
c.acc.stats.slowConsumers++
c.acc.stats.Unlock()
}
c.Noticef("Slow Consumer Detected: MaxPending of %d Exceeded", c.out.mp)
c.markConnAsClosed(SlowConsumerPendingBytes)
Expand Down Expand Up @@ -4728,6 +4743,8 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
// by having an extra size
var dlvMsgs int64
var dlvExtraSize int64
var dlvRouteMsgs int64
var dlvLeafMsgs int64

// We need to know if this is a MQTT producer because they send messages
// without CR_LF (we otherwise remove the size of CR_LF from message size).
Expand All @@ -4737,15 +4754,33 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
if dlvMsgs == 0 {
return
}

totalBytes := dlvMsgs*int64(len(msg)) + dlvExtraSize
routeBytes := dlvRouteMsgs*int64(len(msg)) + dlvExtraSize
leafBytes := dlvLeafMsgs*int64(len(msg)) + dlvExtraSize

// For non MQTT producers, remove the CR_LF * number of messages
if !prodIsMQTT {
totalBytes -= dlvMsgs * int64(LEN_CR_LF)
routeBytes -= dlvRouteMsgs * int64(LEN_CR_LF)
leafBytes -= dlvLeafMsgs * int64(LEN_CR_LF)
}

if acc != nil {
atomic.AddInt64(&acc.outMsgs, dlvMsgs)
atomic.AddInt64(&acc.outBytes, totalBytes)
acc.stats.Lock()
acc.stats.outMsgs += dlvMsgs
acc.stats.outBytes += totalBytes
if dlvRouteMsgs > 0 {
acc.stats.rt.outMsgs += dlvRouteMsgs
acc.stats.rt.outBytes += routeBytes
}
if dlvLeafMsgs > 0 {
acc.stats.ln.outMsgs += dlvLeafMsgs
acc.stats.ln.outBytes += leafBytes
}
acc.stats.Unlock()
}

if srv := c.srv; srv != nil {
atomic.AddInt64(&srv.outMsgs, dlvMsgs)
atomic.AddInt64(&srv.outBytes, totalBytes)
Expand Down Expand Up @@ -5066,6 +5101,12 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
// Update only if not skipped.
if !skipDelivery && sub.icb == nil {
dlvMsgs++
switch sub.client.kind {
case ROUTER:
dlvRouteMsgs++
case LEAF:
dlvLeafMsgs++
}
}
// Do the rest even when message delivery was skipped.
didDeliver = true
Expand Down Expand Up @@ -5156,6 +5197,12 @@ sendToRoutesOrLeafs:
if c.deliverMsg(prodIsMQTT, rt.sub, acc, subject, reply, mh, dmsg, false) {
if rt.sub.icb == nil {
dlvMsgs++
switch dc.kind {
case ROUTER:
dlvRouteMsgs++
case LEAF:
dlvLeafMsgs++
}
dlvExtraSize += int64(len(dmsg) - len(msg))
}
didDeliver = true
Expand Down
128 changes: 94 additions & 34 deletions server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,12 +400,19 @@ type GatewayStat struct {
NumInbound int `json:"inbound_connections"`
}

// DataStats reports how may msg and bytes. Applicable for both sent and received.
type DataStats struct {
type dataStats struct {
Msgs int64 `json:"msgs"`
Bytes int64 `json:"bytes"`
}

// DataStats reports how may msg and bytes. Applicable for both sent and received.
type DataStats struct {
dataStats
Gateways dataStats `json:"gateways,omitempty"`
Routes dataStats `json:"routes,omitempty"`
Leafs dataStats `json:"leafs,omitempty"`
}

// Used for internally queueing up messages that the server wants to send.
type pubMsg struct {
c *client
Expand Down Expand Up @@ -842,12 +849,16 @@ func routeStat(r *client) *RouteStat {
rs := &RouteStat{
ID: r.cid,
Sent: DataStats{
Msgs: r.outMsgs,
Bytes: r.outBytes,
dataStats: dataStats{
Msgs: r.outMsgs,
Bytes: r.outBytes,
},
},
Received: DataStats{
Msgs: atomic.LoadInt64(&r.inMsgs),
Bytes: atomic.LoadInt64(&r.inBytes),
dataStats: dataStats{
Msgs: atomic.LoadInt64(&r.inMsgs),
Bytes: atomic.LoadInt64(&r.inBytes),
},
},
Pending: int(r.out.pb),
}
Expand Down Expand Up @@ -946,8 +957,10 @@ func (s *Server) sendStatsz(subj string) {
// Note that *client.out[Msgs|Bytes] are not set using atomic,
// unlike the in[Msgs|bytes].
gs.Sent = DataStats{
Msgs: c.outMsgs,
Bytes: c.outBytes,
dataStats: dataStats{
Msgs: c.outMsgs,
Bytes: c.outBytes,
},
}
c.mu.Unlock()
// Gather matching inbound connections
Expand Down Expand Up @@ -2402,22 +2415,57 @@ func (s *Server) sendAccConnsUpdate(a *Account, subj ...string) {
func (a *Account) statz() *AccountStat {
localConns := a.numLocalConnections()
leafConns := a.numLocalLeafNodes()
return &AccountStat{
Account: a.Name,
Name: a.getNameTagLocked(),
Conns: localConns,
LeafNodes: leafConns,
TotalConns: localConns + leafConns,
NumSubs: a.sl.Count(),
Received: DataStats{
Msgs: atomic.LoadInt64(&a.inMsgs),
Bytes: atomic.LoadInt64(&a.inBytes),

a.stats.Lock()
received := DataStats{
dataStats: dataStats{
Msgs: a.stats.inMsgs,
Bytes: a.stats.inBytes,
},
Sent: DataStats{
Msgs: atomic.LoadInt64(&a.outMsgs),
Bytes: atomic.LoadInt64(&a.outBytes),
Gateways: dataStats{
Msgs: a.stats.gw.inMsgs,
Bytes: a.stats.gw.inBytes,
},
Routes: dataStats{
Msgs: a.stats.rt.inMsgs,
Bytes: a.stats.rt.inBytes,
},
Leafs: dataStats{
Msgs: a.stats.ln.inMsgs,
Bytes: a.stats.ln.inBytes,
},
SlowConsumers: atomic.LoadInt64(&a.slowConsumers),
}
sent := DataStats{
dataStats: dataStats{
Msgs: a.stats.outMsgs,
Bytes: a.stats.outBytes,
},
Gateways: dataStats{
Msgs: a.stats.gw.outMsgs,
Bytes: a.stats.gw.outBytes,
},
Routes: dataStats{
Msgs: a.stats.rt.outMsgs,
Bytes: a.stats.rt.outBytes,
},
Leafs: dataStats{
Msgs: a.stats.ln.outMsgs,
Bytes: a.stats.ln.outBytes,
},
}
slowConsumers := a.stats.slowConsumers
a.stats.Unlock()

return &AccountStat{
Account: a.Name,
Name: a.getNameTagLocked(),
Conns: localConns,
LeafNodes: leafConns,
TotalConns: localConns + leafConns,
NumSubs: a.sl.Count(),
Received: received,
Sent: sent,
SlowConsumers: slowConsumers,
}
}

Expand Down Expand Up @@ -2533,12 +2581,16 @@ func (s *Server) accountDisconnectEvent(c *client, now time.Time, reason string)
MQTTClient: c.getMQTTClientID(),
},
Sent: DataStats{
Msgs: atomic.LoadInt64(&c.inMsgs),
Bytes: atomic.LoadInt64(&c.inBytes),
dataStats: dataStats{
Msgs: atomic.LoadInt64(&c.inMsgs),
Bytes: atomic.LoadInt64(&c.inBytes),
},
},
Received: DataStats{
Msgs: c.outMsgs,
Bytes: c.outBytes,
dataStats: dataStats{
Msgs: c.outMsgs,
Bytes: c.outBytes,
},
},
Reason: reason,
}
Expand Down Expand Up @@ -2587,12 +2639,16 @@ func (s *Server) sendAuthErrorEvent(c *client) {
MQTTClient: c.getMQTTClientID(),
},
Sent: DataStats{
Msgs: c.inMsgs,
Bytes: c.inBytes,
dataStats: dataStats{
Msgs: c.inMsgs,
Bytes: c.inBytes,
},
},
Received: DataStats{
Msgs: c.outMsgs,
Bytes: c.outBytes,
dataStats: dataStats{
Msgs: c.outMsgs,
Bytes: c.outBytes,
},
},
Reason: AuthenticationViolation.String(),
}
Expand Down Expand Up @@ -2645,12 +2701,16 @@ func (s *Server) sendAccountAuthErrorEvent(c *client, acc *Account, reason strin
MQTTClient: c.getMQTTClientID(),
},
Sent: DataStats{
Msgs: c.inMsgs,
Bytes: c.inBytes,
dataStats: dataStats{
Msgs: c.inMsgs,
Bytes: c.inBytes,
},
},
Received: DataStats{
Msgs: c.outMsgs,
Bytes: c.outBytes,
dataStats: dataStats{
Msgs: c.outMsgs,
Bytes: c.outBytes,
},
},
Reason: reason,
}
Expand Down
8 changes: 4 additions & 4 deletions server/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1292,8 +1292,8 @@ func TestAccountReqMonitoring(t *testing.T) {
// query statz/conns for account
resp, err = ncSys.Request(statz(acc.Name), nil, time.Second)
require_NoError(t, err)
respContentAcc := []string{`"conns":1,`, `"total_conns":1`, `"slow_consumers":0`, `"sent":{"msgs":0,"bytes":0}`,
`"received":{"msgs":0,"bytes":0}`, `"num_subscriptions":`, fmt.Sprintf(`"acc":"%s"`, acc.Name)}
respContentAcc := []string{`"conns":1,`, `"total_conns":1`, `"slow_consumers":0`, `"sent":{"msgs":0,"bytes":0`,
`"received":{"msgs":0,"bytes":0`, `"num_subscriptions":`, fmt.Sprintf(`"acc":"%s"`, acc.Name)}
require_Contains(t, string(resp.Data), respContentAcc...)

rIb := ncSys.NewRespInbox()
Expand Down Expand Up @@ -1350,11 +1350,11 @@ func TestAccountReqMonitoring(t *testing.T) {

// Since we now have processed our own message, sent msgs will be at least 1.
payload := string(resp.Data)
respContentAcc = []string{`"conns":1,`, `"total_conns":1`, `"slow_consumers":0`, `"sent":{"msgs":1,"bytes":0}`, fmt.Sprintf(`"acc":"%s"`, acc.Name)}
respContentAcc = []string{`"conns":1,`, `"total_conns":1`, `"slow_consumers":0`, `"sent":{"msgs":1,"bytes":0`, fmt.Sprintf(`"acc":"%s"`, acc.Name)}
require_Contains(t, payload, respContentAcc...)

// Depending on timing, statz message could be accounted too.
receivedOK := strings.Contains(payload, `"received":{"msgs":1,"bytes":0}`) || strings.Contains(payload, `"received":{"msgs":2,"bytes":0}`)
receivedOK := strings.Contains(payload, `"received":{"msgs":1,"bytes":0`) || strings.Contains(payload, `"received":{"msgs":2,"bytes":0`)
require_True(t, receivedOK)
_, err = rSub.NextMsg(200 * time.Millisecond)
require_Error(t, err)
Expand Down
18 changes: 15 additions & 3 deletions server/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -2732,8 +2732,12 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr
totalBytes -= dlvMsgs * int64(LEN_CR_LF)
}
if acc != nil {
atomic.AddInt64(&acc.outMsgs, dlvMsgs)
atomic.AddInt64(&acc.outBytes, totalBytes)
acc.stats.Lock()
acc.stats.outMsgs += dlvMsgs
acc.stats.outBytes += totalBytes
acc.stats.gw.outMsgs += dlvMsgs
acc.stats.gw.outBytes += totalBytes
acc.stats.Unlock()
}
atomic.AddInt64(&srv.outMsgs, dlvMsgs)
atomic.AddInt64(&srv.outBytes, totalBytes)
Expand Down Expand Up @@ -3077,7 +3081,8 @@ func (c *client) processInboundGatewayMsg(msg []byte) {
// Update statistics
c.in.msgs++
// The msg includes the CR_LF, so pull back out for accounting.
c.in.bytes += int32(len(msg) - LEN_CR_LF)
size := len(msg) - LEN_CR_LF
c.in.bytes += int32(size)

if c.opts.Verbose {
c.sendOK()
Expand All @@ -3102,6 +3107,13 @@ func (c *client) processInboundGatewayMsg(msg []byte) {
return
}

acc.stats.Lock()
acc.stats.inMsgs++
acc.stats.inBytes += int64(size)
acc.stats.gw.inMsgs++
acc.stats.gw.inBytes += int64(size)
acc.stats.Unlock()

// Check if this is a service reply subject (_R_)
noInterest := len(r.psubs) == 0
checkNoInterest := true
Expand Down
Loading
Loading