diff --git a/server/monitor.go b/server/monitor.go index 1173f4e75d9..87e2e5420ba 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -2902,6 +2902,7 @@ type JSzOptions struct { Accounts bool `json:"accounts,omitempty"` Streams bool `json:"streams,omitempty"` Consumer bool `json:"consumer,omitempty"` + DirectConsumer bool `json:"direct_consumer,omitempty"` Config bool `json:"config,omitempty"` LeaderOnly bool `json:"leader_only,omitempty"` Offset int `json:"offset,omitempty"` @@ -2950,6 +2951,7 @@ type StreamDetail struct { Config *StreamConfig `json:"config,omitempty"` State StreamState `json:"state,omitempty"` Consumer []*ConsumerInfo `json:"consumer_detail,omitempty"` + DirectConsumer []*ConsumerInfo `json:"direct_consumer_detail,omitempty"` Mirror *StreamSourceInfo `json:"mirror,omitempty"` Sources []*StreamSourceInfo `json:"sources,omitempty"` RaftGroup string `json:"stream_raft_group,omitempty"` @@ -3007,7 +3009,7 @@ type JSInfo struct { Total int `json:"total"` } -func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optCfg, optRaft, optStreamLeader bool) *AccountDetail { +func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optDirectConsumers, optCfg, optRaft, optStreamLeader bool) *AccountDetail { jsa.mu.RLock() acc := jsa.account name := acc.GetName() @@ -3089,6 +3091,18 @@ func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optCfg, } } } + if optDirectConsumers { + for _, consumer := range stream.getDirectConsumers() { + cInfo := consumer.info() + if cInfo == nil { + continue + } + if !optCfg { + cInfo.Config = nil + } + sdet.DirectConsumer = append(sdet.Consumer, cInfo) + } + } } detail.Streams = append(detail.Streams, sdet) } @@ -3112,7 +3126,7 @@ func (s *Server) JszAccount(opts *JSzOptions) (*AccountDetail, error) { if !ok { return nil, fmt.Errorf("account %q not jetstream enabled", acc) } - return s.accountDetail(jsa, opts.Streams, opts.Consumer, opts.Config, opts.RaftGroups, opts.StreamLeaderOnly), nil + return s.accountDetail(jsa, opts.Streams, opts.Consumer, opts.DirectConsumer, opts.Config, opts.RaftGroups, opts.StreamLeaderOnly), nil } // helper to get cluster info from node via dummy group @@ -3280,7 +3294,7 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) { jsi.AccountDetails = make([]*AccountDetail, 0, len(accounts)) for _, jsa := range accounts { - detail := s.accountDetail(jsa, opts.Streams, opts.Consumer, opts.Config, opts.RaftGroups, opts.StreamLeaderOnly) + detail := s.accountDetail(jsa, opts.Streams, opts.Consumer, opts.DirectConsumer, opts.Config, opts.RaftGroups, opts.StreamLeaderOnly) jsi.AccountDetails = append(jsi.AccountDetails, detail) } } @@ -3305,6 +3319,10 @@ func (s *Server) HandleJsz(w http.ResponseWriter, r *http.Request) { if err != nil { return } + directConsumers, err := decodeBool(w, r, "direct-consumers") + if err != nil { + return + } config, err := decodeBool(w, r, "config") if err != nil { return @@ -3336,6 +3354,7 @@ func (s *Server) HandleJsz(w http.ResponseWriter, r *http.Request) { Accounts: accounts, Streams: streams, Consumer: consumers, + DirectConsumer: directConsumers, Config: config, LeaderOnly: leader, Offset: offset, diff --git a/server/monitor_test.go b/server/monitor_test.go index 6adf9eee47d..7983fa1dc16 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -5241,6 +5241,23 @@ func TestMonitorJsz(t *testing.T) { } } }) + t.Run("direct-consumers", func(t *testing.T) { + for _, url := range []string{monUrl1} { + info := readJsInfo(url + "?acc=ACC&consumers=true&direct-consumers=true") + if len(info.AccountDetails) != 1 { + t.Fatalf("expected account ACC to be returned by %s but got %v", url, info) + } + // It could take time for the sourcing to set up. + checkFor(t, 5*time.Second, 250*time.Millisecond, func() error { + if !slices.ContainsFunc(info.AccountDetails[0].Streams, func(stream StreamDetail) bool { + return len(stream.DirectConsumer) > 0 + }) { + return fmt.Errorf("expected direct consumer info in detail returned by %v", url) + } + return nil + }) + } + }) t.Run("config", func(t *testing.T) { for _, url := range []string{monUrl1, monUrl2} { info := readJsInfo(url + "?acc=ACC&consumers=true&config=true") diff --git a/server/stream.go b/server/stream.go index a3aad6ce296..c46f7e12cb3 100644 --- a/server/stream.go +++ b/server/stream.go @@ -7198,6 +7198,20 @@ func (mset *stream) getPublicConsumers() []*consumer { return obs } +// This returns all consumers that are DIRECT. +func (mset *stream) getDirectConsumers() []*consumer { + mset.clsMu.RLock() + defer mset.clsMu.RUnlock() + + var obs []*consumer + for _, o := range mset.cList { + if o.cfg.Direct { + obs = append(obs, o) + } + } + return obs +} + // 2 minutes plus up to 30s jitter. const ( defaultCheckInterestStateT = 2 * time.Minute