diff --git a/server/monitor.go b/server/monitor.go index 84b173e5bd8..bcd7843051c 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -1880,6 +1880,14 @@ type GatewayzOptions struct { // AccountName will limit the list of accounts to that account name (makes Accounts implicit) AccountName string `json:"account_name"` + + // AccountSubscriptions indicates if subscriptions should be included in the results. + // Note: This is used only if `Accounts` or `AccountName` are specified. + AccountSubscriptions bool `json:"subscriptions"` + + // AccountSubscriptionsDetail indicates if subscription details should be included in the results. + // Note: This is used only if `Accounts` or `AccountName` are specified. + AccountSubscriptionsDetail bool `json:"subscriptions_detail"` } // Gatewayz represents detailed information on Gateways @@ -1902,12 +1910,14 @@ type RemoteGatewayz struct { // AccountGatewayz represents interest mode for this account type AccountGatewayz struct { - Name string `json:"name"` - InterestMode string `json:"interest_mode"` - NoInterestCount int `json:"no_interest_count,omitempty"` - InterestOnlyThreshold int `json:"interest_only_threshold,omitempty"` - TotalSubscriptions int `json:"num_subs,omitempty"` - NumQueueSubscriptions int `json:"num_queue_subs,omitempty"` + Name string `json:"name"` + InterestMode string `json:"interest_mode"` + NoInterestCount int `json:"no_interest_count,omitempty"` + InterestOnlyThreshold int `json:"interest_only_threshold,omitempty"` + TotalSubscriptions int `json:"num_subs,omitempty"` + NumQueueSubscriptions int `json:"num_queue_subs,omitempty"` + Subs []string `json:"subscriptions_list,omitempty"` + SubsDetail []SubDetail `json:"subscriptions_list_detail,omitempty"` } // Gatewayz returns a Gatewayz struct containing information about gateways. @@ -2033,14 +2043,14 @@ func createOutboundAccountsGatewayz(opts *GatewayzOptions, gw *gateway) []*Accou if !ok { return nil } - a := createAccountOutboundGatewayz(accName, ei) + a := createAccountOutboundGatewayz(opts, accName, ei) return []*AccountGatewayz{a} } accs := make([]*AccountGatewayz, 0, 4) gw.outsim.Range(func(k, v any) bool { name := k.(string) - a := createAccountOutboundGatewayz(name, v) + a := createAccountOutboundGatewayz(opts, name, v) accs = append(accs, a) return true }) @@ -2048,7 +2058,7 @@ func createOutboundAccountsGatewayz(opts *GatewayzOptions, gw *gateway) []*Accou } // Returns an AccountGatewayz for this gateway outbound connection -func createAccountOutboundGatewayz(name string, ei any) *AccountGatewayz { +func createAccountOutboundGatewayz(opts *GatewayzOptions, name string, ei any) *AccountGatewayz { a := &AccountGatewayz{ Name: name, InterestOnlyThreshold: gatewayMaxRUnsubBeforeSwitch, @@ -2060,6 +2070,23 @@ func createAccountOutboundGatewayz(name string, ei any) *AccountGatewayz { a.NoInterestCount = len(e.ni) a.NumQueueSubscriptions = e.qsubs a.TotalSubscriptions = int(e.sl.Count()) + if opts.AccountSubscriptions || opts.AccountSubscriptionsDetail { + var subsa [4096]*subscription + subs := subsa[:0] + e.sl.All(&subs) + if opts.AccountSubscriptions { + a.Subs = make([]string, 0, len(subs)) + } else { + a.SubsDetail = make([]SubDetail, 0, len(subs)) + } + for _, sub := range subs { + if opts.AccountSubscriptions { + a.Subs = append(a.Subs, string(sub.subject)) + } else { + a.SubsDetail = append(a.SubsDetail, newClientSubDetail(sub)) + } + } + } e.RUnlock() } else { a.InterestMode = Optimistic.String() @@ -2151,6 +2178,10 @@ func (s *Server) HandleGatewayz(w http.ResponseWriter, r *http.Request) { s.httpReqStats[GatewayzPath]++ s.mu.Unlock() + subs, subsDet, err := decodeSubs(w, r) + if err != nil { + return + } accs, err := decodeBool(w, r, "accs") if err != nil { return @@ -2162,9 +2193,11 @@ func (s *Server) HandleGatewayz(w http.ResponseWriter, r *http.Request) { } opts := &GatewayzOptions{ - Name: gwName, - Accounts: accs, - AccountName: accName, + Name: gwName, + Accounts: accs, + AccountName: accName, + AccountSubscriptions: subs, + AccountSubscriptionsDetail: subsDet, } gw, err := s.Gatewayz(opts) if err != nil { diff --git a/server/monitor_test.go b/server/monitor_test.go index 5bd6ae41be9..db32fc010c4 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -3756,6 +3756,149 @@ func TestMonitorGatewayzAccounts(t *testing.T) { }) } +func TestMonitorGatewayzWithSubs(t *testing.T) { + resetPreviousHTTPConnections() + + ob := testDefaultOptionsForGateway("B") + aA := NewAccount("A") + aB := NewAccount("B") + ob.Accounts = append(ob.Accounts, aA, aB) + ob.Users = append(ob.Users, + &User{Username: "a", Password: "a", Account: aA}, + &User{Username: "b", Password: "b", Account: aB}) + sb := runGatewayServer(ob) + defer sb.Shutdown() + + oa := testGatewayOptionsFromToWithServers(t, "A", "B", sb) + oa.HTTPHost = "127.0.0.1" + oa.HTTPPort = MONITOR_PORT + aA = NewAccount("A") + aB = NewAccount("B") + oa.Accounts = append(oa.Accounts, aA, aB) + oa.Users = append(oa.Users, + &User{Username: "a", Password: "a", Account: aA}, + &User{Username: "b", Password: "b", Account: aB}) + sa := runGatewayServer(oa) + defer sa.Shutdown() + + waitForOutboundGateways(t, sa, 1, 2*time.Second) + waitForInboundGateways(t, sa, 1, 2*time.Second) + + waitForOutboundGateways(t, sb, 1, 2*time.Second) + waitForInboundGateways(t, sb, 1, 2*time.Second) + + ncA := natsConnect(t, sb.ClientURL(), nats.UserInfo("a", "a")) + defer ncA.Close() + natsSubSync(t, ncA, "foo") + natsFlush(t, ncA) + + ncB := natsConnect(t, sb.ClientURL(), nats.UserInfo("b", "b")) + defer ncB.Close() + natsSubSync(t, ncB, "foo") + natsQueueSubSync(t, ncB, "bar", "baz") + natsFlush(t, ncB) + + checkGWInterestOnlyModeInterestOn(t, sa, "B", "A", "foo") + checkGWInterestOnlyModeInterestOn(t, sa, "B", "B", "foo") + checkForRegisteredQSubInterest(t, sa, "B", "B", "bar", 1, time.Second) + + for _, test := range []struct { + url string + allAccs bool + opts *GatewayzOptions + }{ + {"accs=1&subs=1", true, &GatewayzOptions{Accounts: true, AccountSubscriptions: true}}, + {"accs=1&subs=detail", true, &GatewayzOptions{Accounts: true, AccountSubscriptionsDetail: true}}, + {"acc_name=B&subs=1", false, &GatewayzOptions{AccountName: "B", AccountSubscriptions: true}}, + {"acc_name=B&subs=detail", false, &GatewayzOptions{AccountName: "B", AccountSubscriptionsDetail: true}}, + } { + t.Run(test.url, func(t *testing.T) { + gatewayzURL := fmt.Sprintf("http://127.0.0.1:%d/gatewayz?%s", sa.MonitorAddr().Port, test.url) + for pollMode := 0; pollMode < 2; pollMode++ { + gw := pollGatewayz(t, sa, pollMode, gatewayzURL, test.opts) + require_Equal(t, len(gw.OutboundGateways), 1) + ogw, ok := gw.OutboundGateways["B"] + require_True(t, ok) + require_NotNil(t, ogw) + var expected int + if test.allAccs { + expected = 3 // A + B + $G + } else { + expected = 1 // B + } + require_Len(t, len(ogw.Accounts), expected) + accs := map[string]*AccountGatewayz{} + for _, a := range ogw.Accounts { + // Do not include the global account there. + if a.Name == globalAccountName { + continue + } + accs[a.Name] = a + } + // Update the expected number of accounts if we asked for all accounts. + if test.allAccs { + expected-- + } + // The account B should always be present. + _, ok = accs["B"] + require_True(t, ok) + if expected == 2 { + _, ok = accs["A"] + require_True(t, ok) + } + // Now that we know we have the proper account(s), check the content. + for n, a := range accs { + require_NotNil(t, a) + require_Equal(t, a.Name, n) + totalSubs := 1 + var numQueueSubs int + if n == "B" { + totalSubs++ + numQueueSubs = 1 + } + require_Equal(t, a.TotalSubscriptions, totalSubs) + require_Equal(t, a.NumQueueSubscriptions, numQueueSubs) + + m := map[string]*SubDetail{} + if test.opts.AccountSubscriptions { + require_Len(t, len(a.Subs), totalSubs) + require_Len(t, len(a.SubsDetail), 0) + for _, sub := range a.Subs { + m[sub] = nil + } + } else { + require_Len(t, len(a.Subs), 0) + require_Len(t, len(a.SubsDetail), totalSubs) + for _, sub := range a.SubsDetail { + m[sub.Subject] = &sub + } + } + sd, ok := m["foo"] + require_True(t, ok) + if test.opts.AccountSubscriptionsDetail { + require_NotNil(t, sd) + require_Equal(t, sd.Queue, _EMPTY_) + } else { + require_True(t, sd == nil) + } + sd, ok = m["bar"] + if numQueueSubs == 1 { + require_True(t, ok) + if test.opts.AccountSubscriptionsDetail { + require_NotNil(t, sd) + require_Equal(t, sd.Queue, "baz") + } else { + require_True(t, sd == nil) + } + } else { + require_False(t, ok) + } + } + } + }) + } +} + func TestMonitorRoutezRTT(t *testing.T) { // Do not change default PingInterval and expect RTT to still be reported diff --git a/test/ocsp_peer_test.go b/test/ocsp_peer_test.go index 60b8ad892ec..a6b2e3269ba 100644 --- a/test/ocsp_peer_test.go +++ b/test/ocsp_peer_test.go @@ -1475,7 +1475,6 @@ func TestOCSPResponseCacheMonitor(t *testing.T) { s, _ := RunServerWithConfig(conf) defer s.Shutdown() v := monitorGetVarzHelper(t, 8222) - fmt.Println("Expect:", test.expect) var ct string if v.OCSPResponseCache != nil { ct = v.OCSPResponseCache.Type