Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1530,6 +1530,11 @@ func (c *client) readLoop(pre []byte) {
acc.stats.Unlock()
}

if c.kind == CLIENT {
atomic.AddInt64(&s.inClientMsgs, inMsgs)
atomic.AddInt64(&s.inClientBytes, inBytes)
}

atomic.AddInt64(&s.inMsgs, inMsgs)
atomic.AddInt64(&s.inBytes, inBytes)
}
Expand Down Expand Up @@ -5006,6 +5011,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
var dlvExtraSize int64
var dlvRouteMsgs int64
var dlvLeafMsgs int64
var dlvClientMsgs int64

// We need to know if this is a MQTT producer because they send messages
// without CR_LF (we otherwise remove the size of CR_LF from message size).
Expand All @@ -5019,12 +5025,15 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
totalBytes := dlvMsgs*int64(len(msg)) + dlvExtraSize
routeBytes := dlvRouteMsgs*int64(len(msg)) + dlvExtraSize
leafBytes := dlvLeafMsgs*int64(len(msg)) + dlvExtraSize
// dlvExtraSize applies to route/leaf header overhead, not client deliveries
clientBytes := dlvClientMsgs * int64(len(msg))

// For non MQTT producers, remove the CR_LF * number of messages
if !prodIsMQTT {
totalBytes -= dlvMsgs * int64(LEN_CR_LF)
routeBytes -= dlvRouteMsgs * int64(LEN_CR_LF)
leafBytes -= dlvLeafMsgs * int64(LEN_CR_LF)
clientBytes -= dlvClientMsgs * int64(LEN_CR_LF)
}

if acc != nil {
Expand All @@ -5045,6 +5054,9 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
if srv := c.srv; srv != nil {
atomic.AddInt64(&srv.outMsgs, dlvMsgs)
atomic.AddInt64(&srv.outBytes, totalBytes)

atomic.AddInt64(&srv.outClientMsgs, dlvClientMsgs)
atomic.AddInt64(&srv.outClientBytes, clientBytes)
}
}

Expand Down Expand Up @@ -5140,6 +5152,9 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
// We don't count internal deliveries, so do only when sub.icb is nil.
if sub.icb == nil {
dlvMsgs++
if sub.client.kind == CLIENT {
Comment thread
alexbozhenko marked this conversation as resolved.
dlvClientMsgs++
}
}
didDeliver = true
}
Expand Down Expand Up @@ -5367,6 +5382,8 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
dlvRouteMsgs++
case LEAF:
dlvLeafMsgs++
case CLIENT:
dlvClientMsgs++
}
}
// Do the rest even when message delivery was skipped.
Expand Down
121 changes: 121 additions & 0 deletions server/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3897,3 +3897,124 @@ func TestFlushOutboundS2CompressionPoolBufferRecycling(t *testing.T) {
t.Fatalf("Too many allocs per iteration (%.1f); pool buffers are likely being leaked", allocs)
}
}

