diff --git a/server/accounts.go b/server/accounts.go index 09656ebce6e..5b84b3b8b62 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -50,7 +50,15 @@ var maxSubLimitReportThreshold = defaultMaxSubLimitReportThreshold // Account are subject namespace definitions. By default no messages are shared between accounts. // You can share via Exports and Imports of Streams and Services. type Account struct { - stats + // Total stats for the account. + stats struct { + sync.Mutex + stats // Totals + gw stats // Gateways + rt stats // Routes + ln stats // Leafnodes + } + gwReplyMapping Name string Nkey string diff --git a/server/client.go b/server/client.go index b21d8e54a33..0a69c486879 100644 --- a/server/client.go +++ b/server/client.go @@ -1446,14 +1446,25 @@ func (c *client) readLoop(pre []byte) { // Updates stats for client and server that were collected // from parsing through the buffer. if c.in.msgs > 0 { - atomic.AddInt64(&c.inMsgs, int64(c.in.msgs)) - atomic.AddInt64(&c.inBytes, int64(c.in.bytes)) + inMsgs := int64(c.in.msgs) + inBytes := int64(c.in.bytes) + + atomic.AddInt64(&c.inMsgs, inMsgs) + atomic.AddInt64(&c.inBytes, inBytes) + if acc != nil { - atomic.AddInt64(&acc.inMsgs, int64(c.in.msgs)) - atomic.AddInt64(&acc.inBytes, int64(c.in.bytes)) + acc.stats.Lock() + acc.stats.inMsgs += inMsgs + acc.stats.inBytes += inBytes + if c.kind == LEAF { + acc.stats.ln.inMsgs += int64(inMsgs) + acc.stats.ln.inBytes += int64(inBytes) + } + acc.stats.Unlock() } - atomic.AddInt64(&s.inMsgs, int64(c.in.msgs)) - atomic.AddInt64(&s.inBytes, int64(c.in.bytes)) + + atomic.AddInt64(&s.inMsgs, inMsgs) + atomic.AddInt64(&s.inBytes, inBytes) } // Signal to writeLoop to flush to socket. @@ -1806,7 +1817,9 @@ func (c *client) handleWriteTimeout(written, attempted int64, numChunks int) boo c.srv.scStats.leafs.Add(1) } if c.acc != nil { - atomic.AddInt64(&c.acc.slowConsumers, 1) + c.acc.stats.Lock() + c.acc.stats.slowConsumers++ + c.acc.stats.Unlock() } c.Noticef("Slow Consumer %s: WriteDeadline of %v exceeded with %d chunks of %d total bytes.", scState, c.out.wdl, numChunks, attempted) @@ -2356,7 +2369,9 @@ func (c *client) queueOutbound(data []byte) { atomic.AddInt64(&c.srv.slowConsumers, 1) c.srv.scStats.clients.Add(1) if c.acc != nil { - atomic.AddInt64(&c.acc.slowConsumers, 1) + c.acc.stats.Lock() + c.acc.stats.slowConsumers++ + c.acc.stats.Unlock() } c.Noticef("Slow Consumer Detected: MaxPending of %d Exceeded", c.out.mp) c.markConnAsClosed(SlowConsumerPendingBytes) @@ -4728,6 +4743,8 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver, // by having an extra size var dlvMsgs int64 var dlvExtraSize int64 + var dlvRouteMsgs int64 + var dlvLeafMsgs int64 // We need to know if this is a MQTT producer because they send messages // without CR_LF (we otherwise remove the size of CR_LF from message size). @@ -4737,15 +4754,33 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver, if dlvMsgs == 0 { return } + totalBytes := dlvMsgs*int64(len(msg)) + dlvExtraSize + routeBytes := dlvRouteMsgs*int64(len(msg)) + dlvExtraSize + leafBytes := dlvLeafMsgs*int64(len(msg)) + dlvExtraSize + // For non MQTT producers, remove the CR_LF * number of messages if !prodIsMQTT { totalBytes -= dlvMsgs * int64(LEN_CR_LF) + routeBytes -= dlvRouteMsgs * int64(LEN_CR_LF) + leafBytes -= dlvLeafMsgs * int64(LEN_CR_LF) } + if acc != nil { - atomic.AddInt64(&acc.outMsgs, dlvMsgs) - atomic.AddInt64(&acc.outBytes, totalBytes) + acc.stats.Lock() + acc.stats.outMsgs += dlvMsgs + acc.stats.outBytes += totalBytes + if dlvRouteMsgs > 0 { + acc.stats.rt.outMsgs += dlvRouteMsgs + acc.stats.rt.outBytes += routeBytes + } + if dlvLeafMsgs > 0 { + acc.stats.ln.outMsgs += dlvLeafMsgs + acc.stats.ln.outBytes += leafBytes + } + acc.stats.Unlock() } + if srv := c.srv; srv != nil { atomic.AddInt64(&srv.outMsgs, dlvMsgs) atomic.AddInt64(&srv.outBytes, totalBytes) @@ -5066,6 +5101,12 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver, // Update only if not skipped. if !skipDelivery && sub.icb == nil { dlvMsgs++ + switch sub.client.kind { + case ROUTER: + dlvRouteMsgs++ + case LEAF: + dlvLeafMsgs++ + } } // Do the rest even when message delivery was skipped. didDeliver = true @@ -5156,6 +5197,12 @@ sendToRoutesOrLeafs: if c.deliverMsg(prodIsMQTT, rt.sub, acc, subject, reply, mh, dmsg, false) { if rt.sub.icb == nil { dlvMsgs++ + switch dc.kind { + case ROUTER: + dlvRouteMsgs++ + case LEAF: + dlvLeafMsgs++ + } dlvExtraSize += int64(len(dmsg) - len(msg)) } didDeliver = true diff --git a/server/events.go b/server/events.go index 8005a0d2c1a..d579618bb18 100644 --- a/server/events.go +++ b/server/events.go @@ -400,12 +400,19 @@ type GatewayStat struct { NumInbound int `json:"inbound_connections"` } -// DataStats reports how may msg and bytes. Applicable for both sent and received. -type DataStats struct { +type dataStats struct { Msgs int64 `json:"msgs"` Bytes int64 `json:"bytes"` } +// DataStats reports how may msg and bytes. Applicable for both sent and received. +type DataStats struct { + dataStats + Gateways dataStats `json:"gateways,omitempty"` + Routes dataStats `json:"routes,omitempty"` + Leafs dataStats `json:"leafs,omitempty"` +} + // Used for internally queueing up messages that the server wants to send. type pubMsg struct { c *client @@ -842,12 +849,16 @@ func routeStat(r *client) *RouteStat { rs := &RouteStat{ ID: r.cid, Sent: DataStats{ - Msgs: r.outMsgs, - Bytes: r.outBytes, + dataStats: dataStats{ + Msgs: r.outMsgs, + Bytes: r.outBytes, + }, }, Received: DataStats{ - Msgs: atomic.LoadInt64(&r.inMsgs), - Bytes: atomic.LoadInt64(&r.inBytes), + dataStats: dataStats{ + Msgs: atomic.LoadInt64(&r.inMsgs), + Bytes: atomic.LoadInt64(&r.inBytes), + }, }, Pending: int(r.out.pb), } @@ -946,8 +957,10 @@ func (s *Server) sendStatsz(subj string) { // Note that *client.out[Msgs|Bytes] are not set using atomic, // unlike the in[Msgs|bytes]. gs.Sent = DataStats{ - Msgs: c.outMsgs, - Bytes: c.outBytes, + dataStats: dataStats{ + Msgs: c.outMsgs, + Bytes: c.outBytes, + }, } c.mu.Unlock() // Gather matching inbound connections @@ -2402,22 +2415,57 @@ func (s *Server) sendAccConnsUpdate(a *Account, subj ...string) { func (a *Account) statz() *AccountStat { localConns := a.numLocalConnections() leafConns := a.numLocalLeafNodes() - return &AccountStat{ - Account: a.Name, - Name: a.getNameTagLocked(), - Conns: localConns, - LeafNodes: leafConns, - TotalConns: localConns + leafConns, - NumSubs: a.sl.Count(), - Received: DataStats{ - Msgs: atomic.LoadInt64(&a.inMsgs), - Bytes: atomic.LoadInt64(&a.inBytes), + + a.stats.Lock() + received := DataStats{ + dataStats: dataStats{ + Msgs: a.stats.inMsgs, + Bytes: a.stats.inBytes, }, - Sent: DataStats{ - Msgs: atomic.LoadInt64(&a.outMsgs), - Bytes: atomic.LoadInt64(&a.outBytes), + Gateways: dataStats{ + Msgs: a.stats.gw.inMsgs, + Bytes: a.stats.gw.inBytes, + }, + Routes: dataStats{ + Msgs: a.stats.rt.inMsgs, + Bytes: a.stats.rt.inBytes, + }, + Leafs: dataStats{ + Msgs: a.stats.ln.inMsgs, + Bytes: a.stats.ln.inBytes, }, - SlowConsumers: atomic.LoadInt64(&a.slowConsumers), + } + sent := DataStats{ + dataStats: dataStats{ + Msgs: a.stats.outMsgs, + Bytes: a.stats.outBytes, + }, + Gateways: dataStats{ + Msgs: a.stats.gw.outMsgs, + Bytes: a.stats.gw.outBytes, + }, + Routes: dataStats{ + Msgs: a.stats.rt.outMsgs, + Bytes: a.stats.rt.outBytes, + }, + Leafs: dataStats{ + Msgs: a.stats.ln.outMsgs, + Bytes: a.stats.ln.outBytes, + }, + } + slowConsumers := a.stats.slowConsumers + a.stats.Unlock() + + return &AccountStat{ + Account: a.Name, + Name: a.getNameTagLocked(), + Conns: localConns, + LeafNodes: leafConns, + TotalConns: localConns + leafConns, + NumSubs: a.sl.Count(), + Received: received, + Sent: sent, + SlowConsumers: slowConsumers, } } @@ -2533,12 +2581,16 @@ func (s *Server) accountDisconnectEvent(c *client, now time.Time, reason string) MQTTClient: c.getMQTTClientID(), }, Sent: DataStats{ - Msgs: atomic.LoadInt64(&c.inMsgs), - Bytes: atomic.LoadInt64(&c.inBytes), + dataStats: dataStats{ + Msgs: atomic.LoadInt64(&c.inMsgs), + Bytes: atomic.LoadInt64(&c.inBytes), + }, }, Received: DataStats{ - Msgs: c.outMsgs, - Bytes: c.outBytes, + dataStats: dataStats{ + Msgs: c.outMsgs, + Bytes: c.outBytes, + }, }, Reason: reason, } @@ -2587,12 +2639,16 @@ func (s *Server) sendAuthErrorEvent(c *client) { MQTTClient: c.getMQTTClientID(), }, Sent: DataStats{ - Msgs: c.inMsgs, - Bytes: c.inBytes, + dataStats: dataStats{ + Msgs: c.inMsgs, + Bytes: c.inBytes, + }, }, Received: DataStats{ - Msgs: c.outMsgs, - Bytes: c.outBytes, + dataStats: dataStats{ + Msgs: c.outMsgs, + Bytes: c.outBytes, + }, }, Reason: AuthenticationViolation.String(), } @@ -2645,12 +2701,16 @@ func (s *Server) sendAccountAuthErrorEvent(c *client, acc *Account, reason strin MQTTClient: c.getMQTTClientID(), }, Sent: DataStats{ - Msgs: c.inMsgs, - Bytes: c.inBytes, + dataStats: dataStats{ + Msgs: c.inMsgs, + Bytes: c.inBytes, + }, }, Received: DataStats{ - Msgs: c.outMsgs, - Bytes: c.outBytes, + dataStats: dataStats{ + Msgs: c.outMsgs, + Bytes: c.outBytes, + }, }, Reason: reason, } diff --git a/server/events_test.go b/server/events_test.go index be20373f114..3d90ea12796 100644 --- a/server/events_test.go +++ b/server/events_test.go @@ -1292,8 +1292,8 @@ func TestAccountReqMonitoring(t *testing.T) { // query statz/conns for account resp, err = ncSys.Request(statz(acc.Name), nil, time.Second) require_NoError(t, err) - respContentAcc := []string{`"conns":1,`, `"total_conns":1`, `"slow_consumers":0`, `"sent":{"msgs":0,"bytes":0}`, - `"received":{"msgs":0,"bytes":0}`, `"num_subscriptions":`, fmt.Sprintf(`"acc":"%s"`, acc.Name)} + respContentAcc := []string{`"conns":1,`, `"total_conns":1`, `"slow_consumers":0`, `"sent":{"msgs":0,"bytes":0`, + `"received":{"msgs":0,"bytes":0`, `"num_subscriptions":`, fmt.Sprintf(`"acc":"%s"`, acc.Name)} require_Contains(t, string(resp.Data), respContentAcc...) rIb := ncSys.NewRespInbox() @@ -1350,11 +1350,11 @@ func TestAccountReqMonitoring(t *testing.T) { // Since we now have processed our own message, sent msgs will be at least 1. payload := string(resp.Data) - respContentAcc = []string{`"conns":1,`, `"total_conns":1`, `"slow_consumers":0`, `"sent":{"msgs":1,"bytes":0}`, fmt.Sprintf(`"acc":"%s"`, acc.Name)} + respContentAcc = []string{`"conns":1,`, `"total_conns":1`, `"slow_consumers":0`, `"sent":{"msgs":1,"bytes":0`, fmt.Sprintf(`"acc":"%s"`, acc.Name)} require_Contains(t, payload, respContentAcc...) // Depending on timing, statz message could be accounted too. - receivedOK := strings.Contains(payload, `"received":{"msgs":1,"bytes":0}`) || strings.Contains(payload, `"received":{"msgs":2,"bytes":0}`) + receivedOK := strings.Contains(payload, `"received":{"msgs":1,"bytes":0`) || strings.Contains(payload, `"received":{"msgs":2,"bytes":0`) require_True(t, receivedOK) _, err = rSub.NextMsg(200 * time.Millisecond) require_Error(t, err) diff --git a/server/gateway.go b/server/gateway.go index 5f4d3582347..962858f99b0 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -2732,8 +2732,12 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr totalBytes -= dlvMsgs * int64(LEN_CR_LF) } if acc != nil { - atomic.AddInt64(&acc.outMsgs, dlvMsgs) - atomic.AddInt64(&acc.outBytes, totalBytes) + acc.stats.Lock() + acc.stats.outMsgs += dlvMsgs + acc.stats.outBytes += totalBytes + acc.stats.gw.outMsgs += dlvMsgs + acc.stats.gw.outBytes += totalBytes + acc.stats.Unlock() } atomic.AddInt64(&srv.outMsgs, dlvMsgs) atomic.AddInt64(&srv.outBytes, totalBytes) @@ -3077,7 +3081,8 @@ func (c *client) processInboundGatewayMsg(msg []byte) { // Update statistics c.in.msgs++ // The msg includes the CR_LF, so pull back out for accounting. - c.in.bytes += int32(len(msg) - LEN_CR_LF) + size := len(msg) - LEN_CR_LF + c.in.bytes += int32(size) if c.opts.Verbose { c.sendOK() @@ -3102,6 +3107,13 @@ func (c *client) processInboundGatewayMsg(msg []byte) { return } + acc.stats.Lock() + acc.stats.inMsgs++ + acc.stats.inBytes += int64(size) + acc.stats.gw.inMsgs++ + acc.stats.gw.inBytes += int64(size) + acc.stats.Unlock() + // Check if this is a service reply subject (_R_) noInterest := len(r.psubs) == 0 checkNoInterest := true diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index a3ab344477a..07b93e3a556 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -8224,8 +8224,15 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ // If we are using the system account for NRG, add in the extra sent msgs and bytes to our account // so that the end user / account owner has visibility. if node.IsSystemAccount() && mset.acc != nil && r > 1 { - atomic.AddInt64(&mset.acc.outMsgs, int64(r-1)) - atomic.AddInt64(&mset.acc.outBytes, int64(len(esm)*(r-1))) + outMsgs := int64(r - 1) + outBytes := int64(len(esm) * (r - 1)) + + mset.acc.stats.Lock() + mset.acc.stats.outMsgs += outMsgs + mset.acc.stats.outBytes += outBytes + mset.acc.stats.rt.outMsgs += outMsgs + mset.acc.stats.rt.outBytes += outBytes + mset.acc.stats.Unlock() } } diff --git a/server/monitor_test.go b/server/monitor_test.go index 88aa2016b11..a1b8d1953c9 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -4384,7 +4384,6 @@ func runMonitorServerWithOperator(t *testing.T, sysName, accName string) ([]*Ser } cluster { name: c1 - accounts: [%s] listen: %d routes: [ nats-route://127.0.0.1:%d, @@ -4408,7 +4407,7 @@ func runMonitorServerWithOperator(t *testing.T, sysName, accName string) ([]*Ser %s : %s %s : %s } - `, test.port, test.mport, dir, accPub, test.cport, test.route1, test.gport, test.gateway1, test.lport, fmt.Sprintf("n%d", i), ojwt, sysPub, accPub, accJwt, sysPub, sysJwt))) + `, test.port, test.mport, dir, test.cport, test.route1, test.gport, test.gateway1, test.lport, fmt.Sprintf("n%d", i), ojwt, sysPub, accPub, accJwt, sysPub, sysJwt))) s, _ := RunServerWithConfig(conf) servers = append(servers, s) @@ -4467,7 +4466,7 @@ func runMonitorServerWithOperator(t *testing.T, sysName, accName string) ([]*Ser http: 127.0.0.1:%d leafnodes: { remotes: [ - {url: "nats://127.0.0.1:%d", credentials: "%s"}, + {url: "nats://127.0.0.1:%d", credentials: "%s", account: "APP"}, ] } accounts { @@ -4490,6 +4489,8 @@ func runMonitorServerWithOperator(t *testing.T, sysName, accName string) ([]*Ser } checkForJSClusterUp(t, servers[:3]...) + waitForOutboundGateways(t, servers[0], 1, 2*time.Second) + waitForOutboundGateways(t, servers[3], 1, 2*time.Second) return servers, sysKp, accKp } @@ -4597,6 +4598,124 @@ func TestMonitorAccountStatzOperatorMode(t *testing.T) { } } +func TestMonitorAccountStatzDataStatsOperatorMode(t *testing.T) { + sysName := "SYS" + accName := "APP" + + srvs, _, accKp := runMonitorServerWithOperator(t, sysName, accName) + for _, s := range srvs { + defer s.Shutdown() + } + // First three servers are the cluster. + n0 := srvs[0] + n1 := srvs[1] + + // Gateway server. + n3 := srvs[3] + + // Leafnode server. + n4 := srvs[4] + + accPub, _ := accKp.PublicKey() + + _, aCreds := createUser(t, accKp) + + n0c, err := nats.Connect(n0.ClientURL(), nats.UserCredentials(aCreds)) + require_NoError(t, err) + defer n0c.Close() + + n1c, err := nats.Connect(n1.ClientURL(), nats.UserCredentials(aCreds)) + require_NoError(t, err) + defer n1c.Close() + + n3c, err := nats.Connect(n3.ClientURL(), nats.UserCredentials(aCreds)) + require_NoError(t, err) + defer n3c.Close() + + // No auth user for leafnode. + n4c, err := nats.Connect(n4.ClientURL()) + require_NoError(t, err) + defer n4c.Close() + + // Subscription over a route. + _, err = n1c.Subscribe("foo", func(m *nats.Msg) {}) + require_NoError(t, err) + + // Subscription over a gateway. + _, err = n3c.Subscribe("bar", func(m *nats.Msg) {}) + require_NoError(t, err) + + // Subscription over a leafnode. + _, err = n4c.Subscribe("baz", func(m *nats.Msg) {}) + require_NoError(t, err) + + // Subscription propagation. + time.Sleep(10 * time.Millisecond) + + err = n0c.Publish("foo", []byte("Hello")) + require_NoError(t, err) + + err = n0c.Publish("bar", []byte("Hello")) + require_NoError(t, err) + + err = n0c.Publish("baz", []byte("Hello")) + require_NoError(t, err) + + // Publish propagation. + time.Sleep(10 * time.Millisecond) + + for pollMode := 0; pollMode < 2; pollMode++ { + for _, s := range srvs { + a := pollAccountStatz(t, s, pollMode, fmt.Sprintf("http://127.0.0.1:%d%s?unused=1", s.MonitorAddr().Port, AccountStatzPath), &AccountStatzOptions{IncludeUnused: true}) + + for _, acc := range a.Accounts { + if acc.Account != accPub { + continue + } + + switch s.Name() { + case "n0": + // Should have received three messages due to the three publishes. + // Should have sent one over a route to n1 for foo. + // Should have sent one over a gateway to n3 for bar. + // Should have sent one over a leaf node to n4 for baz. + require_Equal(t, acc.Sent.Msgs, 3) + require_Equal(t, acc.Sent.Routes.Msgs, 1) + require_Equal(t, acc.Sent.Gateways.Msgs, 1) + require_Equal(t, acc.Sent.Leafs.Msgs, 1) + require_Equal(t, acc.Received.Msgs, 3) + case "n1": + // Should have sent 1 message to a client. + // Should have received 1 message from n0. + require_Equal(t, acc.Sent.Msgs, 1) + require_Equal(t, acc.Sent.Routes.Msgs, 0) + require_Equal(t, acc.Received.Msgs, 1) + require_Equal(t, acc.Received.Routes.Msgs, 1) + case "n2": + // Should have not received anything. + require_Equal(t, acc.Sent.Msgs, 0) + require_Equal(t, acc.Sent.Bytes, 0) + // Gateway, connected to n0 + case "n3": + // Should have received 1 message from n0. + // Should have sent 1 message to a client. + require_Equal(t, acc.Sent.Msgs, 1) + require_Equal(t, acc.Sent.Gateways.Msgs, 0) + require_Equal(t, acc.Received.Msgs, 1) + require_Equal(t, acc.Received.Gateways.Msgs, 1) + // Leafnode, connected to n0 + case "n4": + // Should have received 1 message from n0. + // Should have sent 1 message to a client. + require_Equal(t, acc.Sent.Msgs, 1) + require_Equal(t, acc.Received.Msgs, 1) + require_Equal(t, acc.Received.Leafs.Msgs, 1) + } + } + } + } +} + func TestMonitorAccountzAccountIssuerUpdate(t *testing.T) { // create an operator set of keys okp, err := nkeys.CreateOperator() diff --git a/server/route.go b/server/route.go index 8196f5b2aa0..2d72e6fe58d 100644 --- a/server/route.go +++ b/server/route.go @@ -449,7 +449,8 @@ func (c *client) processInboundRoutedMsg(msg []byte) { // Update statistics c.in.msgs++ // The msg includes the CR_LF, so pull back out for accounting. - c.in.bytes += int32(len(msg) - LEN_CR_LF) + size := len(msg) - LEN_CR_LF + c.in.bytes += int32(size) if c.opts.Verbose { c.sendOK() @@ -472,6 +473,13 @@ func (c *client) processInboundRoutedMsg(msg []byte) { return } + acc.stats.Lock() + acc.stats.inMsgs++ + acc.stats.inBytes += int64(size) + acc.stats.rt.inMsgs++ + acc.stats.rt.inBytes += int64(size) + acc.stats.Unlock() + // Check for no interest, short circuit if so. // This is the fanout scale. if len(r.psubs)+len(r.qsubs) > 0 { diff --git a/server/server.go b/server/server.go index e7ac469019c..cec99231678 100644 --- a/server/server.go +++ b/server/server.go @@ -388,7 +388,6 @@ type nodeInfo struct { accountNRG bool } -// Make sure all are 64bits for atomic use type stats struct { inMsgs int64 outMsgs int64