diff --git a/server/client.go b/server/client.go index 21e3f536998..8e1e1825704 100644 --- a/server/client.go +++ b/server/client.go @@ -1530,6 +1530,11 @@ func (c *client) readLoop(pre []byte) { acc.stats.Unlock() } + if c.kind == CLIENT { + atomic.AddInt64(&s.inClientMsgs, inMsgs) + atomic.AddInt64(&s.inClientBytes, inBytes) + } + atomic.AddInt64(&s.inMsgs, inMsgs) atomic.AddInt64(&s.inBytes, inBytes) } @@ -5006,6 +5011,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver, var dlvExtraSize int64 var dlvRouteMsgs int64 var dlvLeafMsgs int64 + var dlvClientMsgs int64 // We need to know if this is a MQTT producer because they send messages // without CR_LF (we otherwise remove the size of CR_LF from message size). @@ -5019,12 +5025,15 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver, totalBytes := dlvMsgs*int64(len(msg)) + dlvExtraSize routeBytes := dlvRouteMsgs*int64(len(msg)) + dlvExtraSize leafBytes := dlvLeafMsgs*int64(len(msg)) + dlvExtraSize + // dlvExtraSize applies to route/leaf header overhead, not client deliveries + clientBytes := dlvClientMsgs * int64(len(msg)) // For non MQTT producers, remove the CR_LF * number of messages if !prodIsMQTT { totalBytes -= dlvMsgs * int64(LEN_CR_LF) routeBytes -= dlvRouteMsgs * int64(LEN_CR_LF) leafBytes -= dlvLeafMsgs * int64(LEN_CR_LF) + clientBytes -= dlvClientMsgs * int64(LEN_CR_LF) } if acc != nil { @@ -5045,6 +5054,9 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver, if srv := c.srv; srv != nil { atomic.AddInt64(&srv.outMsgs, dlvMsgs) atomic.AddInt64(&srv.outBytes, totalBytes) + + atomic.AddInt64(&srv.outClientMsgs, dlvClientMsgs) + atomic.AddInt64(&srv.outClientBytes, clientBytes) } } @@ -5140,6 +5152,9 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver, // We don't count internal deliveries, so do only when sub.icb is nil. 
if sub.icb == nil { dlvMsgs++ + if sub.client.kind == CLIENT { + dlvClientMsgs++ + } } didDeliver = true } @@ -5367,6 +5382,8 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver, dlvRouteMsgs++ case LEAF: dlvLeafMsgs++ + case CLIENT: + dlvClientMsgs++ } } // Do the rest even when message delivery was skipped. diff --git a/server/client_test.go b/server/client_test.go index 65363fc3b22..f0855854bb3 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -3897,3 +3897,124 @@ func TestFlushOutboundS2CompressionPoolBufferRecycling(t *testing.T) { t.Fatalf("Too many allocs per iteration (%.1f); pool buffers are likely being leaked", allocs) } } + +func TestClientMsgsMetric(t *testing.T) { + o1 := DefaultOptions() + o1.ServerName = "S1" + s1 := RunServer(o1) + defer s1.Shutdown() + + o2 := DefaultOptions() + o2.ServerName = "S2" + o2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", o1.Cluster.Port)) + s2 := RunServer(o2) + defer s2.Shutdown() + + checkClusterFormed(t, s1, s2) + + ncS1 := natsConnect(t, s1.ClientURL(), nats.IgnoreDiscoveredServers()) + defer ncS1.Close() + + ncS2 := natsConnect(t, s2.ClientURL(), nats.IgnoreDiscoveredServers()) + defer ncS2.Close() + + // Echo the message back + natsSub(t, ncS1, "foo", func(m *nats.Msg) { m.Respond(m.Data) }) + natsSub(t, ncS2, "bar", func(m *nats.Msg) { m.Respond(m.Data) }) + ncS1.Flush() + ncS2.Flush() + + checkSubInterest(t, s1, globalAccountName, "bar", 5*time.Second) + checkSubInterest(t, s2, globalAccountName, "foo", 5*time.Second) + + // Request foo and bar from non-local servers, + // to make sure client messages are counted correctly. 
+ fooMsg := "6bytes" + _, err := ncS2.Request("foo", []byte(fooMsg), 5*time.Second) + if err != nil { + t.Fatalf("Error on receiving: %v", err) + } + barMsg := "ninebytes" + _, err = ncS1.Request("bar", []byte(barMsg), 5*time.Second) + if err != nil { + t.Fatalf("Error on receiving: %v", err) + } + err = ncS1.Flush() + if err != nil { + t.Fatalf("Error on flushing connection: %v", err) + } + err = ncS2.Flush() + if err != nil { + t.Fatalf("Error on flushing connection: %v", err) + } + + // In/out Msgs/Bytes include routed messages, including STATSZ heartbeats too. + // So there should be at least 2 foo and 2 bar messages received and sent by each server, + // but depending on the test timing, we may also catch some STATSZ messages. + require_True(t, atomic.LoadInt64(&s1.inMsgs) >= 4) + require_True(t, atomic.LoadInt64(&s1.inBytes) >= int64(len(fooMsg)*2+len(barMsg)*2)) + require_True(t, atomic.LoadInt64(&s2.inMsgs) >= 4) + require_True(t, atomic.LoadInt64(&s2.inBytes) >= int64(len(fooMsg)*2+len(barMsg)*2)) + + require_True(t, atomic.LoadInt64(&s1.outMsgs) >= 4) + require_True(t, atomic.LoadInt64(&s1.outBytes) >= int64(len(fooMsg)*2+len(barMsg)*2)) + require_True(t, atomic.LoadInt64(&s2.outMsgs) >= 4) + require_True(t, atomic.LoadInt64(&s2.outBytes) >= int64(len(fooMsg)*2+len(barMsg)*2)) + + // In/out ClientMsgs/Bytes only count client messages. 
+ require_Equal(t, atomic.LoadInt64(&s1.inClientMsgs), 2) + require_Equal(t, atomic.LoadInt64(&s1.inClientBytes), int64(len(fooMsg)+len(barMsg))) + require_Equal(t, atomic.LoadInt64(&s2.inClientMsgs), 2) + require_Equal(t, atomic.LoadInt64(&s2.inClientBytes), int64(len(fooMsg)+len(barMsg))) + + require_Equal(t, atomic.LoadInt64(&s1.outClientMsgs), 2) + require_Equal(t, atomic.LoadInt64(&s1.outClientBytes), int64(len(fooMsg)+len(barMsg))) + require_Equal(t, atomic.LoadInt64(&s2.outClientMsgs), 2) + require_Equal(t, atomic.LoadInt64(&s2.outClientBytes), int64(len(fooMsg)+len(barMsg))) + + // Now test that messages delivered as part of queue subscriptions are counted correctly + natsQueueSub(t, ncS1, "orders.new", "workers", func(m *nats.Msg) { m.Respond(m.Data) }) + natsQueueSub(t, ncS2, "orders.new", "workers", func(m *nats.Msg) { m.Respond(m.Data) }) + ncS1.Flush() + ncS2.Flush() + + orderMsg := "order" + _, err = ncS1.Request("orders.new", []byte(orderMsg), 5*time.Second) + if err != nil { + t.Fatalf("Error on receiving: %v", err) + } + err = ncS1.Flush() + if err != nil { + t.Fatalf("Error on flushing connection: %v", err) + } + err = ncS2.Flush() + if err != nil { + t.Fatalf("Error on flushing connection: %v", err) + } + + // We do not know which client the message will be routed to, so check both cases. + // If the queue subscriber on S1 receives the message, S1 will send total 4 client messages: + // 2 messages from previous step, qsub message, and echo reply. + if atomic.LoadInt64(&s1.outClientMsgs) == 4 { + require_Equal(t, atomic.LoadInt64(&s2.outClientMsgs), 2) + + require_Equal(t, atomic.LoadInt64(&s1.outClientBytes), + int64(len(fooMsg)+len(barMsg)+len(orderMsg)*2)) + + require_Equal(t, atomic.LoadInt64(&s2.outClientBytes), + int64(len(fooMsg)+len(barMsg))) + + // If message received by S2, both servers will have 3 messages total. 
+ } else if atomic.LoadInt64(&s1.outClientMsgs) == 3 { + require_Equal(t, atomic.LoadInt64(&s2.outClientMsgs), 3) + + require_Equal(t, atomic.LoadInt64(&s1.outClientBytes), + int64(len(fooMsg)+len(barMsg)+len(orderMsg))) + + require_Equal(t, atomic.LoadInt64(&s2.outClientBytes), + int64(len(fooMsg)+len(barMsg)+len(orderMsg))) + + } else { + t.Fatalf("Did not get expected outClientMsg/Bytes for message sent on qsub") + } +} diff --git a/server/events.go b/server/events.go index a484ddc7d82..ce4fef4f8c1 100644 --- a/server/events.go +++ b/server/events.go @@ -373,7 +373,9 @@ type ServerStats struct { ActiveAccounts int `json:"active_accounts"` NumSubs uint32 `json:"subscriptions"` Sent DataStats `json:"sent"` + SentToClients DataStats `json:"sent_to_clients"` Received DataStats `json:"received"` + ReceivedFromClients DataStats `json:"received_from_clients"` SlowConsumers int64 `json:"slow_consumers"` SlowConsumersStats *SlowConsumersStats `json:"slow_consumer_stats,omitempty"` StaleConnections int64 `json:"stale_connections,omitempty"` @@ -946,8 +948,12 @@ func (s *Server) sendStatsz(subj string) { m.Stats.ActiveAccounts = int(atomic.LoadInt32(&s.activeAccounts)) m.Stats.Received.Msgs = atomic.LoadInt64(&s.inMsgs) m.Stats.Received.Bytes = atomic.LoadInt64(&s.inBytes) + m.Stats.ReceivedFromClients.Msgs = atomic.LoadInt64(&s.inClientMsgs) + m.Stats.ReceivedFromClients.Bytes = atomic.LoadInt64(&s.inClientBytes) m.Stats.Sent.Msgs = atomic.LoadInt64(&s.outMsgs) m.Stats.Sent.Bytes = atomic.LoadInt64(&s.outBytes) + m.Stats.SentToClients.Msgs = atomic.LoadInt64(&s.outClientMsgs) + m.Stats.SentToClients.Bytes = atomic.LoadInt64(&s.outClientBytes) m.Stats.SlowConsumers = atomic.LoadInt64(&s.slowConsumers) // Evaluate the slow consumer stats, but set it only if one of the value is not 0. 
scs := &SlowConsumersStats{ diff --git a/server/events_test.go b/server/events_test.go index 68d2b83c55a..52d5500e822 100644 --- a/server/events_test.go +++ b/server/events_test.go @@ -1885,6 +1885,19 @@ func TestServerEventsStatsZ(t *testing.T) { if m.Stats.Received.Msgs < 1 { t.Fatalf("Did not match received msgs of >=1, got %d", m.Stats.Received.Msgs) } + if m.Stats.ReceivedFromClients.Msgs < 1 { + t.Fatalf("Did not match received from client msgs of >=1, got %d", m.Stats.ReceivedFromClients.Msgs) + } + if m.Stats.ReceivedFromClients.Bytes < 1 { + t.Fatalf("Did not match received from client bytes of >=1, got %d", m.Stats.ReceivedFromClients.Bytes) + } + if m.Stats.SentToClients.Msgs < 1 { + t.Fatalf("Did not match sent to client msgs of >= 1, got %d", m.Stats.SentToClients.Msgs) + } + if m.Stats.SentToClients.Bytes < 1 { + t.Fatalf("Did not match sent to client bytes of >= 1, got %d", m.Stats.SentToClients.Bytes) + } + // Default pool size + 1 for system account expectedRoutes := DEFAULT_ROUTE_POOL_SIZE + 1 if lr := len(m.Stats.Routes); lr != expectedRoutes { diff --git a/server/monitor.go b/server/monitor.go index 4475f48be09..eebfe55b1ce 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -1258,10 +1258,14 @@ type Varz struct { Routes int `json:"routes"` // Routes is the number of connected route servers Remotes int `json:"remotes"` // Remotes is the configured route remote endpoints Leafs int `json:"leafnodes"` // Leafs is the number connected leafnode clients - InMsgs int64 `json:"in_msgs"` // InMsgs is the number of messages this server received - OutMsgs int64 `json:"out_msgs"` // OutMsgs is the number of message this server sent - InBytes int64 `json:"in_bytes"` // InBytes is the number of bytes this server received - OutBytes int64 `json:"out_bytes"` // OutMsgs is the number of bytes this server sent + InMsgs int64 `json:"in_msgs"` // InMsgs is the total number of messages this server received. 
This includes messages from the clients, routers, gateways and leaf nodes + InBytes int64 `json:"in_bytes"` // InBytes is the total number of bytes this server received. This includes messages from the clients, routers, gateways and leaf nodes + InClientMsgs int64 `json:"in_client_msgs"` // InClientMsgs is the number of messages this server received from the clients + InClientBytes int64 `json:"in_client_bytes"` // InClientBytes is the number of bytes this server received from the clients + OutMsgs int64 `json:"out_msgs"` // OutMsgs is the total number of messages this server sent. This includes messages sent to the clients, routers, gateways and leaf nodes + OutBytes int64 `json:"out_bytes"` // OutBytes is the total number of bytes this server sent. This includes messages sent to the clients, routers, gateways and leaf nodes + OutClientMsgs int64 `json:"out_client_msgs"` // OutClientMsgs is the number of messages this server sent to the clients + OutClientBytes int64 `json:"out_client_bytes"` // OutClientBytes is the number of bytes this server sent to the clients SlowConsumers int64 `json:"slow_consumers"` // SlowConsumers is the total count of clients that were disconnected since start due to being slow consumers StaleConnections int64 `json:"stale_connections"` // StaleConnections is the total count of stale connections that were detected StalledClients int64 `json:"stalled_clients"` // StalledClients is the total number of times that clients have been stalled. 
@@ -1862,6 +1866,10 @@ func (s *Server) updateVarzRuntimeFields(v *Varz, forceUpdate bool, pcpu float64 v.InBytes = atomic.LoadInt64(&s.inBytes) v.OutMsgs = atomic.LoadInt64(&s.outMsgs) v.OutBytes = atomic.LoadInt64(&s.outBytes) + v.InClientMsgs = atomic.LoadInt64(&s.inClientMsgs) + v.InClientBytes = atomic.LoadInt64(&s.inClientBytes) + v.OutClientMsgs = atomic.LoadInt64(&s.outClientMsgs) + v.OutClientBytes = atomic.LoadInt64(&s.outClientBytes) v.SlowConsumers = atomic.LoadInt64(&s.slowConsumers) v.StalledClients = atomic.LoadInt64(&s.stalls) v.SlowConsumersStats = &SlowConsumersStats{ diff --git a/server/monitor_test.go b/server/monitor_test.go index 95131f4011c..ccb225b3372 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -304,15 +304,27 @@ func TestMonitorHandleVarz(t *testing.T) { if v.InMsgs != 1 { t.Fatalf("Expected InMsgs of 1, got %v\n", v.InMsgs) } - if v.OutMsgs != 1 { - t.Fatalf("Expected OutMsgs of 1, got %v\n", v.OutMsgs) - } if v.InBytes != 5 { t.Fatalf("Expected InBytes of 5, got %v\n", v.InBytes) } + if v.OutMsgs != 1 { + t.Fatalf("Expected OutMsgs of 1, got %v\n", v.OutMsgs) + } if v.OutBytes != 5 { t.Fatalf("Expected OutBytes of 5, got %v\n", v.OutBytes) } + if v.InClientMsgs != 1 { + t.Fatalf("Expected InClientMsgs of 1, got %v\n", v.InClientMsgs) + } + if v.InClientBytes != 5 { + t.Fatalf("Expected InClientBytes of 5, got %v\n", v.InClientBytes) + } + if v.OutClientMsgs != 1 { + t.Fatalf("Expected OutClientMsgs of 1, got %v\n", v.OutClientMsgs) + } + if v.OutClientBytes != 5 { + t.Fatalf("Expected OutClientBytes of 5, got %v\n", v.OutClientBytes) + } if v.Subscriptions <= 10 { t.Fatalf("Expected Subscriptions of at least 10, got %v\n", v.Subscriptions) } diff --git a/server/server.go b/server/server.go index 0fce7f51c96..a1d14e74922 100644 --- a/server/server.go +++ b/server/server.go @@ -402,9 +402,13 @@ type nodeInfo struct { type stats struct { inMsgs int64 - outMsgs int64 inBytes int64 + inClientMsgs int64 + inClientBytes 
int64 + outMsgs int64 outBytes int64 + outClientMsgs int64 + outClientBytes int64 slowConsumers int64 staleConnections int64 stalls int64