// TestClientMsgsMetric verifies the in/out ClientMsgs and ClientBytes server
// counters, which are meant to count only traffic exchanged with CLIENT
// connections, as opposed to inMsgs/outMsgs which also include routed
// (server-to-server) traffic such as STATSZ heartbeats.
func TestClientMsgsMetric(t *testing.T) {
// Form a two-server cluster so client traffic and routed traffic can be
// distinguished by the counters under test.
o1 := DefaultOptions()
o1.ServerName = "S1"
s1 := RunServer(o1)
defer s1.Shutdown()

o2 := DefaultOptions()
o2.ServerName = "S2"
o2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", o1.Cluster.Port))
s2 := RunServer(o2)
defer s2.Shutdown()

checkClusterFormed(t, s1, s2)

// IgnoreDiscoveredServers pins each connection to its intended server, so
// we know exactly which server delivers to which client.
ncS1 := natsConnect(t, s1.ClientURL(), nats.IgnoreDiscoveredServers())
defer ncS1.Close()

ncS2 := natsConnect(t, s2.ClientURL(), nats.IgnoreDiscoveredServers())
defer ncS2.Close()

// Echo the message back
natsSub(t, ncS1, "foo", func(m *nats.Msg) { m.Respond(m.Data) })
natsSub(t, ncS2, "bar", func(m *nats.Msg) { m.Respond(m.Data) })
ncS1.Flush()
ncS2.Flush()

// Wait until each server has learned of the remote subscription before
// sending requests, otherwise the request could be dropped.
checkSubInterest(t, s1, globalAccountName, "bar", 5*time.Second)
checkSubInterest(t, s2, globalAccountName, "foo", 5*time.Second)

// Request foo and bar from non-local servers,
// to make sure client messages are counted correctly.
// Each request crosses the route once and its reply crosses back, so each
// server delivers exactly one client message per request/reply pair.
fooMsg := "6bytes"
_, err := ncS2.Request("foo", []byte(fooMsg), 5*time.Second)
if err != nil {
t.Fatalf("Error on receiving: %v", err)
}
barMsg := "ninebytes"
_, err = ncS1.Request("bar", []byte(barMsg), 5*time.Second)
if err != nil {
t.Fatalf("Error on receiving: %v", err)
}
err = ncS1.Flush()
if err != nil {
t.Fatalf("Error on flushing connection: %v", err)
}
err = ncS2.Flush()
if err != nil {
t.Fatalf("Error on flushing connection: %v", err)
}

// In/out Msgs/Bytes include routed messages, including STATSZ heartbeats too.
// So there should be at least 2 foo and 2 bar messages received and sent by each server,
// but depending on the test timing, we may also catch some STATSZ messages.
require_True(t, atomic.LoadInt64(&s1.inMsgs) >= 4)
require_True(t, atomic.LoadInt64(&s1.inBytes) >= int64(len(fooMsg)*2+len(barMsg)*2))
require_True(t, atomic.LoadInt64(&s2.inMsgs) >= 4)
require_True(t, atomic.LoadInt64(&s2.inBytes) >= int64(len(fooMsg)*2+len(barMsg)*2))

require_True(t, atomic.LoadInt64(&s1.outMsgs) >= 4)
require_True(t, atomic.LoadInt64(&s1.outBytes) >= int64(len(fooMsg)*2+len(barMsg)*2))
require_True(t, atomic.LoadInt64(&s2.outMsgs) >= 4)
require_True(t, atomic.LoadInt64(&s2.outBytes) >= int64(len(fooMsg)*2+len(barMsg)*2))

// In/out ClientMsgs/Bytes only count client messages.
// Per server: one message received from its local requester, one delivered
// to its local subscriber/requester — hence exact equality is safe here.
require_Equal(t, atomic.LoadInt64(&s1.inClientMsgs), 2)
require_Equal(t, atomic.LoadInt64(&s1.inClientBytes), int64(len(fooMsg)+len(barMsg)))
require_Equal(t, atomic.LoadInt64(&s2.inClientMsgs), 2)
require_Equal(t, atomic.LoadInt64(&s2.inClientBytes), int64(len(fooMsg)+len(barMsg)))

require_Equal(t, atomic.LoadInt64(&s1.outClientMsgs), 2)
require_Equal(t, atomic.LoadInt64(&s1.outClientBytes), int64(len(fooMsg)+len(barMsg)))
require_Equal(t, atomic.LoadInt64(&s2.outClientMsgs), 2)
require_Equal(t, atomic.LoadInt64(&s2.outClientBytes), int64(len(fooMsg)+len(barMsg)))

// Now test that messages delivered as part of queue subscriptions are counted correctly
natsQueueSub(t, ncS1, "orders.new", "workers", func(m *nats.Msg) { m.Respond(m.Data) })
natsQueueSub(t, ncS2, "orders.new", "workers", func(m *nats.Msg) { m.Respond(m.Data) })
ncS1.Flush()
ncS2.Flush()

orderMsg := "order"
_, err = ncS1.Request("orders.new", []byte(orderMsg), 5*time.Second)
if err != nil {
t.Fatalf("Error on receiving: %v", err)
}
err = ncS1.Flush()
if err != nil {
t.Fatalf("Error on flushing connection: %v", err)
}
err = ncS2.Flush()
if err != nil {
t.Fatalf("Error on flushing connection: %v", err)
}

// Queue-group member selection across servers is nondeterministic, so we do
// not know which client the message will be routed to; accept both cases.
// Counters are cumulative, so the totals below include the 2 client
// messages/bytes per server from the request/reply phase above.
// Case 1: the queue subscriber on S1 receives the message. S1 then sends
// 4 client messages total: 2 from the previous step, the qsub delivery,
// and the echo reply back to the requester (also on S1). S2 stays at 2.
if atomic.LoadInt64(&s1.outClientMsgs) == 4 {
require_Equal(t, atomic.LoadInt64(&s2.outClientMsgs), 2)

require_Equal(t, atomic.LoadInt64(&s1.outClientBytes),
int64(len(fooMsg)+len(barMsg)+len(orderMsg)*2))

require_Equal(t, atomic.LoadInt64(&s2.outClientBytes),
int64(len(fooMsg)+len(barMsg)))

// Case 2: the queue subscriber on S2 receives the message. S2 delivers the
// qsub message (3 total) and S1 delivers the routed reply to the requester
// (3 total), so both servers end up with 3 client messages each.
} else if atomic.LoadInt64(&s1.outClientMsgs) == 3 {
require_Equal(t, atomic.LoadInt64(&s2.outClientMsgs), 3)

require_Equal(t, atomic.LoadInt64(&s1.outClientBytes),
int64(len(fooMsg)+len(barMsg)+len(orderMsg)))

require_Equal(t, atomic.LoadInt64(&s2.outClientBytes),
int64(len(fooMsg)+len(barMsg)+len(orderMsg)))

} else {
t.Fatalf("Did not get expected outClientMsg/Bytes for message sent on qsub")
}
}
6 changes: 6 additions & 0 deletions server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,9 @@ type ServerStats struct {
ActiveAccounts int `json:"active_accounts"`
NumSubs uint32 `json:"subscriptions"`
Sent DataStats `json:"sent"`
SentToClients DataStats `json:"sent_to_clients"`
Received DataStats `json:"received"`
ReceivedFromClients DataStats `json:"received_from_clients"`
SlowConsumers int64 `json:"slow_consumers"`
SlowConsumersStats *SlowConsumersStats `json:"slow_consumer_stats,omitempty"`
StaleConnections int64 `json:"stale_connections,omitempty"`
Expand Down Expand Up @@ -946,8 +948,12 @@ func (s *Server) sendStatsz(subj string) {
m.Stats.ActiveAccounts = int(atomic.LoadInt32(&s.activeAccounts))
m.Stats.Received.Msgs = atomic.LoadInt64(&s.inMsgs)
m.Stats.Received.Bytes = atomic.LoadInt64(&s.inBytes)
m.Stats.ReceivedFromClients.Msgs = atomic.LoadInt64(&s.inClientMsgs)
m.Stats.ReceivedFromClients.Bytes = atomic.LoadInt64(&s.inClientBytes)
m.Stats.Sent.Msgs = atomic.LoadInt64(&s.outMsgs)
m.Stats.Sent.Bytes = atomic.LoadInt64(&s.outBytes)
m.Stats.SentToClients.Msgs = atomic.LoadInt64(&s.outClientMsgs)
m.Stats.SentToClients.Bytes = atomic.LoadInt64(&s.outClientBytes)
m.Stats.SlowConsumers = atomic.LoadInt64(&s.slowConsumers)
// Evaluate the slow consumer stats, but set it only if one of the value is not 0.
scs := &SlowConsumersStats{
Expand Down
13 changes: 13 additions & 0 deletions server/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1885,6 +1885,19 @@ func TestServerEventsStatsZ(t *testing.T) {
if m.Stats.Received.Msgs < 1 {
t.Fatalf("Did not match received msgs of >=1, got %d", m.Stats.Received.Msgs)
}
if m.Stats.ReceivedFromClients.Msgs < 1 {
t.Fatalf("Did not match received from client msgs of >=1, got %d", m.Stats.ReceivedFromClients.Msgs)
}
if m.Stats.ReceivedFromClients.Bytes < 1 {
t.Fatalf("Did not match received from client bytes of >=1, got %d", m.Stats.ReceivedFromClients.Bytes)
}
if m.Stats.SentToClients.Msgs < 1 {
t.Fatalf("Did not match sent to client msgs of >= 1, got %d", m.Stats.SentToClients.Msgs)
}
if m.Stats.SentToClients.Bytes < 1 {
t.Fatalf("Did not match sent to client bytes of >= 1, got %d", m.Stats.SentToClients.Bytes)
}

// Default pool size + 1 for system account
expectedRoutes := DEFAULT_ROUTE_POOL_SIZE + 1
if lr := len(m.Stats.Routes); lr != expectedRoutes {
Expand Down
16 changes: 12 additions & 4 deletions server/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1258,10 +1258,14 @@ type Varz struct {
Routes int `json:"routes"` // Routes is the number of connected route servers
Remotes int `json:"remotes"` // Remotes is the configured route remote endpoints
Leafs int `json:"leafnodes"` // Leafs is the number connected leafnode clients
InMsgs int64 `json:"in_msgs"` // InMsgs is the number of messages this server received
OutMsgs int64 `json:"out_msgs"` // OutMsgs is the number of message this server sent
InBytes int64 `json:"in_bytes"` // InBytes is the number of bytes this server received
OutBytes int64 `json:"out_bytes"` // OutMsgs is the number of bytes this server sent
InMsgs int64 `json:"in_msgs"` // InMsgs is the total number of messages this server received. This includes messages from the clients, routers, gateways and leaf nodes
InBytes int64 `json:"in_bytes"` // InBytes is the total number of bytes this server received. This includes messages from the clients, routers, gateways and leaf nodes
InClientMsgs int64 `json:"in_client_msgs"` // InClientMsgs is the number of messages this server received from the clients
InClientBytes int64 `json:"in_client_bytes"` // InClientBytes is the number of bytes this server received from the clients
OutMsgs int64 `json:"out_msgs"` // OutMsgs is the total number of message this server sent. This includes messages sent to the clients, routers, gateways and leaf nodes
OutBytes int64 `json:"out_bytes"` // OutBytes is the total number of bytes this server sent. This includes messages sent to the clients, routers, gateways and leaf nodes
OutClientMsgs int64 `json:"out_client_msgs"` // OutClientMsgs is the number of messages this server sent to the clients
OutClientBytes int64 `json:"out_client_bytes"` // OutClientBytes is the number of bytes this server sent to the clients
SlowConsumers int64 `json:"slow_consumers"` // SlowConsumers is the total count of clients that were disconnected since start due to being slow consumers
StaleConnections int64 `json:"stale_connections"` // StaleConnections is the total count of stale connections that were detected
StalledClients int64 `json:"stalled_clients"` // StalledClients is the total number of times that clients have been stalled.
Expand Down Expand Up @@ -1862,6 +1866,10 @@ func (s *Server) updateVarzRuntimeFields(v *Varz, forceUpdate bool, pcpu float64
v.InBytes = atomic.LoadInt64(&s.inBytes)
v.OutMsgs = atomic.LoadInt64(&s.outMsgs)
v.OutBytes = atomic.LoadInt64(&s.outBytes)
v.InClientMsgs = atomic.LoadInt64(&s.inClientMsgs)
v.InClientBytes = atomic.LoadInt64(&s.inClientBytes)
v.OutClientMsgs = atomic.LoadInt64(&s.outClientMsgs)
v.OutClientBytes = atomic.LoadInt64(&s.outClientBytes)
v.SlowConsumers = atomic.LoadInt64(&s.slowConsumers)
v.StalledClients = atomic.LoadInt64(&s.stalls)
v.SlowConsumersStats = &SlowConsumersStats{
Expand Down
18 changes: 15 additions & 3 deletions server/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,15 +304,27 @@ func TestMonitorHandleVarz(t *testing.T) {
if v.InMsgs != 1 {
t.Fatalf("Expected InMsgs of 1, got %v\n", v.InMsgs)
}
if v.OutMsgs != 1 {
t.Fatalf("Expected OutMsgs of 1, got %v\n", v.OutMsgs)
}
if v.InBytes != 5 {
t.Fatalf("Expected InBytes of 5, got %v\n", v.InBytes)
}
if v.OutMsgs != 1 {
t.Fatalf("Expected OutMsgs of 1, got %v\n", v.OutMsgs)
}
if v.OutBytes != 5 {
t.Fatalf("Expected OutBytes of 5, got %v\n", v.OutBytes)
}
if v.InClientMsgs != 1 {
t.Fatalf("Expected InClientMsgs of 1, got %v\n", v.InClientMsgs)
}
if v.InClientBytes != 5 {
t.Fatalf("Expected InClientBytes of 5, got %v\n", v.InClientBytes)
}
if v.OutClientMsgs != 1 {
t.Fatalf("Expected OutClientMsgs of 1, got %v\n", v.OutClientMsgs)
}
if v.OutClientBytes != 5 {
t.Fatalf("Expected OutClientBytes of 5, got %v\n", v.OutClientBytes)
}
if v.Subscriptions <= 10 {
t.Fatalf("Expected Subscriptions of at least 10, got %v\n", v.Subscriptions)
}
Expand Down
6 changes: 5 additions & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,9 +402,13 @@ type nodeInfo struct {

type stats struct {
inMsgs int64
outMsgs int64
inBytes int64
inClientMsgs int64
inClientBytes int64
outMsgs int64
outBytes int64
outClientMsgs int64
outClientBytes int64
Comment thread
alexbozhenko marked this conversation as resolved.
slowConsumers int64
staleConnections int64
stalls int64
Expand Down
Loading